From 1e1c0b157f1d0fc6cb114ad976534efdbaa4a2bc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 25 Nov 2022 15:45:45 +0100
Subject: [PATCH] Cleanup Hazelcast Jet implementations

---
 .../uc1/hazelcast/Uc1PipelineTest.java        | 11 ++-
 .../uc2/hazelcastjet/HistoryService.java      |  1 -
 .../StatsAccumulatorSerializer.java           |  2 +-
 .../StatsAccumulatorSupplier.java             |  2 +-
 .../hazelcastjet/StatsAggregatorFactory.java  | 48 +++++++++++++
 .../uc2/hazelcastjet/Uc2PipelineFactory.java  | 70 +++++--------------
 .../uc2/hazelcastjet/Uc2PipelineTest.java     |  4 +-
 7 files changed, 71 insertions(+), 67 deletions(-)
 rename theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/{uc2specifics => }/StatsAccumulatorSerializer.java (94%)
 rename theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/{uc2specifics => }/StatsAccumulatorSupplier.java (88%)
 create mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java

diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java
index d90f2b1ed..3cb30bf23 100644
--- a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java
+++ b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java
@@ -32,9 +32,6 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
 import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineFactory;
 
-import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
-import static com.hazelcast.logging.Logger.getLogger;
-
 /**
  * Test methods for the Hazelcast Jet Implementation of UC1.
  */
@@ -88,16 +85,16 @@ public class Uc1PipelineTest extends JetTestSupport {
 
     // Create pipeline to test
     final Properties properties = new Properties();
-    final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties,"");
-    uc1Topology = factory.extendUc1Topology(testSource);
+    final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties, "");
+    this.uc1Topology = factory.extendUc1Topology(testSource);
     this.testPipeline = factory.getPipe();
 
     // Create DatabaseWriter sink
     final DatabaseWriter<String> adapter = this.databaseAdapter.getDatabaseWriter();
     final Sink<String> sink = sinkBuilder(
         "database-sink", x -> adapter)
-        .<String>receiveFn(DatabaseWriter::write)
-        .build();
+            .<String>receiveFn(DatabaseWriter::write)
+            .build();
 
     // Map Stage, can be used instead of sink
     // StreamStage<String> log = uc1Topology.map(s -> {
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
index 7fbcc1cfe..ce95d1a8c 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
@@ -11,7 +11,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
-import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
 
 
 /**
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSerializer.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java
similarity index 94%
rename from theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSerializer.java
rename to theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java
index 5c22b8dd6..8d4793dc0 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSerializer.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics;
+package rocks.theodolite.benchmarks.uc2.hazelcastjet;
 
 import com.google.common.math.Stats;
 import com.google.common.math.StatsAccumulator;
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSupplier.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java
similarity index 88%
rename from theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSupplier.java
rename to theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java
index f4d203f03..457365799 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/uc2specifics/StatsAccumulatorSupplier.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java
@@ -1,4 +1,4 @@
-package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics;
+package rocks.theodolite.benchmarks.uc2.hazelcastjet;
 
 import com.google.common.math.StatsAccumulator;
 import com.hazelcast.function.SupplierEx;
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java
new file mode 100644
index 000000000..dcfa56177
--- /dev/null
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java
@@ -0,0 +1,48 @@
+package rocks.theodolite.benchmarks.uc2.hazelcastjet;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.jet.aggregate.AggregateOperation;
+import com.hazelcast.jet.aggregate.AggregateOperation1;
+import java.util.Map.Entry;
+import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
+
+
+/**
+ * Factory for creating an aggregation operator for {@link Stats} objects.
+ */
+public final class StatsAggregatorFactory {
+
+  /**
+   * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
+   * Jet implementation of UC2.
+   *
+   * <p>
+   * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a
+   * {@link Stats} object.
+   * </p>
+   *
+   * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
+   *         ActivePowerRecord Objects into Stats Objects.
+   */
+  public static AggregateOperation1<Entry<String, ActivePowerRecord>, StatsAccumulator, Stats> // NOCS
+      create() {
+    // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using
+    // the StatsAccumulator.
+    return AggregateOperation
+        // Creates the accumulator
+        .withCreate(new StatsAccumulatorSupplier())
+        // Defines the accumulation
+        .<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
+          accumulator.add(item.getValue().getValueInW());
+        })
+        // Defines the combination of spread out instances
+        .andCombine((left, right) -> {
+          final Stats rightStats = right.snapshot();
+          left.addAll(rightStats);
+
+        })
+        // Finishes the aggregation
+        .andExportFinish(StatsAccumulator::snapshot);
+  }
+}
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
index 02e0ceb07..52096b860 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
@@ -1,9 +1,5 @@
 package rocks.theodolite.benchmarks.uc2.hazelcastjet;
 
-import com.google.common.math.Stats;
-import com.google.common.math.StatsAccumulator;
-import com.hazelcast.jet.aggregate.AggregateOperation;
-import com.hazelcast.jet.aggregate.AggregateOperation1;
 import com.hazelcast.jet.kafka.KafkaSinks;
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
@@ -13,16 +9,13 @@ import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.WindowDefinition;
 import java.time.Duration;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
