From 4702e63db2efedc3e0b197d9cfdf115f73152bc4 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Mon, 21 Feb 2022 17:19:30 +0100
Subject: [PATCH] Change hazelcast to output correct types + correct small
 spelling errors

---
 .../uc4/application/Uc4PipelineBuilder.java   | 92 +++++++++++++------
 ...ggregatedActivePowerRecordAccumulator.java | 75 +++++++++++++++
 .../uc4/application/Uc4PipelineTest.java      | 23 +++--
 3 files changed, 154 insertions(+), 36 deletions(-)
 create mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java

diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
index 05ec199d1..a1fcb8bbc 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
@@ -4,7 +4,8 @@ import com.hazelcast.function.BiFunctionEx;
 import com.hazelcast.jet.Traverser;
 import com.hazelcast.jet.Traversers;
 import com.hazelcast.jet.Util;
-import com.hazelcast.jet.aggregate.AggregateOperations;
+import com.hazelcast.jet.aggregate.AggregateOperation;
+import com.hazelcast.jet.aggregate.AggregateOperation1;
 import com.hazelcast.jet.kafka.KafkaSinks;
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
@@ -24,11 +25,13 @@ import java.util.Properties;
 import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.uc4.application.uc4specifics.AggregatedActivePowerRecordAccumulator;
 import theodolite.uc4.application.uc4specifics.ChildParentsTransformer;
 import theodolite.uc4.application.uc4specifics.SensorGroupKey;
 import theodolite.uc4.application.uc4specifics.ValueGroup;
 import titan.ccp.configuration.events.Event;
 import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
 import titan.ccp.model.sensorregistry.SensorRegistry;
 
 /**
@@ -76,33 +79,47 @@ public class Uc4PipelineBuilder {
       LOGGER.info("kafkaWriteProps: " + kafkaWritePropsForPipeline);
     }
 
-
-
     // The pipeline for this Use Case
     final Pipeline uc4Pipeline = Pipeline.create();
 
     // Sources for this use case
     final StreamSource<Entry<Event, String>> configSource = KafkaSources.<Event, String>kafka(
         kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
+
     final StreamSource<Entry<String, ActivePowerRecord>> inputSource =
         KafkaSources.<String, ActivePowerRecord>kafka(
             kafkaInputReadPropsForPipeline, kafkaInputTopic);
-    final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource =
-        KafkaSources.<String, ActivePowerRecord>
+
+    final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource =
+        KafkaSources.<String, AggregatedActivePowerRecord>
             kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
 
     // Extend UC4 topology to pipeline
-    final StreamStage<Entry<String, ActivePowerRecord>> uc4Product =
+    final StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Aggregation =
         this.extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource,
             windowSize);
 
-    // Add Sink1: Write back to kafka output topic
-    uc4Product.writeTo(KafkaSinks.kafka(
-        kafkaWritePropsForPipeline, kafkaOutputTopic));
     // Add Sink2: Write back to kafka feedback/aggregation topic
-    uc4Product.writeTo(KafkaSinks.kafka(
+    uc4Aggregation.writeTo(KafkaSinks.kafka(
         kafkaWritePropsForPipeline, kafkaFeedbackTopic));
-    // Add Sink3: Logger
+
+    // Log aggregation product
+    uc4Aggregation.writeTo(Sinks.logger());
+
+    // Map Aggregated to ActivePowerRecord
+    final StreamStage<Entry<String, ActivePowerRecord>> uc4Product = uc4Aggregation
+        .map(entry -> {
+          final AggregatedActivePowerRecord agg = entry.getValue();
+          final ActivePowerRecord record = new ActivePowerRecord(
+              agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
+          return Util.entry(entry.getKey(), record);
+        });
+
+    // Add Sink2: Write back to kafka output topic
+    uc4Product.writeTo(KafkaSinks.kafka(
+        kafkaWritePropsForPipeline, kafkaOutputTopic));
+
+    // Logger uc4 product
     uc4Product.writeTo(Sinks.logger());
 
     // Return the pipeline
@@ -114,7 +131,7 @@ public class Uc4PipelineBuilder {
    *
    * <p>
    * UC4 takes {@code ActivePowerRecord} events from sensors and a {@code SensorRegistry} with maps
-   * from keys to groups to map values to their accourding groups. A feedback stream allows for
+   * from keys to groups to map values to their according groups. A feedback stream allows for
    * group keys to be mapped to values and eventually to be mapped to other top level groups defines
    * by the {@code SensorRegistry}.
    * </p>
@@ -125,7 +142,7 @@ public class Uc4PipelineBuilder {
    * (2) Merge Input Values and Aggregations <br>
    * (3) Join Configuration with Merged Input Stream <br>
    * (4) Duplicate as flatmap per value and group <br>
-   * (5) Window (preperation for possible last values) <br>
+   * (5) Window (preparation for possible last values) <br>
    * (6) Aggregate data over the window
    * </p>
    *
@@ -138,10 +155,10 @@ public class Uc4PipelineBuilder {
    *         according aggregated values. The data can be further modified or directly be linked to
    *         a Hazelcast Jet sink.
    */
