From 409d4ab5554d8cf8d74c131c36ce33bf6140eaac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Sat, 26 Nov 2022 17:22:35 +0100 Subject: [PATCH] Keep Beam UC4 working --- .../benchmarks/commons/beam/ConfigurationKeys.java | 8 ++++++-- .../theodolite/benchmarks/uc3/beam/PipelineFactory.java | 2 +- .../src/main/resources/META-INF/application.properties | 1 - .../theodolite/benchmarks/uc4/beam/PipelineFactory.java | 4 ++-- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java index a98aa89ff..d6d56b865 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java @@ -29,8 +29,10 @@ public final class ConfigurationKeys { public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; // UC4 - public static final String GRACE_PERIOD_MS = "grace.period.ms"; + public static final String EMIT_PERIOD_SECONDS = "kafka.window.duration.minutes"; + // public static final String EMIT_PERIOD_MS = "emit.period.ms"; + public static final String GRACE_PERIOD_MS = "grace.period.ms"; // BEAM public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; @@ -41,7 +43,9 @@ public final class ConfigurationKeys { public static final String SPECIFIC_AVRO_READER = "specific.avro.reader"; - public static final String TRIGGER_INTERVAL = "trigger.interval"; + // Used for UC3 + UC4: + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; + // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; private ConfigurationKeys() {} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index f6587be4c..3f99277d4 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -48,7 +48,7 @@ public class PipelineFactory extends AbstractPipelineFactory { final Duration aggregationAdvanceDuration = Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); final Duration triggerDelay = - Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); + Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); // Read from Kafka final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); diff --git a/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties index fdb700fd9..f4511d5f1 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties @@ -4,7 +4,6 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output -kafka.window.duration.minutes=1 schema.registry.url=http://localhost:8081 diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java index 6627235dc..e065993b5 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java @@ -73,9 +73,9 @@ public class PipelineFactory extends AbstractPipelineFactory { this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); final Duration duration = Duration.standardSeconds( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(ConfigurationKeys.EMIT_PERIOD_SECONDS)); final Duration triggerDelay = Duration.standardSeconds( - this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); + this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); final Duration gracePeriod = Duration.standardSeconds( this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS)); -- GitLab