From 6ecc7eeaf608a0a2555265807b739f9968f49b98 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 8 Dec 2022 18:52:39 +0100
Subject: [PATCH] Set trigger interval in Flink UC4

---
 .../theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java  | 2 --
 .../benchmarks/uc4/flink/AggregationServiceFlinkJob.java      | 4 ++++
 .../theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java | 2 ++
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java
index 0f314f149..f34f70141 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java
@@ -15,8 +15,6 @@ public final class Uc4ConfigurationKeys {
 
   public static final String GRACE_PERIOD_MS = "grace.period.ms";
 
-  // public static final String TRIGGER_ENABLE = "trigger.enable";
-
   public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
 
   private Uc4ConfigurationKeys() {}
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
index f893c0b40..7d4246082 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
@@ -10,6 +10,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -72,6 +73,8 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
         Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS));
     final Duration windowGrace =
         Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
+    final Time triggerDuration =
+        Time.seconds(this.config.getLong(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
     final String configurationTopic =
         this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
@@ -147,6 +150,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
         .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
         .keyBy(t -> t.f0.getParent())
         .window(TumblingEventTimeWindows.of(windowSize))
+        .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration))
         .process(new RecordAggregationProcessWindowFunction())
         .name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord");
 
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java
index 6fd2b0fa0..e910cd207 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java
@@ -17,6 +17,8 @@ public final class Uc4ConfigurationKeys {
 
   public static final String GRACE_PERIOD_MS = "grace.period.ms";
 
+  public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
+
   private Uc4ConfigurationKeys() {}
 
 }
-- 
GitLab