-  public StreamStage<Entry<String, ActivePowerRecord>> extendUc4Topology(// NOPMD
+  public StreamStage<Entry<String, AggregatedActivePowerRecord>> extendUc4Topology(// NOPMD
       final Pipeline pipe,
       final StreamSource<Entry<String, ActivePowerRecord>> inputSource,
-      final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource,
+      final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource,
       final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) {
 
     //////////////////////////////////
@@ -167,7 +184,13 @@ public class Uc4PipelineBuilder {
     // (1) Aggregation Stream
     final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe
         .readFrom(aggregationSource)
-        .withNativeTimestamps(0);
+        .withNativeTimestamps(0)
+        .map(entry -> { // Map Aggregated to ActivePowerRecord
+          final AggregatedActivePowerRecord agg = entry.getValue();
+          final ActivePowerRecord record = new ActivePowerRecord(
+              agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
+          return Util.entry(entry.getKey(), record);
+        });
 
     //////////////////////////////////
     // (2) UC4 Merge Input with aggregation stream
@@ -230,24 +253,33 @@ public class Uc4PipelineBuilder {
         windowedLastValues = dupliAsFlatmappedStage
         .window(WindowDefinition.tumbling(windowSize));
 
-    //////////////////////////////////
-    // (6) UC4 GroupBy and aggregate and map
-    // Group using the group out of the sensorGroupKey keys
+    final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>,
+        AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp =
+        AggregateOperation
+        .withCreate(AggregatedActivePowerRecordAccumulator::new)
+        .<Entry<SensorGroupKey, ActivePowerRecord>>andAccumulate((acc, rec) -> {
+          acc.setId(rec.getKey().getGroup());
+          acc.addInputs(rec.getValue());
+        })
+//      .andCombine((acc, rec)-> new AggregatedActivePowerRecordAccumulator()) // NOCS
+//      .andDeduct((acc, rec) -> new AggregatedActivePowerRecordAccumulator()) // NOCS
+        .andExportFinish(acc ->
+            new AggregatedActivePowerRecord(acc.getId(),
+                acc.getTimestamp(),
+                acc.getCount(),
+                acc.getSumInW(),
+                acc.getAverageInW())
+        );
+
+
+    // write aggregation back to kafka
     return windowedLastValues
         .groupingKey(entry -> entry.getKey().getGroup())
-        .aggregate(AggregateOperations.summingDouble(entry -> entry.getValue().getValueInW()))
-        .map(agg -> {
-          // Construct data for return pair
-          final String theGroup = agg.getKey();
-          final Double summedValueInW = agg.getValue();
-
-          // Return aggregates group value pair
-          return Util.entry(
-              theGroup,
-              new ActivePowerRecord(theGroup, System.currentTimeMillis(), summedValueInW));
-        });
+        .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue()));
   }
 
+
+
   /**
    * FlatMap function used to process the configuration input for UC4.
    */
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java
new file mode 100644
index 000000000..7ccda5474
--- /dev/null
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java
@@ -0,0 +1,75 @@
+package theodolite.uc4.application.uc4specifics;
+
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Accumulator class for AggregatedActivePowerRecords.
+ */
+public class AggregatedActivePowerRecordAccumulator {
+
+  private String id;
+  private long timestamp;
+  private long count;
+  private double sumInW;
+  private double averageInW;
+
+  /**
+   * Default constructor.
+   */
+  public AggregatedActivePowerRecordAccumulator() {
+    // This constructor is intentionally empty. Nothing special is needed here.
+  }
+
+
+  /**
+   * Creates an AggregationObject.
+   */
+  public AggregatedActivePowerRecordAccumulator(final String id,
+                                                final long timestamp,
+                                                final long count,
+                                                final double sumInW,
+                                                final double averageInW) {
+    this.id = id;
+    this.timestamp = timestamp;
+    this.count = count;
+    this.sumInW = sumInW;
+    this.averageInW = averageInW;
+  }
+
+  /**
+   * Sets the id.
+   */
+  public void setId(final String id) {
+    this.id = id;
+  }
+
+  /**
+   * Adds the record to the aggregation.
+   */
+  public void addInputs(final ActivePowerRecord record) {
+    this.count += 1;
+    this.sumInW += record.getValueInW();
+    this.timestamp = record.getTimestamp();
+    this.averageInW = sumInW / count;
+  }
+
+  public long getCount() {
+    return count;
+  }
+
+  public double getSumInW() {
+    return sumInW;
+  }
+
+  public double getAverageInW() {
+    return averageInW;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+}
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java
index 263b3c1a5..d505b7169 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java
@@ -27,6 +27,7 @@ import theodolite.uc4.application.uc4specifics.ValueGroup;
 import theodolite.uc4.application.uc4specifics.ValueGroupSerializer;
 import titan.ccp.configuration.events.Event;
 import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
 import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
 import titan.ccp.model.sensorregistry.MachineSensor;
 import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
@@ -38,7 +39,7 @@ public class Uc4PipelineTest extends JetTestSupport {
   // TEst Machinery
   JetInstance testInstance = null;
   Pipeline testPipeline = null;
-  StreamStage<Entry<String, ActivePowerRecord>> uc4Topology = null;
+  StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Topology = null;
 
   @Before
   public void buildUc4Pipeline() {
@@ -67,12 +68,22 @@ public class Uc4PipelineTest extends JetTestSupport {
           return testEntry;
         });
 
+    final AggregatedActivePowerRecord.Builder aggregationBuilder = AggregatedActivePowerRecord.newBuilder();
+
     // Create test source 2 : Mock aggregation Values
-    final StreamSource<Entry<String, ActivePowerRecord>> testAggregationSource =
+    final StreamSource<Entry<String, AggregatedActivePowerRecord>> testAggregationSource =
         TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> {
+
+          AggregatedActivePowerRecord test =
+              new AggregatedActivePowerRecord(testSensorName,
+                  System.currentTimeMillis(),
+                  1L,
+                  testValueInW,
+                  testValueInW);
+
           final ActivePowerRecord testAggValue = new ActivePowerRecord(testSensorName,System.currentTimeMillis(),testValueInW);
-          final Entry<String, ActivePowerRecord> testEntry =
-              Map.entry(testLevel1GroupName, testAggValue);
+          final Entry<String, AggregatedActivePowerRecord> testEntry =
+              Map.entry(testLevel1GroupName, test);
           return testEntry;
         });
 
@@ -114,7 +125,7 @@ public class Uc4PipelineTest extends JetTestSupport {
   @Test
   public void testOutput() {
 
-    System.out.println("DEBUG DEBUG DEBUG || ENTERED TEST 1");
+//    System.out.println("DEBUG DEBUG DEBUG || ENTERED TEST 1");
     
     // Assertion Configuration
     int timeout = 10;
@@ -127,7 +138,7 @@ public class Uc4PipelineTest extends JetTestSupport {
     this.uc4Topology.apply(Assertions.assertCollectedEventually(timeout, 
         collection -> {
           System.out.println("DEBUG DEBUG DEBUG || ENTERED ASSERTION COLLECTED EVENTUALLY");
-          Thread.sleep(2000);
+          Thread.sleep(20_000);
           
           boolean allOkay = true;
           
-- 
GitLab