From b50d9b47cc7aaa229ea56b6be02be060836787af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Fri, 7 May 2021 18:08:39 +0200 Subject: [PATCH] Remove rebalance in Flink benchmarks --- .../uc1/application/HistoryServiceFlinkJob.java | 4 ++-- .../uc2/application/HistoryServiceFlinkJob.java | 10 ++++++---- .../uc3/application/HistoryServiceFlinkJob.java | 5 ++--- .../uc4/application/AggregationServiceFlinkJob.java | 6 +++--- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 8d9832e40..0cb132e52 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob { // Parallelism final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); if (parallelism != null) { - LOGGER.error("Set parallelism: {}.", parallelism); + LOGGER.info("Set parallelism: {}.", parallelism); this.env.setParallelism(parallelism); } @@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob { final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer); stream - .rebalance() + // .rebalance() .map(new GsonMapper()) .flatMap((record, c) -> LOGGER.info("Record: {}", record)) .returns(Types.GENERIC(Object.class)); // Will never be used diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java index 106826708..d156d895d 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java @@ -59,7 +59,7 @@ public final class HistoryServiceFlinkJob { // Parallelism final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); if (parallelism != null) { - LOGGER.error("Set parallelism: {}.", parallelism); + LOGGER.info("Set parallelism: {}.", parallelism); this.env.setParallelism(parallelism); } @@ -83,7 +83,9 @@ public final class HistoryServiceFlinkJob { final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES); + final int windowDurationMinutes = + this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES); + final Time windowDuration = Time.minutes(windowDurationMinutes); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( @@ -100,9 +102,9 @@ public final class HistoryServiceFlinkJob { this.env .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic) - .rebalance() + // .rebalance() .keyBy(ActivePowerRecord::getIdentifier) - .window(TumblingEventTimeWindows.of(Time.minutes(windowDuration))) + .window(TumblingEventTimeWindows.of(windowDuration)) .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction()) .map(t -> { final String key = t.f0; diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java index d69ee47d8..091b25674 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java @@ -117,9 +117,8 @@ public final class HistoryServiceFlinkJob { // Streaming topology final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); this.env - .addSource(kafkaSource) - .name("[Kafka Consumer] Topic: " + inputTopic) - .rebalance() + .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic) + // .rebalance() .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> { final Instant instant = Instant.ofEpochMilli(record.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone); diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index 45c7ff1ad..3e2878a89 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -79,7 +79,7 @@ public final class AggregationServiceFlinkJob { // Parallelism final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); if (parallelism != null) { - LOGGER.error("Set parallelism: {}.", parallelism); + LOGGER.info("Set parallelism: {}.", parallelism); this.env.setParallelism(parallelism); } @@ -152,7 +152,7 @@ public final class AggregationServiceFlinkJob { // Build input stream final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource) .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS - .rebalance() + // .rebalance() .map(r -> r) .name("[Map] Rebalance Forward"); @@ -160,7 +160,7 @@ public final class AggregationServiceFlinkJob { final DataStream<ActivePowerRecord> aggregationsInputStream = this.env.addSource(kafkaOutputSource) .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS - .rebalance() + // .rebalance() .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())) .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord"); -- GitLab