diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
index 8d9832e40253fe9e3178bfc25047ed2b376abe76..0cb132e526486e71409736b843dd25bdfa52da4a 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob {
     // Parallelism
     final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
     if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
+      LOGGER.info("Set parallelism: {}.", parallelism);
       this.env.setParallelism(parallelism);
     }
 
@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob {
     final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
 
     stream
-        .rebalance()
+        // .rebalance()
         .map(new GsonMapper())
         .flatMap((record, c) -> LOGGER.info("Record: {}", record))
         .returns(Types.GENERIC(Object.class)); // Will never be used
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index 1068267086892c4538001b6afc670b3b0cd043ef..d156d895d86bb01a31f96e08764df8b8df743c4d 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -59,7 +59,7 @@ public final class HistoryServiceFlinkJob {
     // Parallelism
     final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
     if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
+      LOGGER.info("Set parallelism: {}.", parallelism);
       this.env.setParallelism(parallelism);
     }
 
@@ -83,7 +83,9 @@ public final class HistoryServiceFlinkJob {
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
+    final int windowDurationMinutes =
+        this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
+    final Time windowDuration = Time.minutes(windowDurationMinutes);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
@@ -100,9 +102,9 @@ public final class HistoryServiceFlinkJob {
 
     this.env
         .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
-        .rebalance()
+        // .rebalance()
         .keyBy(ActivePowerRecord::getIdentifier)
-        .window(TumblingEventTimeWindows.of(Time.minutes(windowDuration)))
+        .window(TumblingEventTimeWindows.of(windowDuration))
         .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
         .map(t -> {
           final String key = t.f0;
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index d69ee47d8c831f2e5e74abdd8c33393c8ee6e07e..091b25674a2a31671ca68bd2076c694da9533d77 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -117,9 +117,8 @@ public final class HistoryServiceFlinkJob {
     // Streaming topology
     final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
     this.env
-        .addSource(kafkaSource)
-        .name("[Kafka Consumer] Topic: " + inputTopic)
-        .rebalance()
+        .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
+        // .rebalance()
         .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
           final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
           final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone);
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index 45c7ff1ad1faeec6357e4ac3871dec7a51306698..3e2878a893057024de00333492462f5029eb6d77 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -79,7 +79,7 @@ public final class AggregationServiceFlinkJob {
     // Parallelism
     final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
     if (parallelism != null) {
-      LOGGER.error("Set parallelism: {}.", parallelism);
+      LOGGER.info("Set parallelism: {}.", parallelism);
       this.env.setParallelism(parallelism);
     }
 
@@ -152,7 +152,7 @@ public final class AggregationServiceFlinkJob {
     // Build input stream
     final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
         .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
-        .rebalance()
+        // .rebalance()
         .map(r -> r)
         .name("[Map] Rebalance Forward");
 
@@ -160,7 +160,7 @@ public final class AggregationServiceFlinkJob {
     final DataStream<ActivePowerRecord> aggregationsInputStream =
         this.env.addSource(kafkaOutputSource)
             .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
-            .rebalance()
+            // .rebalance()
             .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
             .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");