From e09d88e805356ddb481c6196ef30044b4e5a492f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Wed, 28 Apr 2021 17:41:54 +0200
Subject: [PATCH] Make parallelism configurable via env. variables

---
 .../theodolite/uc1/application/ConfigurationKeys.java     | 2 ++
 .../uc1/application/HistoryServiceFlinkJob.java           | 8 ++++++++
 .../theodolite/uc2/application/ConfigurationKeys.java     | 2 ++
 .../uc2/application/HistoryServiceFlinkJob.java           | 7 +++++++
 .../theodolite/uc3/application/ConfigurationKeys.java     | 2 ++
 .../uc3/application/HistoryServiceFlinkJob.java           | 7 +++++++
 .../uc4/application/AggregationServiceFlinkJob.java       | 7 +++++++
 .../theodolite/uc4/application/ConfigurationKeys.java     | 6 ++++--
 8 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
index ed961bab7..382525cfe 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
@@ -19,6 +19,8 @@ public final class ConfigurationKeys {
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }
diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
index 0b732d5b4..8d9832e40 100644
--- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -43,6 +43,14 @@ public final class HistoryServiceFlinkJob {
     if (checkpointing) {
       this.env.enableCheckpointing(commitIntervalMs);
     }
+
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
   }
 
   private void buildPipeline() {
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
index 9ba56c828..e82610626 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
@@ -30,6 +30,8 @@ public final class ConfigurationKeys {
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }
diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index b8452847d..106826708 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob {
       this.env.enableCheckpointing(commitIntervalMs);
     }
 
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
     // State Backend
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     this.env.setStateBackend(stateBackend);
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
index a895c74d8..bc4e0b9d2 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
@@ -34,6 +34,8 @@ public final class ConfigurationKeys {
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index 0f26d3765..d69ee47d8 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob {
       this.env.enableCheckpointing(commitIntervalMs);
     }
 
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
     // State Backend
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     this.env.setStateBackend(stateBackend);
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index 0db5a3d52..45c7ff1ad 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob {
       this.env.enableCheckpointing(commitIntervalMs);
     }
 
+    // Parallelism
+    final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
+    if (parallelism != null) {
+      LOGGER.error("Set parallelism: {}.", parallelism);
+      this.env.setParallelism(parallelism);
+    }
+
     // State Backend
     final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
     this.env.setStateBackend(stateBackend);
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
index 6497f6b05..448e8b095 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java
@@ -15,7 +15,7 @@ public final class ConfigurationKeys {
   public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
 
   public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
-  
+
   public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
 
   public static final String WINDOW_SIZE_MS = "window.size.ms";
@@ -28,13 +28,15 @@ public final class ConfigurationKeys {
 
   public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
 
-  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = //NOPMD
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
       "flink.state.backend.memory.size";
 
   public static final String DEBUG = "debug";
 
   public static final String CHECKPOINTING = "checkpointing";
 
+  public static final String PARALLELISM = "parallelism";
+
   private ConfigurationKeys() {}
 
 }
-- 
GitLab