From 7abfdeb5b9a3ba96a4f84123404c74a2e0941075 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Sat, 26 Nov 2022 19:19:10 +0100 Subject: [PATCH] Unify naming of UC4 options --- .../benchmarks/commons/flink/ConfigurationKeys.java | 4 ---- .../benchmarks/commons/hazelcastjet/ConfigurationKeys.java | 4 ++-- .../benchmarks/uc4/flink/AggregationServiceFlinkJob.java | 4 ++-- .../theodolite/benchmarks/uc4/flink/ConfigurationKeys.java | 4 ++-- .../src/main/resources/META-INF/application.properties | 5 +++-- .../benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java | 6 +++--- .../src/main/resources/META-INF/application.properties | 3 +-- 7 files changed, 13 insertions(+), 17 deletions(-) diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java index 750f11ab5..396ca9867 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java @@ -19,10 +19,6 @@ public final class ConfigurationKeys { public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - public static final String WINDOW_SIZE_MS = "window.size.ms"; - - public static final String WINDOW_GRACE_MS = "window.grace.ms"; - public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING_INTERVAL_MS = "checkpointing.interval.ms"; diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java index 4c726f523..5215c23c8 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java @@ -32,7 +32,7 @@ public class ConfigurationKeys { // UC4 public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - public static final String EMIT_PERIOD_MS = "window.size"; - // public static final String EMIT_PERIOD_MS = "emit.period.ms"; + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + // public static final String GRACE_PERIOD_MS = "grace.period.ms"; } diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index 5f4515cb8..331065124 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -67,9 +67,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Time windowSize = - Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS)); + Time.milliseconds(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)); final Duration windowGrace = - Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS)); + Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)); final String configurationTopic = this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java index 6c53a7852..6016865fb 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java @@ -19,9 +19,9 @@ public final class ConfigurationKeys { public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - public static final String WINDOW_SIZE_MS = "window.size.ms"; + public static final String EMIT_PERIOD_MS = "emit.period.ms"; - public static final String WINDOW_GRACE_MS = "window.grace.ms"; + public static final String GRACE_PERIOD_MS = "grace.period.ms"; public static final String FLINK_STATE_BACKEND = "flink.state.backend"; diff --git a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties index f48b6c6ef..7726ce651 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties @@ -9,8 +9,9 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output schema.registry.url=http://localhost:8081 -window.size.ms=1000 -window.grace.ms=0 + +emit.period.ms=5000 +grace.period.ms=0 # Flink configuration checkpointing.interval.ms=1000 diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index 83e0edcb8..e0ba82359 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -43,7 +43,7 @@ public class Uc4PipelineFactory extends PipelineFactory { private final String kafkaConfigurationTopic; private final String kafkaFeedbackTopic; - private final Duration windowSize; + private final Duration emitPeriod; /** @@ -79,7 +79,7 @@ public class Uc4PipelineFactory extends PipelineFactory { this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline; this.kafkaConfigurationTopic = kafkaConfigurationTopic; this.kafkaFeedbackTopic = kafkaFeedbackTopic; - this.windowSize = windowSize; + this.emitPeriod = windowSize; } /** @@ -231,7 +231,7 @@ public class Uc4PipelineFactory extends PipelineFactory { // (5) UC4 Last Value Map // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues = - dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.windowSize.toMillis())); + dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.emitPeriod.toMillis())); final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS AggregateOperation diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties index 5b175ab6a..af877044b 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties @@ -9,6 +9,5 @@ kafka.feedback.topic=aggregation-feedback schema.registry.url=http://localhost:8081 -window.size=5000 -#emit.period.ms=5000 +emit.period.ms=5000 #grace.period.ms=0 -- GitLab