diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java index 0f314f1497708f73c7bc00337438f7a53d081731..f34f7014135833aef4022ac1e07dce2271e4faf7 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java @@ -15,8 +15,6 @@ public final class Uc4ConfigurationKeys { public static final String GRACE_PERIOD_MS = "grace.period.ms"; - // public static final String TRIGGER_ENABLE = "trigger.enable"; - public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; private Uc4ConfigurationKeys() {} diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index f893c0b40e77af4f743e96f35693159379658868..7d424608215c19b6478f5a90fe3d71b0ad095519 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -10,6 +10,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; @@ -72,6 +73,8 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); final Duration windowGrace = Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); + final Time triggerDuration = + Time.seconds(this.config.getLong(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); final String configurationTopic = this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); @@ -147,6 +150,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace)) .keyBy(t -> t.f0.getParent()) .window(TumblingEventTimeWindows.of(windowSize)) + .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration)) .process(new RecordAggregationProcessWindowFunction()) .name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord"); diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java index 6fd2b0fa0ec50febd213fb3f7d24463d2bd6f51c..e910cd2078e8a18112fdd509272d99331082ae4f 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java @@ -17,6 +17,8 @@ public final class Uc4ConfigurationKeys { public static final String GRACE_PERIOD_MS = "grace.period.ms"; + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; + private Uc4ConfigurationKeys() {} }