diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java index 750f11ab50c5f22edf35c8dd1d0b07b739d7155a..396ca98675fb8ceae818fb9eeeec7b23d9b1aba6 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java @@ -19,10 +19,6 @@ public final class ConfigurationKeys { public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - public static final String WINDOW_SIZE_MS = "window.size.ms"; - - public static final String WINDOW_GRACE_MS = "window.grace.ms"; - public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING_INTERVAL_MS = "checkpointing.interval.ms"; diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java index 4c726f523efc2b8b7320941ff2fe0d3a1c1d7acf..5215c23c83f9b4e86b6e81d634f7fd6575977c4c 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java @@ -32,7 +32,7 @@ public class ConfigurationKeys { // UC4 public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - public static final String EMIT_PERIOD_MS = "window.size"; - // public static final String EMIT_PERIOD_MS = "emit.period.ms"; + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + // public static final String GRACE_PERIOD_MS = "grace.period.ms"; } diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index 5f4515cb851439841d1de3193f21275545033481..3310651241697505508f0b4b536edc19e5de5256 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -67,9 +67,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Time windowSize = - Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS)); + Time.milliseconds(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)); final Duration windowGrace = - Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS)); + Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)); final String configurationTopic = this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java index 6c53a78520bcb5318902adcf824fe2b80ad7b0f5..6016865fb094d628c8703a7300fbfb83f46afc12 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java @@ -19,9 +19,9 @@ public final class ConfigurationKeys { public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - public static final String WINDOW_SIZE_MS = "window.size.ms"; + public static final String EMIT_PERIOD_MS = "emit.period.ms"; - public static final String WINDOW_GRACE_MS = "window.grace.ms"; + public static final String GRACE_PERIOD_MS = "grace.period.ms"; public static final String FLINK_STATE_BACKEND = "flink.state.backend"; diff --git a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties index f48b6c6ef1ae747474da54cc43e0c88207f7464e..7726ce651d51ed129ae4c87c9d415b8c7e30c7b7 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties @@ -9,8 +9,9 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output schema.registry.url=http://localhost:8081 -window.size.ms=1000 -window.grace.ms=0 + +emit.period.ms=5000 +grace.period.ms=0 # Flink configuration checkpointing.interval.ms=1000 diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index 83e0edcb8f6a3fba4559aa1bc5c17ac4d57e1312..e0ba823596474f87c79d488a5bbe7e71749f54fb 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -43,7 +43,7 @@ public class Uc4PipelineFactory extends PipelineFactory { private final String kafkaConfigurationTopic; private final String kafkaFeedbackTopic; - private final Duration windowSize; + private final Duration emitPeriod; /** @@ -79,7 +79,7 @@ public class Uc4PipelineFactory extends PipelineFactory { this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline; this.kafkaConfigurationTopic = kafkaConfigurationTopic; this.kafkaFeedbackTopic = kafkaFeedbackTopic; - this.windowSize = windowSize; + this.emitPeriod = windowSize; } /** @@ -231,7 +231,7 @@ public class Uc4PipelineFactory extends PipelineFactory { // (5) UC4 Last Value Map // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues = - dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.windowSize.toMillis())); + dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.emitPeriod.toMillis())); final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS AggregateOperation diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties index 5b175ab6a9379f767d4b462aa25fd989de01d9a4..af877044b6e17665b6a18af41ec72ab6cedf0f91 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties @@ -9,6 +9,5 @@ kafka.feedback.topic=aggregation-feedback schema.registry.url=http://localhost:8081 -window.size=5000 -#emit.period.ms=5000 +emit.period.ms=5000 #grace.period.ms=0