diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
index ed961bab733a409dc07b1be7fa35562103c3e2f4..382525cfe75f82dbbe8fbcc85308b0e7788a43bc 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
@@ -19,6 +19,8 @@ public final class ConfigurationKeys {
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
index 6655b52ec3020f46bb8a37c7124ee870fa663573..8d9832e40253fe9e3178bfc25047ed2b376abe76 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -1,6 +1,7 @@
 package theodolite.uc1.application;
 
 import org.apache.commons.configuration2.Configuration;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
@@ -42,6 +43,14 @@ public final class HistoryServiceFlinkJob {
     if (checkpointing) {
       this.env.enableCheckpointing(commitIntervalMs);
     }
+
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
   }
 
   private void buildPipeline() {
@@ -61,7 +70,8 @@ public final class HistoryServiceFlinkJob {
     stream
         .rebalance()
         .map(new GsonMapper())
-        .flatMap((record, c) -> LOGGER.info("Record: {}", record));
+        .flatMap((record, c) -> LOGGER.info("Record: {}", record))
+        .returns(Types.GENERIC(Object.class)); // Will never be used
   }
 
   /**
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
index 9ba56c828a0ae5c6147aadd90d449c7cf2324992..e8261062689ce4c586a4e6fbde02878a28f48e97 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
@@ -30,6 +30,8 @@ public final class ConfigurationKeys {
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index b8452847df800226ad481f9309323a2a9a532939..1068267086892c4538001b6afc670b3b0cd043ef 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob {
       this.env.enableCheckpointing(commitIntervalMs);
     }
 
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
     // State Backend
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     this.env.setStateBackend(stateBackend);
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
index a895c74d89c5d788c47b3b78dc70500b4b5a6f5b..bc4e0b9d2d230026e9d2b6df0a11e4fb68380aed 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
@@ -34,6 +34,8 @@ public final class ConfigurationKeys {
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index 0f26d37652924a16be1840fd759b3cd5b023f338..d69ee47d8c831f2e5e74abdd8c33393c8ee6e07e 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob {
       this.env.enableCheckpointing(commitIntervalMs);
     }
 
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
     // State Backend
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     this.env.setStateBackend(stateBackend);
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index 0db5a3d524f74fbf22304e8f9b44fa55eead321a..45c7ff1ad1faeec6357e4ac3871dec7a51306698 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob {
       this.env.enableCheckpointing(commitIntervalMs);
     }
 
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
     // State Backend
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     this.env.setStateBackend(stateBackend);
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
index 6497f6b055ef115c4a681499c5fa38657bb5d29e..448e8b095ef15c434655ca3c76a9e2de21244054 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
@@ -15,7 +15,7 @@ public final class ConfigurationKeys {
   public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
 
   public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
-  
+
   public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
 
   public static final String WINDOW_SIZE_MS = "window.size.ms";
@@ -28,13 +28,15 @@ public final class ConfigurationKeys {
 
   public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
 
-  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = //NOPMD
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
       "flink.state.backend.memory.size";
 
   public static final String DEBUG = "debug";
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }