From 81fd7eac964104bac43fede3528b1deb399c1f30 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Mon, 28 Feb 2022 16:48:09 +0100
Subject: [PATCH] Change output + Refactoring + Set id

new output: AggregatedPowerRecords
refactoring: remove inferable types
set id in combine function of aggregator
---
 .../uc4/application/Uc4PipelineBuilder.java   | 31 ++++++-------------
 ...ggregatedActivePowerRecordAccumulator.java |  3 +-
 2 files changed, 11 insertions(+), 23 deletions(-)

diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
index a1fcb8bbc..2af2e111a 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
@@ -83,16 +83,14 @@ public class Uc4PipelineBuilder {
     final Pipeline uc4Pipeline = Pipeline.create();
 
     // Sources for this use case
-    final StreamSource<Entry<Event, String>> configSource = KafkaSources.<Event, String>kafka(
-        kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
+    final StreamSource<Entry<Event, String>> configSource =
+        KafkaSources.kafka(kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
 
     final StreamSource<Entry<String, ActivePowerRecord>> inputSource =
-        KafkaSources.<String, ActivePowerRecord>kafka(
-            kafkaInputReadPropsForPipeline, kafkaInputTopic);
+        KafkaSources.kafka(kafkaInputReadPropsForPipeline, kafkaInputTopic);
 
     final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource =
-        KafkaSources.<String, AggregatedActivePowerRecord>
-            kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
+        KafkaSources.kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
 
     // Extend UC4 topology to pipeline
     final StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Aggregation =
@@ -106,22 +104,10 @@ public class Uc4PipelineBuilder {
     // Log aggregation product
     uc4Aggregation.writeTo(Sinks.logger());
 
-    // Map Aggregated to ActivePowerRecord
-    final StreamStage<Entry<String, ActivePowerRecord>> uc4Product = uc4Aggregation
-        .map(entry -> {
-          final AggregatedActivePowerRecord agg = entry.getValue();
-          final ActivePowerRecord record = new ActivePowerRecord(
-              agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
-          return Util.entry(entry.getKey(), record);
-        });
-
     // Add Sink2: Write back to kafka output topic
-    uc4Product.writeTo(KafkaSinks.kafka(
+    uc4Aggregation.writeTo(KafkaSinks.kafka(
         kafkaWritePropsForPipeline, kafkaOutputTopic));
 
-    // Logger uc4 product
-    uc4Product.writeTo(Sinks.logger());
-
     // Return the pipeline
     return uc4Pipeline;
   }
@@ -261,8 +247,9 @@ public class Uc4PipelineBuilder {
           acc.setId(rec.getKey().getGroup());
           acc.addInputs(rec.getValue());
         })
-//      .andCombine((acc, rec)-> new AggregatedActivePowerRecordAccumulator()) // NOCS
-//      .andDeduct((acc, rec) -> new AggregatedActivePowerRecordAccumulator()) // NOCS
+        .andCombine((acc, acc2) ->
+            acc.addInputs(acc2.getId(), acc2.getSumInW(), acc2.getCount(), acc.getTimestamp()))
+        .andDeduct((acc, acc2) -> acc.removeInputs(acc2.getSumInW(), acc2.getCount()))
         .andExportFinish(acc ->
             new AggregatedActivePowerRecord(acc.getId(),
                 acc.getTimestamp(),
@@ -271,8 +258,8 @@ public class Uc4PipelineBuilder {
                 acc.getAverageInW())
         );
 
-
     // write aggregation back to kafka
+
     return windowedLastValues
         .groupingKey(entry -> entry.getKey().getGroup())
         .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue()));
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java
index 90a2f2844..a33b5cfea 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/AggregatedActivePowerRecordAccumulator.java
@@ -56,7 +56,8 @@ public class AggregatedActivePowerRecordAccumulator {
   /**
    * Adds the records from another aggregator.
    */
-  public void addInputs(final double sumInW, final long count, final long timestamp) {
+  public void addInputs(final String id, final double sumInW, final long count, final long timestamp) {
+    this.id = this.id == null ? id : this.id;
     this.sumInW += sumInW;
     this.count += count;
     this.timestamp = Math.max(this.timestamp, timestamp);
-- 
GitLab