-import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
 
 
 /**
- * PipelineFactory for use case 2.
- * Allows to build and extend a pipeline.
+ * PipelineFactory for use case 2. Allows to build and extend a pipeline.
  */
 public class Uc2PipelineFactory extends PipelineFactory {
 
@@ -30,6 +23,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
 
   /**
    * Factory for uc2 pipelines.
+   *
    * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
    *        attributes.
    * @param kafkaInputTopic The name of the input topic used for the pipeline.
@@ -40,12 +34,12 @@ public class Uc2PipelineFactory extends PipelineFactory {
    *        of this pipeline.
    */
   protected Uc2PipelineFactory(final Properties kafkaReadPropsForPipeline,
-                               final String kafkaInputTopic,
-                               final Properties kafkaWritePropsForPipeline,
-                               final String kafkaOutputTopic,
-                               final Duration downsampleIntervalInMs) {
+      final String kafkaInputTopic,
+      final Properties kafkaWritePropsForPipeline,
+      final String kafkaOutputTopic,
+      final Duration downsampleIntervalInMs) {
     super(kafkaReadPropsForPipeline, kafkaInputTopic,
-        kafkaWritePropsForPipeline,kafkaOutputTopic);
+        kafkaWritePropsForPipeline, kafkaOutputTopic);
     this.downsampleInterval = downsampleIntervalInMs;
   }
 
@@ -60,17 +54,18 @@ public class Uc2PipelineFactory extends PipelineFactory {
 
     // Define the Kafka Source
     final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource =
-        KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic);
+        KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaReadPropsForPipeline,
+            this.kafkaInputTopic);
 
     // Extend UC2 topology to the pipeline
     final StreamStage<Map.Entry<String, String>> uc2TopologyProduct =
         this.extendUc2Topology(kafkaSource);
 
     // Add Sink1: Logger
-    uc2TopologyProduct.writeTo(Sinks.logger());
+    uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations
     // Add Sink2: Write back to kafka for the final benchmark
     uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
-        kafkaWritePropsForPipeline, kafkaOutputTopic));
+        this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
 
     return this.pipe;
   }
@@ -90,15 +85,15 @@ public class Uc2PipelineFactory extends PipelineFactory {
    *         and value of the Entry object. It can be used to be further modified or directly be
    *         written into a sink.
    */
-  public StreamStage<Map.Entry<String, String>>
-        extendUc2Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+  public StreamStage<Map.Entry<String, String>> extendUc2Topology(
+      final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
     // Build the pipeline topology.
-    return pipe.readFrom(source)
+    return this.pipe.readFrom(source)
         .withNativeTimestamps(0)
         .setLocalParallelism(1)
         .groupingKey(record -> record.getValue().getIdentifier())
-        .window(WindowDefinition.tumbling(downsampleInterval.toMillis()))
-        .aggregate(this.uc2AggregateOperation())
+        .window(WindowDefinition.tumbling(this.downsampleInterval.toMillis()))
+        .aggregate(StatsAggregatorFactory.create())
         .map(agg -> {
           final String theKey = agg.key();
           final String theValue = agg.getValue().toString();
@@ -106,37 +101,4 @@ public class Uc2PipelineFactory extends PipelineFactory {
         });
   }
 
-  /**
-   * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
-   * Jet implementation of UC2.
-   *
-   * <p>
-   * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a
-   * {@link Stats} object.
-   * </p>
-   *
-   * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
-   *         ActivePowerRecord Objects into Stats Objects.
-   */
-  public AggregateOperation1<Entry<String, ActivePowerRecord>,
-      StatsAccumulator, Stats> uc2AggregateOperation() { // NOCS
-    // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using
-    // the Statsaccumulator.
-    return AggregateOperation
-        // Creates the accumulator
-        .withCreate(new StatsAccumulatorSupplier())
-        // Defines the accumulation
-        .<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
-          accumulator.add(item.getValue().getValueInW());
-        })
-        // Defines the combination of spread out instances
-        .andCombine((left, right) -> {
-          final Stats rightStats = right.snapshot();
-          left.addAll(rightStats);
-
-        })
-        // Finishes the aggregation
-        .andExportFinish(
-            StatsAccumulator::snapshot);
-  }
 }
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
index 8461d9ceb..8b44bcd5f 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
@@ -5,14 +5,12 @@ import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JetConfig;
 import com.hazelcast.jet.core.JetTestSupport;
 import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sinks;
 import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
 import com.hazelcast.jet.pipeline.test.Assertions;
 import com.hazelcast.jet.pipeline.test.TestSources;
 import com.hazelcast.jet.test.SerialTest;
-
 import java.time.Duration;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -67,7 +65,7 @@ public class Uc2PipelineTest extends JetTestSupport {
     // Create pipeline to test
     final Properties properties = new Properties();
     final Uc2PipelineFactory factory = new Uc2PipelineFactory(
-        properties,"",properties,"", testWindow);
+        properties, "", properties, "", testWindow);
 
     this.uc2Topology = factory.extendUc2Topology(testSource);
     this.testPipeline = factory.getPipe();
-- 
GitLab