Skip to content
Snippets Groups Projects
Commit b50d9b47 authored by Sören Henning's avatar Sören Henning
Browse files

Remove rebalance in Flink benchmarks

parent 289dcadd
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!141Remove rebalance in Flink benchmarks,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #3180 passed
...@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob { ...@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob {
// Parallelism // Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) { if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism); LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism); this.env.setParallelism(parallelism);
} }
...@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob { ...@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob {
final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer); final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
stream stream
.rebalance() // .rebalance()
.map(new GsonMapper()) .map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record)) .flatMap((record, c) -> LOGGER.info("Record: {}", record))
.returns(Types.GENERIC(Object.class)); // Will never be used .returns(Types.GENERIC(Object.class)); // Will never be used
......
...@@ -59,7 +59,7 @@ public final class HistoryServiceFlinkJob { ...@@ -59,7 +59,7 @@ public final class HistoryServiceFlinkJob {
// Parallelism // Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) { if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism); LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism); this.env.setParallelism(parallelism);
} }
...@@ -83,7 +83,9 @@ public final class HistoryServiceFlinkJob { ...@@ -83,7 +83,9 @@ public final class HistoryServiceFlinkJob {
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES); final int windowDurationMinutes =
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
final Time windowDuration = Time.minutes(windowDurationMinutes);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
...@@ -100,9 +102,9 @@ public final class HistoryServiceFlinkJob { ...@@ -100,9 +102,9 @@ public final class HistoryServiceFlinkJob {
this.env this.env
.addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic) .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
.rebalance() // .rebalance()
.keyBy(ActivePowerRecord::getIdentifier) .keyBy(ActivePowerRecord::getIdentifier)
.window(TumblingEventTimeWindows.of(Time.minutes(windowDuration))) .window(TumblingEventTimeWindows.of(windowDuration))
.aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction()) .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
.map(t -> { .map(t -> {
final String key = t.f0; final String key = t.f0;
......
...@@ -117,9 +117,8 @@ public final class HistoryServiceFlinkJob { ...@@ -117,9 +117,8 @@ public final class HistoryServiceFlinkJob {
// Streaming topology // Streaming topology
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
this.env this.env
.addSource(kafkaSource) .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
.name("[Kafka Consumer] Topic: " + inputTopic) // .rebalance()
.rebalance()
.keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> { .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
final Instant instant = Instant.ofEpochMilli(record.getTimestamp()); final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone);
......
...@@ -79,7 +79,7 @@ public final class AggregationServiceFlinkJob { ...@@ -79,7 +79,7 @@ public final class AggregationServiceFlinkJob {
// Parallelism // Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) { if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism); LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism); this.env.setParallelism(parallelism);
} }
...@@ -152,7 +152,7 @@ public final class AggregationServiceFlinkJob { ...@@ -152,7 +152,7 @@ public final class AggregationServiceFlinkJob {
// Build input stream // Build input stream
final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource) final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
.name("[Kafka Consumer] Topic: " + inputTopic)// NOCS .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
.rebalance() // .rebalance()
.map(r -> r) .map(r -> r)
.name("[Map] Rebalance Forward"); .name("[Map] Rebalance Forward");
...@@ -160,7 +160,7 @@ public final class AggregationServiceFlinkJob { ...@@ -160,7 +160,7 @@ public final class AggregationServiceFlinkJob {
final DataStream<ActivePowerRecord> aggregationsInputStream = final DataStream<ActivePowerRecord> aggregationsInputStream =
this.env.addSource(kafkaOutputSource) this.env.addSource(kafkaOutputSource)
.name("[Kafka Consumer] Topic: " + outputTopic) // NOCS .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
.rebalance() // .rebalance()
.map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())) .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
.name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord"); .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment