From cda97d49c98058393014fa4f7365c6b4d7d34376 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 25 Nov 2022 16:53:23 +0100
Subject: [PATCH] Add early firing to Hazelcast Jet UC3

---
 .../hazelcastjet/ConfigurationKeys.java       |  3 +-
 .../uc3/hazelcastjet/HistoryService.java      | 21 +++---
 .../{uc3specifics => }/HourOfDayKey.java      |  2 +-
 .../HourOfDayKeySerializer.java               |  2 +-
 .../HoursOfDayKeyFactory.java                 |  2 +-
 .../{uc3specifics => }/StatsKeyFactory.java   |  2 +-
 .../uc3/hazelcastjet/Uc3PipelineFactory.java  | 69 ++++++++++---------
 .../resources/META-INF/application.properties |  1 +
 8 files changed, 54 insertions(+), 48 deletions(-)
 rename theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/{uc3specifics => }/HourOfDayKey.java (93%)
 rename theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/{uc3specifics => }/HourOfDayKeySerializer.java (92%)
 rename theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/{uc3specifics => }/HoursOfDayKeyFactory.java (87%)
 rename theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/{uc3specifics => }/StatsKeyFactory.java (82%)

diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
index ce2fffc75..36ce6ded6 100644
--- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
@@ -26,10 +26,11 @@ public class ConfigurationKeys {
   // UC3
   public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
   public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
+  public static final String AGGREGATION_EMIT_PERIOD_SECONDS = "aggregation.emit.period.seconds";
 
   // UC4
   public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
   public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
   public static final String WINDOW_SIZE_UC4 = "window.size";
-  
+
 }
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
index b2ba886a6..ff93b8660 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
@@ -1,7 +1,6 @@
 package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-
 import java.time.Duration;
 import java.util.Properties;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -10,8 +9,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
 
 /**
  * A microservice that aggregate incoming messages in a sliding window.
@@ -21,8 +18,8 @@ public class HistoryService extends HazelcastJetService {
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
 
   /**
-   * Constructs the use case logic for UC3.
-   * Retrieves the needed values and instantiates a pipeline factory.
+   * Constructs the use case logic for UC3. Retrieves the needed values and instantiates a pipeline
+   * factory.
    */
   public HistoryService() {
     super(LOGGER);
@@ -37,21 +34,25 @@ public class HistoryService extends HazelcastJetService {
             StringSerializer.class.getCanonicalName());
 
     final String kafkaOutputTopic =
-        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
+        this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
 
     final Duration windowSize = Duration.ofDays(Integer.parseInt(
-        config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
+        this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
 
     final Duration hoppingSize = Duration.ofDays(Integer.parseInt(
-        config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
+        this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
+
+    final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt(
+        this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString()));
 
     this.pipelineFactory = new Uc3PipelineFactory(
         kafkaProps,
-        kafkaInputTopic,
+        this.kafkaInputTopic,
         kafkaWriteProps,
         kafkaOutputTopic,
         windowSize,
-        hoppingSize);
+        hoppingSize,
+        emitPeriod);
   }
 
   @Override
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKey.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKey.java
similarity index 93%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKey.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKey.java
index c69f433f3..6fe3343ce 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKey.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKey.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
 import java.util.Objects;
 
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKeySerializer.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeySerializer.java
similarity index 92%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKeySerializer.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeySerializer.java
index 91ba3f2be..c689bab16 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HourOfDayKeySerializer.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeySerializer.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HoursOfDayKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
similarity index 87%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HoursOfDayKeyFactory.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
index 4eddb85ef..af32575e5 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/HoursOfDayKeyFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
 import java.time.LocalDateTime;
 
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/StatsKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsKeyFactory.java
similarity index 82%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/StatsKeyFactory.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsKeyFactory.java
index 2a404781e..b731ee275 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/uc3specifics/StatsKeyFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsKeyFactory.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
 import java.time.LocalDateTime;
 
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
index 474a9e857..69fe67756 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
@@ -8,7 +8,6 @@ import com.hazelcast.jet.pipeline.Sinks;
 import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.WindowDefinition;
-
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -18,51 +17,52 @@ import java.util.Properties;
 import java.util.TimeZone;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory;
-import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory;
 
 
 /**
- * PipelineFactory for use case 3.
- * Allows to build and extend pipelines.
+ * PipelineFactory for use case 3. Allows to build and extend pipelines.
  */
 public class Uc3PipelineFactory extends PipelineFactory {
 
   private final Duration hoppingSize;
   private final Duration windowSize;
+  private final Duration emitPeriod;
 
   /**
    * Build a new Pipeline.
+   *
    * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
    *        attributes.
    * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
    *        attributes.
    * @param kafkaInputTopic The name of the input topic used for the pipeline.
    * @param kafkaOutputTopic The name of the output topic used for the pipeline.
-   * @param hoppingSize The hop length of the sliding window used in the aggregation of
-   *        this pipeline.
-   * @param windowSize The window length of the sliding window used in the aggregation of
-   *        this pipeline.
+   * @param hoppingSize The hop length of the sliding window used in the aggregation of this
+   *        pipeline.
+   * @param windowSize The window length of the sliding window used in the aggregation of this
+   *        pipeline.
    */
   public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline,
-                            final String kafkaInputTopic,
-                            final Properties kafkaWritePropsForPipeline,
-                            final String kafkaOutputTopic,
-                            final Duration windowSize,
-                            final Duration hoppingSize) {
-    super(kafkaReadPropsForPipeline, kafkaInputTopic,
-        kafkaWritePropsForPipeline,kafkaOutputTopic);
+      final String kafkaInputTopic,
+      final Properties kafkaWritePropsForPipeline,
+      final String kafkaOutputTopic,
+      final Duration windowSize,
+      final Duration hoppingSize,
+      final Duration emitPeriod) {
+    super(
+        kafkaReadPropsForPipeline,
+        kafkaInputTopic,
+        kafkaWritePropsForPipeline,
+        kafkaOutputTopic);
     this.windowSize = windowSize;
     this.hoppingSize = hoppingSize;
+    this.emitPeriod = emitPeriod;
   }
 
-
-
   /**
    * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
-   * @return a pipeline used which can be used in a Hazelcast Jet Instance to process data
-   *         for UC3.
+   *
+   * @return a pipeline used which can be used in a Hazelcast Jet Instance to process data for UC3.
    */
   @Override
   public Pipeline buildPipeline() {
@@ -70,7 +70,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
     // Define the source
     final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources
         .<String, ActivePowerRecord>kafka(
-            kafkaReadPropsForPipeline, kafkaInputTopic);
+            this.kafkaReadPropsForPipeline, this.kafkaInputTopic);
 
     // Extend topology for UC3
     final StreamStage<Map.Entry<String, String>> uc3Product =
@@ -80,9 +80,9 @@ public class Uc3PipelineFactory extends PipelineFactory {
     uc3Product.writeTo(Sinks.logger());
     // Add Sink2: Write back to kafka for the final benchmark
     uc3Product.writeTo(KafkaSinks.<String, String>kafka(
-        kafkaWritePropsForPipeline, kafkaOutputTopic));
+        this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
 
-    return pipe;
+    return this.pipe;
   }
 
   /**
@@ -98,11 +98,11 @@ public class Uc3PipelineFactory extends PipelineFactory {
    *         and value of the Entry object. It can be used to be further modified or directly be
    *         written into a sink.
    */
-  public StreamStage<Map.Entry<String, String>>
-      extendUc3Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+  public StreamStage<Map.Entry<String, String>> extendUc3Topology(
+      final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
 
     // Build the pipeline topology.
-    return pipe
+    return this.pipe
         .readFrom(source)
         // use Timestamps
         .withNativeTimestamps(0)
@@ -112,7 +112,8 @@ public class Uc3PipelineFactory extends PipelineFactory {
         .map(record -> {
           final String sensorId = record.getValue().getIdentifier();
           final long timestamp = record.getValue().getTimestamp();
-          final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
+          final LocalDateTime dateTime = LocalDateTime.ofInstant(
+              Instant.ofEpochMilli(timestamp),
               TimeZone.getDefault().toZoneId());
 
           final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
@@ -123,15 +124,17 @@ public class Uc3PipelineFactory extends PipelineFactory {
         // group by new keys
         .groupingKey(Entry::getKey)
         // Sliding/Hopping Window
-        .window(WindowDefinition.sliding(windowSize.toMillis(), hoppingSize.toMillis()))
+        .window(WindowDefinition
+            .sliding(this.windowSize.toMillis(), this.hoppingSize.toMillis())
+            .setEarlyResultsPeriod(this.emitPeriod.toMillis()))
         // get average value of group (sensoreId,hourOfDay)
         .aggregate(
             AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
-        // map to return pair (sensorID,hourOfDay) -> (averaged what value)
+        // map to return pair sensorID -> stats
         .map(agg -> {
-          final String theValue = agg.getValue().toString();
-          final String theKey = agg.getKey().toString();
-          return Map.entry(theKey, theValue);
+          final String sensorId = agg.getKey().getSensorId();
+          final String stats = agg.getValue().toString(); // TODO just double, not stats
+          return Map.entry(sensorId, stats);
         });
   }
 }
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties
index 0e7d3f42f..53e559dc3 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/resources/META-INF/application.properties
@@ -6,6 +6,7 @@ kafka.input.topic=input
 kafka.output.topic=output
 aggregation.duration.days=30
 aggregation.advance.days=1
+aggregation.emit.period.seconds=15
 
 schema.registry.url=http://localhost:8081
 
-- 
GitLab