From 9a64d3bdf74cab142aee51e3e6ba19f11406e98e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 25 Nov 2022 19:53:04 +0100
Subject: [PATCH] Align Hazelcast UC3 implementation with others

---
 .../benchmarks/uc2/beam/StatsAggregation.java |  2 +-
 .../StatsAccumulatorSerializer.java           |  3 +-
 .../StatsAccumulatorSupplier.java             |  5 +-
 .../hazelcastjet/StatsAggregatorFactory.java  |  6 +--
 .../benchmarks/uc3/beam/StatsAggregation.java |  2 +-
 .../uc3/hazelcastjet/HistoryService.java      |  3 +-
 .../StatsAccumulatorSerializer.java           | 37 ++++++++++++++
 .../StatsAccumulatorSupplier.java             | 21 ++++++++
 .../hazelcastjet/StatsAggregatorFactory.java  | 50 +++++++++++++++++++
 .../uc3/hazelcastjet/Uc3PipelineFactory.java  |  8 ++-
 .../uc3/hazelcastjet/Uc3PipelineTest.java     |  9 ++--
 11 files changed, 126 insertions(+), 20 deletions(-)
 create mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSerializer.java
 create mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSupplier.java
 create mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAggregatorFactory.java

diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/StatsAggregation.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/StatsAggregation.java
index cf320bf18..491b8b05b 100644
--- a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/StatsAggregation.java
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/StatsAggregation.java
@@ -31,7 +31,7 @@ public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumula
 
   @Override
   public StatsAccumulator mergeAccumulators(final Iterable<StatsAccumulator> accums) {
-    final StatsAccumulator merged = createAccumulator();
+    final StatsAccumulator merged = this.createAccumulator();
     for (final StatsAccumulator accum : accums) {
       merged.addAll(accum.snapshot());
     }
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java
index 8d4793dc0..d2fec1b13 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSerializer.java
@@ -8,8 +8,7 @@ import com.hazelcast.nio.serialization.StreamSerializer;
 import java.io.IOException;
 
 /**
- * A serializer and deserializer for the StatsAccumulator which is used in the UC2 implementation
- * using Hazelcast Jet.
+ * A serializer and deserializer for the {@link StatsAccumulator}.
  */
 public class StatsAccumulatorSerializer implements StreamSerializer<StatsAccumulator> {
 
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java
index 457365799..401154249 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAccumulatorSupplier.java
@@ -4,15 +4,14 @@ import com.google.common.math.StatsAccumulator;
 import com.hazelcast.function.SupplierEx;
 
 /**
- * Supplies a StatsAccumulator. Is used in the aggregation operation of the Hazelcast Jet
- * implementation for UC2.
+ * Supplies a {@link StatsAccumulator}.
  */
 public class StatsAccumulatorSupplier implements SupplierEx<StatsAccumulator> {
 
   private static final long serialVersionUID = -656395626316842910L; // NOPMD
 
   /**
-   * Gets a StatsAccumulator.
+   * Gets a {@link StatsAccumulator}.
    */
   @Override
   public StatsAccumulator getEx() throws Exception {
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java
index c40846052..dd52b12be 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/StatsAggregatorFactory.java
@@ -27,15 +27,15 @@ public final class StatsAggregatorFactory {
    * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
    *         ActivePowerRecord Objects into Stats Objects.
    */
-  public static AggregateOperation1<Entry<String, ActivePowerRecord>, StatsAccumulator, Stats> // NOCS
+  public static AggregateOperation1<Entry<?, ActivePowerRecord>, StatsAccumulator, Stats> // NOCS
       create() {
-    // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using
+    // Aggregate Operation to Create a Stats Object from Entry<?,ActivePowerRecord> items using
     // the StatsAccumulator.
     return AggregateOperation
         // Creates the accumulator
         .withCreate(new StatsAccumulatorSupplier())
         // Defines the accumulation
-        .<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
+        .<Entry<?, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
           accumulator.add(item.getValue().getValueInW());
         })
         // Defines the combination of spread out instances
diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/StatsAggregation.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/StatsAggregation.java
index 4fca536ba..0c3822fc5 100644
--- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/StatsAggregation.java
+++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/StatsAggregation.java
@@ -8,7 +8,6 @@ import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
 
-
 /**
  * Aggregation Class for ActivePowerRecords. Creates a StatsAccumulator based on the ValueInW.
  */
@@ -16,6 +15,7 @@ import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
 @DefaultCoder(AvroCoder.class)
 public class StatsAggregation extends CombineFn<ActivePowerRecord, StatsAccumulator, Stats>
     implements Serializable {
+
   private static final long serialVersionUID = 1L;
 
   @Override
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
index ff93b8660..df3f85a42 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java
@@ -1,5 +1,6 @@
 package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
+import com.google.common.math.StatsAccumulator;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.time.Duration;
 import java.util.Properties;
@@ -58,9 +59,9 @@ public class HistoryService extends HazelcastJetService {
   @Override
   protected void registerSerializer() {
     this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class);
+    this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
   }
 
-
   public static void main(final String[] args) {
     new HistoryService().run();
   }
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSerializer.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSerializer.java
new file mode 100644
index 000000000..c6957ac1e
--- /dev/null
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSerializer.java
@@ -0,0 +1,37 @@
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+import java.io.IOException;
+
+/**
+ * A serializer and deserializer for the {@link StatsAccumulator}.
+ */
+public class StatsAccumulatorSerializer implements StreamSerializer<StatsAccumulator> {
+
+  private static final int TYPE_ID = 69_420;
+
+  @Override
+  public int getTypeId() {
+    return TYPE_ID;
+  }
+
+  @Override
+  public void write(final ObjectDataOutput out, final StatsAccumulator object) throws IOException {
+    final byte[] byteArray = object.snapshot().toByteArray();
+    out.writeByteArray(byteArray);
+  }
+
+  @Override
+  public StatsAccumulator read(final ObjectDataInput in) throws IOException {
+    final byte[] byteArray = in.readByteArray();
+    final Stats deserializedStats = Stats.fromByteArray(byteArray);
+    final StatsAccumulator accumulator = new StatsAccumulator();
+    accumulator.addAll(deserializedStats);
+    return accumulator;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSupplier.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSupplier.java
new file mode 100644
index 000000000..8143fb6a0
--- /dev/null
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAccumulatorSupplier.java
@@ -0,0 +1,21 @@
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
+
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.function.SupplierEx;
+
+/**
+ * Supplies a {@link StatsAccumulator}.
+ */
+public class StatsAccumulatorSupplier implements SupplierEx<StatsAccumulator> {
+
+  private static final long serialVersionUID = -656395626316842910L; // NOPMD
+
+  /**
+   * Gets a {@link StatsAccumulator}.
+   */
+  @Override
+  public StatsAccumulator getEx() throws Exception {
+    return new StatsAccumulator();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAggregatorFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAggregatorFactory.java
new file mode 100644
index 000000000..1588668a9
--- /dev/null
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/StatsAggregatorFactory.java
@@ -0,0 +1,50 @@
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.jet.aggregate.AggregateOperation;
+import com.hazelcast.jet.aggregate.AggregateOperation1;
+import java.util.Map.Entry;
+import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
+
+
+/**
+ * Factory for creating an aggregation operator for {@link Stats} objects.
+ */
+public final class StatsAggregatorFactory {
+
+  private StatsAggregatorFactory() {}
+
+  /**
+   * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
+   * Jet implementation of UC2.
+   *
+   * <p>
+   * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a
+   * {@link Stats} object.
+   * </p>
+   *
+   * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
+   *         ActivePowerRecord Objects into Stats Objects.
+   */
+  public static AggregateOperation1<Entry<?, ActivePowerRecord>, StatsAccumulator, Stats> // NOCS
+      create() {
+    // Aggregate Operation to Create a Stats Object from Entry<?,ActivePowerRecord> items using
+    // the StatsAccumulator.
+    return AggregateOperation
+        // Creates the accumulator
+        .withCreate(new StatsAccumulatorSupplier())
+        // Defines the accumulation
+        .<Entry<?, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
+          accumulator.add(item.getValue().getValueInW());
+        })
+        // Defines the combination of spread out instances
+        .andCombine((left, right) -> {
+          final Stats rightStats = right.snapshot();
+          left.addAll(rightStats);
+
+        })
+        // Finishes the aggregation
+        .andExportFinish(StatsAccumulator::snapshot);
+  }
+}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
index 1f3cd7e77..e3dcb037d 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
@@ -1,6 +1,5 @@
 package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
-import com.hazelcast.jet.aggregate.AggregateOperations;
 import com.hazelcast.jet.kafka.KafkaSinks;
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
@@ -130,13 +129,12 @@ public class Uc3PipelineFactory extends PipelineFactory {
         .window(WindowDefinition
             .sliding(this.windowSize.toMillis(), this.hoppingSize.toMillis())
             .setEarlyResultsPeriod(this.emitPeriod.toMillis()))
-        // get average value of group (sensoreId,hourOfDay)
-        .aggregate(
-            AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
+        // get aggregated values for (sensoreId, hourOfDay)
+        .aggregate(StatsAggregatorFactory.create())
         // map to return pair sensorID -> stats
         .map(agg -> {
           final String sensorId = agg.getKey().getSensorId();
-          final String stats = agg.getValue().toString(); // TODO just double, not stats
+          final String stats = agg.getValue().toString();
           return Map.entry(sensorId, stats);
         });
   }
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java
index aa74ae146..8f17e981e 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java
@@ -1,5 +1,6 @@
 package rocks.theodolite.benchmarks.uc3.hazelcastjet;
 
+import com.google.common.math.Stats;
 import com.hazelcast.jet.Jet;
 import com.hazelcast.jet.JetInstance;
 import com.hazelcast.jet.config.JetConfig;
@@ -40,7 +41,8 @@ public class Uc3PipelineTest extends JetTestSupport {
   private static final Double TEST_VALUE_IN_W = 10.0;
   private static final Duration TEST_WINDOW_SLIDE = Duration.ofSeconds(1);
   private static final Duration TEST_WINDOW_SIZE = Duration.ofSeconds(50);
-  private static final Duration TEST_EMIT_PERIOD = Duration.ofSeconds(0); // Do not emit early results
+  private static final Duration TEST_EMIT_PERIOD = Duration.ofSeconds(0); // Do not emit early
+                                                                          // results
   // Used to check hourOfDay
   private static final long MOCK_TIMESTAMP = 1632741651;
 
@@ -111,7 +113,7 @@ public class Uc3PipelineTest extends JetTestSupport {
             for (final Entry<String, String> entry : collection) {
               // Compare expected output with generated output
               final String expectedKey = TEST_SENSOR_NAME;
-              final String expectedValue = Double.toString(TEST_VALUE_IN_W);
+              final String expectedValue = Stats.of(TEST_VALUE_IN_W).toString();
 
               // DEBUG
               LOGGER.info(
@@ -119,8 +121,7 @@ public class Uc3PipelineTest extends JetTestSupport {
                       + "' - Actual Output: '" + entry.getKey() + "="
                       + entry.getValue() + "'");
 
-              if (!(entry.getKey().equals(expectedKey)
-                  && entry.getValue().equals(expectedValue))) {
+              if (!(entry.getKey().equals(expectedKey) && entry.getValue().equals(expectedValue))) {
                 LOGGER.info("CHECK 5 || Failed assertion!");
                 allOkay = false;
               }
-- 
GitLab