diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java index a98aa89ff17d4013255dad1bf2f47729116be230..d6d56b86528f68003626c9f0ea1603aa49e78169 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java @@ -29,8 +29,10 @@ public final class ConfigurationKeys { public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; // UC4 - public static final String GRACE_PERIOD_MS = "grace.period.ms"; + public static final String EMIT_PERIOD_SECONDS = "kafka.window.duration.minutes"; + // public static final String EMIT_PERIOD_MS = "emit.period.ms"; + public static final String GRACE_PERIOD_MS = "grace.period.ms"; // BEAM public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; @@ -41,7 +43,9 @@ public final class ConfigurationKeys { public static final String SPECIFIC_AVRO_READER = "specific.avro.reader"; - public static final String TRIGGER_INTERVAL = "trigger.interval"; + // Used for UC3 + UC4: + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; + // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; private ConfigurationKeys() {} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index f6587be4c4660a2e34f34efdaa417a7080073d0e..3f99277d4972df15841bf567b80bfcf03f2221a4 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -48,7 +48,7 @@ public class PipelineFactory extends AbstractPipelineFactory { final Duration aggregationAdvanceDuration = Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); final Duration triggerDelay = - Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); + Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); // Read from Kafka final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); diff --git a/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties index fdb700fd9daf7d5e8b5260fa6c9c8810bca39eb5..f4511d5f1f182647210f24459e3d2f1db84eea29 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc3-beam/src/main/resources/META-INF/application.properties @@ -4,7 +4,6 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output -kafka.window.duration.minutes=1 schema.registry.url=http://localhost:8081 diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java index 6627235dc29a1fb59da876a0e3dd7f7d36865d1c..e065993b5ce9f233711dd26a3057f07793ee266f 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java @@ -73,9 +73,9 @@ public class PipelineFactory extends AbstractPipelineFactory { this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); final Duration duration = Duration.standardSeconds( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(ConfigurationKeys.EMIT_PERIOD_SECONDS)); final Duration triggerDelay = Duration.standardSeconds( - this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); + this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); final Duration gracePeriod = Duration.standardSeconds( this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));