From 43880122e3482530b8cb9b9aca06f2890ea3588b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Sat, 26 Nov 2022 18:22:15 +0100
Subject: [PATCH] More cleanup

---
 .../hazelcastjet/ConfigurationKeys.java        |  3 ++-
 .../resources/META-INF/application.properties  |  5 +----
 .../uc4/hazelcastjet/HistoryService.java       | 17 ++++++++---------
 .../uc4/hazelcastjet/Uc4PipelineFactory.java   |  8 ++++----
 .../uc4/hazelcastjet/Uc4PipelineTest.java      | 18 +++++++++++-------
 5 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
index 44e82126d..4c726f523 100644
--- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
+++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java
@@ -32,6 +32,7 @@ public class ConfigurationKeys {
   // UC4
   public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
   public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
-  public static final String WINDOW_SIZE_UC4 = "window.size";
+  public static final String EMIT_PERIOD_MS = "window.size";
+  // public static final String EMIT_PERIOD_MS = "emit.period.ms";
 
 }
diff --git a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties
index 547b92a3a..633b5084d 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties
@@ -6,13 +6,10 @@ kafka.input.topic=input
 kafka.output.topic=output
 kafka.configuration.topic=configuration
 kafka.feedback.topic=aggregation-feedback
-kafka.window.duration.minutes=1
 
 schema.registry.url=http://localhost:8081
 
-aggregation.duration.days=30
-aggregation.advance.days=1
-
+kafka.window.duration.minutes=1
 trigger.interval=15
 grace.period.ms=270
 
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
index f2b5cb41c..678f774a5 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java
@@ -2,6 +2,7 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import java.time.Duration;
 import java.util.Properties;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,24 +47,22 @@ public class HistoryService extends HazelcastJetService {
             StringSerializer.class.getCanonicalName(),
             KafkaAvroSerializer.class.getCanonicalName());
 
-    final String kafkaOutputTopic =
-        this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
 
-    final String kafkaConfigurationTopic =
-        this.config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString();
+    final String configurationTopic =
+        this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
 
-    final String kafkaFeedbackTopic =
-        this.config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString();
+    final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
 
-    final int windowSize = Integer.parseInt(
-        this.config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString());
+    final Duration windowSize = Duration.ofMillis(
+        this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS));
 
     this.pipelineFactory = new Uc4PipelineFactory(
         kafkaProps,
         kafkaConfigReadProps,
         kafkaAggregationReadProps,
         kafkaWriteProps,
-        this.kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic,
+        this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic,
         windowSize);
   }
 
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
index abac638ec..83e0edcb8 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
@@ -15,6 +15,7 @@ import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.StreamStageWithKey;
 import com.hazelcast.jet.pipeline.WindowDefinition;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -42,7 +43,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
   private final String kafkaConfigurationTopic;
   private final String kafkaFeedbackTopic;
 
-  private final int windowSize;
+  private final Duration windowSize;
 
 
   /**
@@ -70,7 +71,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
       final String kafkaOutputTopic,
       final String kafkaConfigurationTopic,
       final String kafkaFeedbackTopic,
-      final int windowSize) {
+      final Duration windowSize) {
 
     super(kafkaInputReadPropsForPipeline, kafkaInputTopic,
         kafkaWritePropsForPipeline, kafkaOutputTopic);
@@ -230,8 +231,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
     // (5) UC4 Last Value Map
     // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
     final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues =
-        dupliAsFlatmappedStage
-            .window(WindowDefinition.tumbling(this.windowSize));
+        dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.windowSize.toMillis()));
 
     final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS
         AggregateOperation
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
index 5c06a2c3c..29a561d1b 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java
@@ -13,6 +13,7 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
 import com.hazelcast.jet.pipeline.test.Assertions;
 import com.hazelcast.jet.pipeline.test.TestSources;
 import com.hazelcast.jet.test.SerialTest;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -52,7 +53,8 @@ public class Uc4PipelineTest extends JetTestSupport {
     final String testLevel1GroupName = "TEST-LEVEL1-GROUP";
     final String testLevel2GroupName = "TEST-LEVEL2-GROUP";
     final Double testValueInW = 10.0;
-    final int testWindowSize = 5000; // As window size is bugged, not necessary.
+    // As window size is bugged, not necessary.
+    final Duration testWindowSize = Duration.ofMillis(5000);
 
     // Create mocked Hazelcast Jet instance with configuration
     final String testClusterName = randomName();
@@ -117,10 +119,11 @@ public class Uc4PipelineTest extends JetTestSupport {
     // Create pipeline to test
     final Properties properties = new Properties();
     final Uc4PipelineFactory factory = new Uc4PipelineFactory(
-        properties,properties,properties,properties,"","",
-    "","", testWindowSize);
+        properties, properties, properties, properties, "", "",
+        "", "", testWindowSize);
 
-    this.uc4Topology = factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource);
+    this.uc4Topology =
+        factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource);
     this.uc4Topology.writeTo(Sinks.logger());
 
     this.testPipeline = factory.getPipe();
@@ -202,7 +205,8 @@ public class Uc4PipelineTest extends JetTestSupport {
           .registerSerializer(ImmutableSensorRegistry.class,
               ImmutableSensorRegistryUc4Serializer.class);
       this.testInstance.newJob(this.testPipeline, jobConfig).join();
-      Assert.fail("Job should have completed with an AssertionCompletedException, but completed normally");
+      Assert.fail(
+          "Job should have completed with an AssertionCompletedException, but completed normally");
 
     } catch (final CompletionException e) {
       final String errorMsg = e.getCause().getMessage();
@@ -210,8 +214,8 @@ public class Uc4PipelineTest extends JetTestSupport {
           "Job was expected to complete with AssertionCompletedException, but completed with: "
               + e.getCause(),
           errorMsg.contains(AssertionCompletedException.class.getName()));
-    } catch (final Exception e){
-      LOGGER.error("Test is broken",e);
+    } catch (final Exception e) {
+      LOGGER.error("Test is broken", e);
     }
   }
 
-- 
GitLab