From 05c84d2bc09c88c35014e05fe1e9b9a6745819f1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 8 Dec 2022 19:03:23 +0100
Subject: [PATCH] Align Hazelcast UC4 intervals with others

additionally, do some minor code cleanup
---
 .../uc4/hazelcastjet/HashMapSupplier.java     | 24 -------------------
 .../uc4/hazelcastjet/HistoryService.java      |  6 ++++-
 .../hazelcastjet/Uc4ConfigurationKeys.java    |  2 ++
 .../uc4/hazelcastjet/Uc4PipelineFactory.java  | 17 ++++++-------
 .../resources/META-INF/application.properties |  3 ++-
 .../uc4/hazelcastjet/Uc4PipelineTest.java     |  2 +-
 6 files changed, 19 insertions(+), 35 deletions(-)
 delete mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java

diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java
deleted file mode 100644
index 61910850b..000000000
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package rocks.theodolite.benchmarks.uc4.hazelcastjet;
-
-import com.hazelcast.function.SupplierEx;
-import java.util.HashMap;
-import java.util.Set;
-
-/**
- * Supplies a {@link HashMap} and implements {@link SupplierEx}.
- */
-public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>> {
-
-  private static final long serialVersionUID = -6247504592403610702L; // NOPMD
-
-  @Override
-  public HashMap<String, Set<String>> get() {
-    return new HashMap<>();
-  }
-
-  @Override
-  public HashMap<String, Set<String>> getEx() throws Exception {
-    return this.get();
-  }
-
-}
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
index 1d1ac8969..5790d5e3d 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
@@ -59,6 +59,9 @@ public class HistoryService extends HazelcastJetService {
     final Duration gracePeriod = Duration.ofMillis(
         this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
 
+    final Duration triggerPeriod = Duration.ofSeconds(
+        this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
+
     this.pipelineFactory = new Uc4PipelineFactory(
         kafkaProps,
         kafkaConfigReadProps,
@@ -66,7 +69,8 @@ public class HistoryService extends HazelcastJetService {
         kafkaWriteProps,
         this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic,
         emirPeriod,
-        gracePeriod);
+        gracePeriod,
+        triggerPeriod);
   }
 
   @Override
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java
index 7a85c2dc5..3eedfc0bc 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java
@@ -15,4 +15,6 @@ public class Uc4ConfigurationKeys {
 
   public static final String GRACE_PERIOD_MS = "grace.period.ms";
 
+  public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
+
 }
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
index db7fdfce0..d4a7657b9 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
@@ -16,9 +16,7 @@ import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.StreamStageWithKey;
 import com.hazelcast.jet.pipeline.WindowDefinition;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -47,6 +45,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
 
   private final Duration gracePeriod;
 
+  private final Duration triggerPeriod;
+
 
   /**
    * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
@@ -74,7 +74,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
       final String kafkaConfigurationTopic,
       final String kafkaFeedbackTopic,
       final Duration emitPeriod,
-      final Duration gracePeriod) {
+      final Duration gracePeriod,
+      final Duration triggerPeriod) {
 
     super(kafkaInputReadPropsForPipeline, kafkaInputTopic,
         kafkaWritePropsForPipeline, kafkaOutputTopic);
@@ -84,6 +85,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
     this.kafkaFeedbackTopic = kafkaFeedbackTopic;
     this.emitPeriod = emitPeriod;
     this.gracePeriod = gracePeriod;
+    this.triggerPeriod = triggerPeriod;
   }
 
   /**
@@ -229,7 +231,9 @@ public class Uc4PipelineFactory extends PipelineFactory {
     // (5) UC4 Last Value Map
     // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
     final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues =
-        dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.emitPeriod.toMillis()));
+        dupliAsFlatmappedStage.window(WindowDefinition
+            .tumbling(this.emitPeriod.toMillis())
+            .setEarlyResultsPeriod(this.triggerPeriod.toMillis()));
 
     final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS
         AggregateOperation
@@ -285,11 +289,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
         }
       }
 
-      // Create a updates list to pass onto the next pipeline stage-
-      final List<Entry<String, Set<String>>> updatesList = new ArrayList<>(updates.entrySet());
-
       // Return traverser with updates list.
-      return Traversers.traverseIterable(updatesList)
+      return Traversers.traverseIterable(updates.entrySet())
           .map(e -> Util.entry(e.getKey(), e.getValue()));
     }
 
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties
index 6fa35fa12..af55655bb 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties
@@ -10,4 +10,5 @@ kafka.feedback.topic=aggregation-feedback
 schema.registry.url=http://localhost:8081
 
 emit.period.ms=5000
-grace.period.ms=0
+trigger.interval.seconds=1
+grace.period.ms=5000
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
index 7248791a5..44646d908 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
@@ -120,7 +120,7 @@ public class Uc4PipelineTest extends JetTestSupport {
     final Properties properties = new Properties();
     final Uc4PipelineFactory factory = new Uc4PipelineFactory(
         properties, properties, properties, properties, "", "",
-        "", "", testWindowSize, Duration.ofMillis(0));
+        "", "", testWindowSize, Duration.ofSeconds(1), Duration.ofMillis(0));
 
     this.uc4Topology =
         factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource);
-- 
GitLab