From 4601f11c68d94dd977c9905c0b18fa7119eb0e51 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 16 Jun 2022 16:37:06 +0200
Subject: [PATCH] Uc2 hazelcastjet: change window time to Duration

---
 .../benchmarks/uc2/hazelcastjet/HistoryService.java  | 11 ++++++-----
 .../uc2/hazelcastjet/Uc2PipelineFactory.java         | 12 +++++++-----
 .../benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java | 11 ++++++-----
 3 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
index 4c097c578..7fbcc1cfe 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
@@ -2,6 +2,8 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
 
 import com.google.common.math.StatsAccumulator;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+
+import java.time.Duration;
 import java.util.Properties;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -39,17 +41,16 @@ public class HistoryService extends HazelcastJetService {
     final String kafkaOutputTopic =
         config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
 
-    // Transform minutes to milliseconds
-    final int downsampleInterval = Integer.parseInt(
-        config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString());
-    final int downsampleIntervalMs = downsampleInterval * 60_000;
+    final Duration downsampleInterval = Duration.ofMinutes(
+        Integer.parseInt(config.getProperty(
+            ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()));
 
     this.pipelineFactory = new Uc2PipelineFactory(
         kafkaProps,
         this.kafkaInputTopic,
         kafkaWriteProps,
         kafkaOutputTopic,
-        downsampleIntervalMs);
+        downsampleInterval);
   }
 
   @Override
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
index b4c8fa662..cc3ed394f 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
@@ -11,17 +11,19 @@ import com.hazelcast.jet.pipeline.Sinks;
 import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.WindowDefinition;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
-import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
+import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
+
 
 
 public class Uc2PipelineFactory extends PipelineFactory {
 
-  private final int downsampleIntervalInMs;
+  private final Duration downsampleInterval;
 
   /**
    * Factory for uc2 pipelines.
@@ -38,10 +40,10 @@ public class Uc2PipelineFactory extends PipelineFactory {
                                final String kafkaInputTopic,
                                final Properties kafkaWritePropsForPipeline,
                                final String kafkaOutputTopic,
-                               final int downsampleIntervalInMs) {
+                               final Duration downsampleIntervalInMs) {
     super(kafkaReadPropsForPipeline, kafkaInputTopic,
         kafkaWritePropsForPipeline,kafkaOutputTopic);
-    this.downsampleIntervalInMs = downsampleIntervalInMs;
+    this.downsampleInterval = downsampleIntervalInMs;
   }
 
   /**
@@ -91,7 +93,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
         .withNativeTimestamps(0)
         .setLocalParallelism(1)
         .groupingKey(record -> record.getValue().getIdentifier())
-        .window(WindowDefinition.tumbling(downsampleIntervalInMs))
+        .window(WindowDefinition.tumbling(downsampleInterval.toMillis()))
         .aggregate(this.uc2AggregateOperation())
         .map(agg -> {
           final String theKey = agg.key();
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
index 00e3019bf..8461d9ceb 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
@@ -12,6 +12,8 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
 import com.hazelcast.jet.pipeline.test.Assertions;
 import com.hazelcast.jet.pipeline.test.TestSources;
 import com.hazelcast.jet.test.SerialTest;
+
+import java.time.Duration;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -44,7 +46,7 @@ public class Uc2PipelineTest extends JetTestSupport {
     final int testItemsPerSecond = 1;
     final String testSensorName = "TEST-SENSOR";
     final Double testValueInW = 10.0;
-    final int testWindowInMs = 5000;
+    final Duration testWindow = Duration.ofSeconds(5);
 
     // Create mock jet instance with configuration
     final String testClusterName = randomName();
@@ -65,7 +67,7 @@ public class Uc2PipelineTest extends JetTestSupport {
     // Create pipeline to test
     final Properties properties = new Properties();
     final Uc2PipelineFactory factory = new Uc2PipelineFactory(
-        properties,"",properties,"", testWindowInMs);
+        properties,"",properties,"", testWindow);
 
     this.uc2Topology = factory.extendUc2Topology(testSource);
     this.testPipeline = factory.getPipe();
@@ -84,9 +86,8 @@ public class Uc2PipelineTest extends JetTestSupport {
 
     // Assertion
     this.uc2Topology.apply(Assertions.assertCollectedEventually(timeout,
-        collection -> Assert.assertTrue(
-            "Not the right amount items in Stats Object!",
-            collection.get(collection.size() - 1).getValue().equals(expectedOutput))));
+        collection -> Assert.assertEquals("Not the right amount items in Stats Object!",
+            expectedOutput, collection.get(collection.size() - 1).getValue())));
 
     // Run the test!
     try {
-- 
GitLab