Commit 7b06fdf6 authored by Sören Henning

Merge branch 'flink-rm-rebalance' into 'theodolite-kotlin'

Remove rebalance in Flink benchmarks

See merge request !141
parents ad6b50e8 33142b62
Pipeline #3673 passed
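
For context, .rebalance() in the Flink DataStream API forces a round-robin repartitioning between the source and the next operator. All of the benchmark topologies touched below either repartition again anyway via keyBy(...) or merely forward records, so the extra shuffle adds network traffic without changing the computed results; this commit therefore comments it out. The following is a minimal, self-contained sketch (not taken from this repository) of the two variants; the placeholder source, element values, and job name are made up.

// Minimal sketch, not from this repository: illustrates the partitioning difference
// when .rebalance() is removed before a keyBy. Source elements and job name are placeholders.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class RebalanceSketch {

  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder source standing in for the Kafka consumers used in the benchmarks.
    final DataStream<Tuple2<String, Integer>> records = env.fromElements(
        Tuple2.of("sensor1", 10),
        Tuple2.of("sensor2", 20),
        Tuple2.of("sensor1", 30));

    // Before this commit: .rebalance() inserts an extra round-robin shuffle between the
    // source and the keyBy, so every record is repartitioned twice.
    records
        .rebalance()
        .keyBy(record -> record.f0)
        .sum(1)
        .print();

    // After this commit: the keyBy alone determines the partitioning, so records are
    // shuffled only once, by key.
    records
        .keyBy(record -> record.f0)
        .sum(1)
        .print();

    env.execute("rebalance-sketch"); // hypothetical job name
  }
}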
@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob {
     // Parallelism
     final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
     if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
+      LOGGER.info("Set parallelism: {}.", parallelism);
       this.env.setParallelism(parallelism);
     }
@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob {
     final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
     stream
-        .rebalance()
+        // .rebalance()
         .map(new GsonMapper())
         .flatMap((record, c) -> LOGGER.info("Record: {}", record))
         .returns(Types.GENERIC(Object.class)); // Will never be used
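A side note on the flatMap line kept above: the lambda's output type appears only in the erased Collector generic, so Flink cannot infer it and needs an explicit returns(...) hint; since the lambda never emits anything, the hinted type is arbitrary, hence the "Will never be used" comment. A small sketch (not from this repository) of the same pattern:

// Sketch, not from this repository: why a flatMap lambda needs a returns(...) hint.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public final class ReturnsHintSketch {

  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "c")
        // The lambda only prints and never calls out.collect(...); its output type is hidden
        // behind the erased Collector generic, so Flink would fail at graph-building time
        // without the explicit type hint below. The hinted type is never used downstream.
        .flatMap((String value, Collector<Object> out) -> System.out.println(value))
        .returns(Types.GENERIC(Object.class));

    env.execute("returns-hint-sketch"); // hypothetical job name
  }
}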
@@ -59,7 +59,7 @@ public final class HistoryServiceFlinkJob {
     // Parallelism
     final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
     if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
+      LOGGER.info("Set parallelism: {}.", parallelism);
       this.env.setParallelism(parallelism);
     }
@@ -83,7 +83,9 @@ public final class HistoryServiceFlinkJob {
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
+    final int windowDurationMinutes =
+        this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
+    final Time windowDuration = Time.minutes(windowDurationMinutes);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
@@ -100,9 +102,9 @@ public final class HistoryServiceFlinkJob {
     this.env
         .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
-        .rebalance()
+        // .rebalance()
         .keyBy(ActivePowerRecord::getIdentifier)
-        .window(TumblingEventTimeWindows.of(Time.minutes(windowDuration)))
+        .window(TumblingEventTimeWindows.of(windowDuration))
         .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
         .map(t -> {
           final String key = t.f0;
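The two hunks above belong together: the window size is now resolved once into a Time object and passed to the window assigner. Below is a self-contained sketch (not from this repository) of the same tumbling event-time windowing pattern; the records, timestamps, window size, and job name are placeholders, and a simple sum stands in for the statistics aggregation used in the benchmark.

// Sketch, not from this repository: tumbling event-time windows with the window size
// resolved once into a Time constant. All values below are placeholders.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public final class WindowSketch {

  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-in for the KAFKA_WINDOW_DURATION_MINUTES configuration value.
    final int windowDurationMinutes = 1;
    final Time windowDuration = Time.minutes(windowDurationMinutes);

    // Placeholder records: (identifier, timestamp in ms, value).
    final DataStream<Tuple3<String, Long, Integer>> records = env.fromElements(
        Tuple3.of("sensor1", 0L, 10),
        Tuple3.of("sensor1", 30_000L, 30),
        Tuple3.of("sensor2", 45_000L, 20));

    records
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple3<String, Long, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((record, timestamp) -> record.f1))
        .keyBy(record -> record.f0)
        .window(TumblingEventTimeWindows.of(windowDuration))
        .sum(2) // simple sum instead of the StatsAggregateFunction used in the benchmark
        .print();

    env.execute("window-sketch"); // hypothetical job name
  }
}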
@@ -117,9 +117,8 @@ public final class HistoryServiceFlinkJob {
     // Streaming topology
     final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
     this.env
-        .addSource(kafkaSource)
-        .name("[Kafka Consumer] Topic: " + inputTopic)
-        .rebalance()
+        .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
+        // .rebalance()
         .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
           final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
           final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone);
@@ -79,7 +79,7 @@ public final class AggregationServiceFlinkJob {
     // Parallelism
     final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
     if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
+      LOGGER.info("Set parallelism: {}.", parallelism);
       this.env.setParallelism(parallelism);
     }
@@ -152,7 +152,7 @@ public final class AggregationServiceFlinkJob {
     // Build input stream
     final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
         .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
-        .rebalance()
+        // .rebalance()
         .map(r -> r)
         .name("[Map] Rebalance Forward");
@@ -160,7 +160,7 @@ public final class AggregationServiceFlinkJob {
     final DataStream<ActivePowerRecord> aggregationsInputStream =
         this.env.addSource(kafkaOutputSource)
             .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
-            .rebalance()
+            // .rebalance()
             .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
             .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");