diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java index ed961bab733a409dc07b1be7fa35562103c3e2f4..382525cfe75f82dbbe8fbcc85308b0e7788a43bc 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java @@ -19,6 +19,8 @@ public final class ConfigurationKeys { public static final String CHECKPOINTING = "checkpointing"; + public static final String PARALLELISM = "parallelism"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java index 0b732d5b4a8393275b5740da088aae5fa462b95f..8d9832e40253fe9e3178bfc25047ed2b376abe76 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java @@ -43,6 +43,14 @@ public final class HistoryServiceFlinkJob { if (checkpointing) { this.env.enableCheckpointing(commitIntervalMs); } + + // Parallelism + final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); + if (parallelism != null) { + LOGGER.error("Set parallelism: {}.", parallelism); + this.env.setParallelism(parallelism); + } + } private void buildPipeline() { diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java index 9ba56c828a0ae5c6147aadd90d449c7cf2324992..e8261062689ce4c586a4e6fbde02878a28f48e97 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java @@ -30,6 +30,8 @@ public final class ConfigurationKeys { public static final String CHECKPOINTING = "checkpointing"; + public static final String PARALLELISM = "parallelism"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java index b8452847df800226ad481f9309323a2a9a532939..1068267086892c4538001b6afc670b3b0cd043ef 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java @@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob { this.env.enableCheckpointing(commitIntervalMs); } + // Parallelism + final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); + if (parallelism != null) { + LOGGER.error("Set parallelism: {}.", parallelism); + this.env.setParallelism(parallelism); + } + // State Backend final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); this.env.setStateBackend(stateBackend); diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java index a895c74d89c5d788c47b3b78dc70500b4b5a6f5b..bc4e0b9d2d230026e9d2b6df0a11e4fb68380aed 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java @@ -34,6 +34,8 @@ public final class ConfigurationKeys { public static final String CHECKPOINTING = "checkpointing"; + public static final String PARALLELISM = "parallelism"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java index 0f26d37652924a16be1840fd759b3cd5b023f338..d69ee47d8c831f2e5e74abdd8c33393c8ee6e07e 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java @@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob { this.env.enableCheckpointing(commitIntervalMs); } + // Parallelism + final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); + if (parallelism != null) { + LOGGER.error("Set parallelism: {}.", parallelism); + this.env.setParallelism(parallelism); + } + // State Backend final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); this.env.setStateBackend(stateBackend); diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java index 0db5a3d524f74fbf22304e8f9b44fa55eead321a..45c7ff1ad1faeec6357e4ac3871dec7a51306698 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java @@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob { this.env.enableCheckpointing(commitIntervalMs); } + // Parallelism + final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); + if (parallelism != null) { + LOGGER.error("Set parallelism: {}.", parallelism); + this.env.setParallelism(parallelism); + } + // State Backend final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); this.env.setStateBackend(stateBackend); diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java index 6497f6b055ef115c4a681499c5fa38657bb5d29e..448e8b095ef15c434655ca3c76a9e2de21244054 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/theodolite/uc4/application/ConfigurationKeys.java @@ -15,7 +15,7 @@ public final class ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String WINDOW_SIZE_MS = "window.size.ms"; @@ -28,13 +28,15 @@ public final class ConfigurationKeys { public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; - public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = //NOPMD + public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD "flink.state.backend.memory.size"; public static final String DEBUG = "debug"; public static final String CHECKPOINTING = "checkpointing"; + public static final String PARALLELISM = "parallelism"; + private ConfigurationKeys() {} }