From 092cc3c08a08d6c94a3f37f70fcd38a8e9692143 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 25 Nov 2022 13:56:02 +0100
Subject: [PATCH] Align Flink UC3 implementation with others

* Remove logging statement
* Add trigger (default 15s)
---
 theodolite-benchmarks/uc3-flink/Dockerfile         |  2 +-
 .../benchmarks/uc3/flink/ConfigurationKeys.java    |  3 +++
 .../uc3/flink/HistoryServiceFlinkJob.java          | 14 +++++++++-----
 .../main/resources/META-INF/application.properties |  1 +
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/theodolite-benchmarks/uc3-flink/Dockerfile b/theodolite-benchmarks/uc3-flink/Dockerfile
index cef05c029..744ad389e 100644
--- a/theodolite-benchmarks/uc3-flink/Dockerfile
+++ b/theodolite-benchmarks/uc3-flink/Dockerfile
@@ -1,3 +1,3 @@
 FROM flink:1.13-java11
 
-ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar
\ No newline at end of file
+ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java
index 980f07b9b..9e42f1ffb 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java
@@ -21,6 +21,9 @@ public final class ConfigurationKeys {
 
   public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
 
+  public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS =
+      "aggregation.trigger.interval.seconds";
+
   public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
 
   public static final String TIME_ZONE = "time.zone";
diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java
index d80f64faf..9a85a87a8 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java
@@ -9,6 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -55,6 +56,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+    final Time triggerDuration =
+        Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS));
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
@@ -80,13 +83,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
           return keyFactory.createKey(record.getIdentifier(), dateTime);
         })
         .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
+        .trigger(ContinuousEventTimeTrigger.of(triggerDuration))
         .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
         .map(tuple -> {
-          final String newKey = keyFactory.getSensorId(tuple.f0);
-          final String newValue = tuple.f1.toString();
-          final int hourOfDay = tuple.f0.getHourOfDay();
-          LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
-          return new Tuple2<>(newKey, newValue);
+          final String sensorId = keyFactory.getSensorId(tuple.f0);
+          final String stats = tuple.f1.toString();
+          // final int hourOfDay = tuple.f0.getHourOfDay();
+          // LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
+          return new Tuple2<>(sensorId, stats);
         })
         .name("map")
         .returns(Types.TUPLE(Types.STRING, Types.STRING))
diff --git a/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties
index 6b6874674..4423cd38c 100644
--- a/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties
@@ -7,6 +7,7 @@ kafka.output.topic=output
 schema.registry.url=http://localhost:8081
 aggregation.duration.days=30
 aggregation.advance.days=1
+aggregation.trigger.interval.seconds=15
 num.threads=1
 commit.interval.ms=100
 cache.max.bytes.buffering=-1
-- 
GitLab