Skip to content
Snippets Groups Projects

Remove rebalance in Flink benchmarks

Merged Sören Henning requested to merge flink-rm-rebalance into theodolite-kotlin
4 files
+ 13
12
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob {
@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob {
// Parallelism
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
this.env.setParallelism(parallelism);
}
}
@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob {
@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob {
final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
stream
stream
.rebalance()
// .rebalance()
.map(new GsonMapper())
.map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record))
.flatMap((record, c) -> LOGGER.info("Record: {}", record))
.returns(Types.GENERIC(Object.class)); // Will never be used
.returns(Types.GENERIC(Object.class)); // Will never be used
Loading