From 020839d3d76588bd36976793c200a03113b75e46 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 8 Dec 2022 18:21:59 +0100
Subject: [PATCH] Fix code quality issues

---
 .../hazelcastjet/ChildParentsTransformer.java |  2 -
 .../uc4/hazelcastjet/HistoryService.java      |  8 +++-
 .../hazelcastjet/Uc4ConfigurationKeys.java    |  3 +-
 .../uc4/hazelcastjet/Uc4PipelineFactory.java  | 37 +++++++++----------
 .../resources/META-INF/application.properties |  2 +-
 .../uc4/hazelcastjet/Uc4PipelineTest.java     |  2 +-
 6 files changed, 28 insertions(+), 26 deletions(-)

diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java
index 47bd04a12..8e1f9f705 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java
@@ -18,8 +18,6 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry;
  */
 public class ChildParentsTransformer {
 
-  public ChildParentsTransformer() {}
-
   /**
    * Constructs a map of keys to their set of parents out of a SensorRegistry.
    *
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
index 97ea33eda..1d1ac8969 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
@@ -53,16 +53,20 @@ public class HistoryService extends HazelcastJetService {
 
     final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
 
-    final Duration windowSize = Duration.ofMillis(
+    final Duration emirPeriod = Duration.ofMillis(
         this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS));
 
+    final Duration gracePeriod = Duration.ofMillis(
+        this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
+
     this.pipelineFactory = new Uc4PipelineFactory(
         kafkaProps,
         kafkaConfigReadProps,
         kafkaAggregationReadProps,
         kafkaWriteProps,
         this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic,
-        windowSize);
+        emirPeriod,
+        gracePeriod);
   }
 
   @Override
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java
index 6c4c63396..7a85c2dc5 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java
@@ -12,6 +12,7 @@ public class Uc4ConfigurationKeys {
   public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
 
   public static final String EMIT_PERIOD_MS = "emit.period.ms";
-  // public static final String GRACE_PERIOD_MS = "grace.period.ms";
+
+  public static final String GRACE_PERIOD_MS = "grace.period.ms";
 
 }
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
index 59b5941fb..db7fdfce0 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
@@ -45,6 +45,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
 
   private final Duration emitPeriod;
 
+  private final Duration gracePeriod;
+
 
   /**
    * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
@@ -61,7 +63,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
    * @param kafkaOutputTopic The name of the output topic used for the pipeline.
    * @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline.
    * @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline.
-   * @param windowSize The window size of the tumbling window used in this pipeline.
+   * @param emitPeriod The window size of the tumbling window used in this pipeline.
    */
   public Uc4PipelineFactory(final Properties kafkaInputReadPropsForPipeline, // NOPMD
       final Properties kafkaConfigPropsForPipeline,
@@ -71,7 +73,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
       final String kafkaOutputTopic,
       final String kafkaConfigurationTopic,
       final String kafkaFeedbackTopic,
-      final Duration windowSize) {
+      final Duration emitPeriod,
+      final Duration gracePeriod) {
 
     super(kafkaInputReadPropsForPipeline, kafkaInputTopic,
         kafkaWritePropsForPipeline, kafkaOutputTopic);
@@ -79,7 +82,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
     this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline;
     this.kafkaConfigurationTopic = kafkaConfigurationTopic;
     this.kafkaFeedbackTopic = kafkaFeedbackTopic;
-    this.emitPeriod = windowSize;
+    this.emitPeriod = emitPeriod;
+    this.gracePeriod = gracePeriod;
   }
 
   /**
@@ -155,13 +159,13 @@ public class Uc4PipelineFactory extends PipelineFactory {
     //////////////////////////////////
     // (1) Configuration Stream
     this.pipe.readFrom(configurationSource)
-        .withNativeTimestamps(0)
+        .withNativeTimestamps(this.gracePeriod.toMillis())
         .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED
             || entry.getKey() == Event.SENSOR_REGISTRY_STATUS)
         .map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())))
         .flatMapStateful(HashMap::new, new ConfigFlatMap())
         .writeTo(Sinks.mapWithUpdating(
-            SENSOR_PARENT_MAP_NAME, // The addressed IMAP
+            SENSOR_PARENT_MAP_NAME, // The addressed IMap
             Entry::getKey, // The key to look for
             (oldValue, newEntry) -> newEntry.getValue()));
 
@@ -169,13 +173,13 @@ public class Uc4PipelineFactory extends PipelineFactory {
     // (1) Sensor Input Stream
     final StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe
         .readFrom(inputSource)
-        .withNativeTimestamps(0);
+        .withNativeTimestamps(this.gracePeriod.toMillis());
 
     //////////////////////////////////
     // (1) Aggregation Stream
     final StreamStage<Entry<String, ActivePowerRecord>> aggregations = this.pipe
         .readFrom(aggregationSource)
-        .withNativeTimestamps(0)
+        .withNativeTimestamps(this.gracePeriod.toMillis())
         .map(entry -> { // Map Aggregated to ActivePowerRecord
           final AggregatedActivePowerRecord agg = entry.getValue();
           final ActivePowerRecord record = new ActivePowerRecord(
@@ -214,17 +218,11 @@ public class Uc4PipelineFactory extends PipelineFactory {
           final ActivePowerRecord record = entry.getValue().getRecord();
           final Set<String> groups = entry.getValue().getGroups();
 
-          // Transformed Data
-          final String[] groupList = groups.toArray(String[]::new);
-          final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length];
-          final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>();
-          for (int i = 0; i < groupList.length; i++) {
-            newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
-            newEntryList.add(Util.entry(newKeyList[i], record));
-          }
-
           // Return traversable list of new entry elements
-          return Traversers.traverseIterable(newEntryList);
+          return Traversers.traverseStream(
+              groups
+                  .stream()
+                  .map(group -> Util.entry(new SensorGroupKey(keyGroupId, group), record)));
         });
 
     //////////////////////////////////
@@ -253,7 +251,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
 
     return windowedLastValues
         .groupingKey(entry -> entry.getKey().getGroup())
-        .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue()));
+        .aggregate(aggrOp)
+        .map(agg -> Util.entry(agg.getKey(), agg.getValue()));
   }
 
 
@@ -270,7 +269,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
         final Map<String, Set<String>> flatMapStage,
         final Entry<Event, SensorRegistry> eventItem) {
       // Transform new Input
-      final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name");
+      final ChildParentsTransformer transformer = new ChildParentsTransformer();
       final Map<String, Set<String>> mapFromRegistry =
           transformer.constructChildParentsPairs(eventItem.getValue());
 
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties
index af877044b..6fa35fa12 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties
@@ -10,4 +10,4 @@ kafka.feedback.topic=aggregation-feedback
 schema.registry.url=http://localhost:8081
 
 emit.period.ms=5000
-#grace.period.ms=0
+grace.period.ms=0
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
index 29a561d1b..7248791a5 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
@@ -120,7 +120,7 @@ public class Uc4PipelineTest extends JetTestSupport {
     final Properties properties = new Properties();
     final Uc4PipelineFactory factory = new Uc4PipelineFactory(
         properties, properties, properties, properties, "", "",
-        "", "", testWindowSize);
+        "", "", testWindowSize, Duration.ofMillis(0));
 
     this.uc4Topology =
         factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource);
-- 
GitLab