From c80df733a92cff9a154bb6a5e01fd002e0d412d8 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Wed, 25 May 2022 16:52:52 +0200
Subject: [PATCH] Add UC2, UC3, and UC4 PipelineFactories

---
 .../uc2/hazelcastjet/Uc2PipelineFactory.java  | 134 ++++++++
 .../uc3/hazelcastjet/Uc3PipelineFactory.java  | 131 ++++++++
 .../uc4/hazelcastjet/Uc4PipelineFactory.java  | 316 ++++++++++++++++++
 3 files changed, 581 insertions(+)
 create mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
 create mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
 create mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java

diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
new file mode 100644
index 000000000..394862700
--- /dev/null
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
@@ -0,0 +1,134 @@
+package rocks.theodolite.benchmarks.uc2.hazelcastjet;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import com.hazelcast.jet.aggregate.AggregateOperation;
+import com.hazelcast.jet.aggregate.AggregateOperation1;
+import com.hazelcast.jet.kafka.KafkaSinks;
+import com.hazelcast.jet.kafka.KafkaSources;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.StreamSource;
+import com.hazelcast.jet.pipeline.StreamStage;
+import com.hazelcast.jet.pipeline.WindowDefinition;
+import java.util.Map;
+import java.util.Properties;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
+import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public class Uc2PipelineFactory extends PipelineFactory {
+
+  private final int downsampleIntervalInMs;
+
+  /**
+   * Factory for UC2 pipelines.
+   *
+   * @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka read
+   *        attributes.
+   * @param kafkaInputTopic The name of the input topic used for the pipeline.
+   * @param kafkaWritePropsForPipeline Properties object containing the necessary Kafka write
+   *        attributes.
+   * @param kafkaOutputTopic The name of the output topic used for the pipeline.
+   * @param downsampleIntervalInMs The window length in milliseconds of the tumbling window used
+   *        in the aggregation of this pipeline.
+   */
+  protected Uc2PipelineFactory(final Properties kafkaReadPropsForPipeline,
+                               final String kafkaInputTopic,
+                               final Properties kafkaWritePropsForPipeline,
+                               final String kafkaOutputTopic,
+                               final int downsampleIntervalInMs) {
+    super(kafkaReadPropsForPipeline, kafkaInputTopic,
+        kafkaWritePropsForPipeline, kafkaOutputTopic);
+    this.downsampleIntervalInMs = downsampleIntervalInMs;
+  }
+
+  /**
+   * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
+   *
+   * @return a Pipeline which can be used in a Hazelcast Jet instance to process data
+   *         for UC2.
+   */
+  public Pipeline buildPipeline() {
+
+    // Define the Kafka Source
+    final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource =
+        KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic);
+
+    // Extend UC2 topology to the pipeline
+    final StreamStage<Map.Entry<String, String>> uc2TopologyProduct =
+        this.extendUc2Topology(kafkaSource);
+
+    // Add Sink1: Logger
+    uc2TopologyProduct.writeTo(Sinks.logger());
+    // Add Sink2: Write back to kafka for the final benchmark
+    uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
+        kafkaWritePropsForPipeline, kafkaOutputTopic));
+
+    return this.pipe;
+  }
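+
+  // Hypothetical usage sketch (illustrative, not part of this patch): a
+  // same-package caller (the constructor is protected), e.g. the use case's
+  // history service, could build and submit the pipeline roughly like this
+  // (property values and topic names assumed):
+  //
+  //   final Uc2PipelineFactory factory = new Uc2PipelineFactory(
+  //       kafkaReadProps, "input", kafkaWriteProps, "output", 5000);
+  //   jetInstance.newJob(factory.buildPipeline());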
+
+  /**
+   * Extends a blank Hazelcast Jet pipeline with the UC2 topology defined by Theodolite.
+   *
+   * <p>
+   * UC2 takes {@code ActivePowerRecord} objects, groups them by keys, windows them in a tumbling
+   * window and aggregates them into {@code Stats} objects. The final map returns an
+   * {@code Entry<String,String>} where the key is the key of the group and the String is the
+   * {@code .toString()} representation of the {@code Stats} object.
+   * </p>
+   *
+   * @param source A streaming source to fetch data from.
+   * @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key
+   *         and value of the Entry object. It can be further modified or directly written
+   *         to a sink.
+   */
+  public StreamStage<Map.Entry<String, String>>
+      extendUc2Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+    // Build the pipeline topology.
+    return pipe.readFrom(source)
+        .withNativeTimestamps(0)
+        .setLocalParallelism(1)
+        .groupingKey(record -> record.getValue().getIdentifier())
+        .window(WindowDefinition.tumbling(downsampleIntervalInMs))
+        .aggregate(this.uc2AggregateOperation())
+        .map(agg -> {
+          final String theKey = agg.key();
+          final String theValue = agg.getValue().toString();
+          return Map.entry(theKey, theValue);
+        });
+  }
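+
+  // Illustrative shape of the emitted entries above (the value is whatever
+  // Guava's Stats.toString() produces), e.g.:
+  //   "sensor1" -> "Stats{count=3, mean=50.0, ...}"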
+
+  /**
+   * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
+   * Jet implementation of UC2.
+   *
+   * <p>
+   * Takes windowed and keyed {@code Entry<String, ActivePowerRecord>} elements and returns a
+   * {@link Stats} object.
+   * </p>
+   *
+   * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
+   *         ActivePowerRecord Objects into Stats Objects.
+   */
+  public AggregateOperation1<Map.Entry<String, ActivePowerRecord>,
+      StatsAccumulator, Stats> uc2AggregateOperation() {
+    // Aggregate operation that creates a Stats object from
+    // Entry<String, ActivePowerRecord> items using the StatsAccumulator.
+    return AggregateOperation
+        // Creates the accumulator
+        .withCreate(new StatsAccumulatorSupplier())
+        // Defines the accumulation
+        .<Map.Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
+          accumulator.add(item.getValue().getValueInW());
+        })
+        // Defines the combination of spread out instances
+        .andCombine((left, right) -> {
+          final Stats rightStats = right.snapshot();
+          left.addAll(rightStats);
+        })
+        // Finishes the aggregation
+        .andExportFinish(StatsAccumulator::snapshot);
+  }
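+
+  // For reference, a minimal sketch of the StatsAccumulatorSupplier used in
+  // withCreate() above, assuming it simply supplies fresh Guava accumulators
+  // (the actual class lives in the uc2specifics package):
+  //
+  //   public class StatsAccumulatorSupplier implements SupplierEx<StatsAccumulator> {
+  //     @Override
+  //     public StatsAccumulator getEx() {
+  //       return new StatsAccumulator();
+  //     }
+  //   }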
+}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
new file mode 100644
index 000000000..09198393b
--- /dev/null
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
@@ -0,0 +1,131 @@
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
+
+import com.hazelcast.jet.aggregate.AggregateOperations;
+import com.hazelcast.jet.kafka.KafkaSinks;
+import com.hazelcast.jet.kafka.KafkaSources;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.StreamSource;
+import com.hazelcast.jet.pipeline.StreamStage;
+import com.hazelcast.jet.pipeline.WindowDefinition;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
+import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
+import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory;
+import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory;
+import titan.ccp.model.records.ActivePowerRecord;
+
+
+public class Uc3PipelineFactory extends PipelineFactory {
+
+  private final int hoppingSizeInSeconds;
+  private final int windowSizeInSeconds;
+
+  /**
+   * Factory for UC3 pipelines.
+   *
+   * @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka read
+   *        attributes.
+   * @param kafkaInputTopic The name of the input topic used for the pipeline.
+   * @param kafkaWritePropsForPipeline Properties object containing the necessary Kafka write
+   *        attributes.
+   * @param kafkaOutputTopic The name of the output topic used for the pipeline.
+   * @param windowSizeInSeconds The window length in seconds of the sliding window used in the
+   *        aggregation of this pipeline.
+   * @param hoppingSizeInSeconds The hop length in seconds of the sliding window used in the
+   *        aggregation of this pipeline.
+   */
+  public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline,
+                            final String kafkaInputTopic,
+                            final Properties kafkaWritePropsForPipeline,
+                            final String kafkaOutputTopic,
+                            final int windowSizeInSeconds,
+                            final int hoppingSizeInSeconds) {
+    super(kafkaReadPropsForPipeline, kafkaInputTopic,
+        kafkaWritePropsForPipeline, kafkaOutputTopic);
+    this.windowSizeInSeconds = windowSizeInSeconds;
+    this.hoppingSizeInSeconds = hoppingSizeInSeconds;
+  }
+
+
+
+  /**
+   * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
+   *
+   * @return a Pipeline which can be used in a Hazelcast Jet instance to process data
+   *         for UC3.
+   */
+  public Pipeline buildPipeline() {
+
+    // Define the source
+    final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources
+        .<String, ActivePowerRecord>kafka(
+            kafkaReadPropsForPipeline, kafkaInputTopic);
+
+    // Extend topology for UC3
+    final StreamStage<Map.Entry<String, String>> uc3Product =
+        this.extendUc3Topology(kafkaSource);
+
+    // Add Sink1: Logger
+    uc3Product.writeTo(Sinks.logger());
+    // Add Sink2: Write back to kafka for the final benchmark
+    uc3Product.writeTo(KafkaSinks.<String, String>kafka(
+        kafkaWritePropsForPipeline, kafkaOutputTopic));
+
+    return pipe;
+  }
+
+  /**
+   * Extends a blank Hazelcast Jet pipeline with the UC3 topology defined by Theodolite.
+   *
+   * <p>
+   * UC3 takes {@code ActivePowerRecord} objects, groups them by key and hour of day, and
+   * calculates the average value for a sliding window.
+   * </p>
+   *
+   * @param source A streaming source to fetch data from.
+   * @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key
+   *         and value of the Entry object. It can be further modified or directly written
+   *         to a sink.
+   */
+  public StreamStage<Map.Entry<String, String>>
+      extendUc3Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+
+    // Build the pipeline topology.
+    return pipe
+        .readFrom(source)
+        // use Timestamps
+        .withNativeTimestamps(0)
+        .setLocalParallelism(1)
+        // Map timestamp to hour of day and create new key using sensorID and
+        // datetime mapped to HourOfDay
+        .map(record -> {
+          final String sensorId = record.getValue().getIdentifier();
+          final long timestamp = record.getValue().getTimestamp();
+          final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
+              TimeZone.getDefault().toZoneId());
+
+          final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
+          final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
+
+          return Map.entry(newKey, record.getValue());
+        })
+        // group by new keys
+        .groupingKey(Map.Entry::getKey)
+        // Sliding/Hopping Window
+        .window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(windowSizeInSeconds),
+            TimeUnit.SECONDS.toMillis(hoppingSizeInSeconds)))
+        // get average value of group (sensorId, hourOfDay)
+        .aggregate(
+            AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
+        // map to return pair: (sensorId, hourOfDay) -> (averaged watt value)
+        .map(agg -> {
+          final String theValue = agg.getValue().toString();
+          final String theKey = agg.getKey().toString();
+          return Map.entry(theKey, theValue);
+        });
+  }
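+
+  // For reference, a minimal sketch of the key construction assumed above:
+  // HoursOfDayKeyFactory presumably combines the hour component of the
+  // timestamp with the sensor id, roughly:
+  //
+  //   public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
+  //     return new HourOfDayKey(dateTime.getHour(), sensorId);
+  //   }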
+}
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
new file mode 100644
index 000000000..be617085a
--- /dev/null
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java
@@ -0,0 +1,316 @@
+package rocks.theodolite.benchmarks.uc4.hazelcastjet;
+
+import com.hazelcast.function.BiFunctionEx;
+import com.hazelcast.jet.Traverser;
+import com.hazelcast.jet.Traversers;
+import com.hazelcast.jet.Util;
+import com.hazelcast.jet.aggregate.AggregateOperation;
+import com.hazelcast.jet.aggregate.AggregateOperation1;
+import com.hazelcast.jet.kafka.KafkaSinks;
+import com.hazelcast.jet.kafka.KafkaSources;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.StageWithWindow;
+import com.hazelcast.jet.pipeline.StreamSource;
+import com.hazelcast.jet.pipeline.StreamStage;
+import com.hazelcast.jet.pipeline.StreamStageWithKey;
+import com.hazelcast.jet.pipeline.WindowDefinition;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.AggregatedActivePowerRecordAccumulator;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ChildParentsTransformer;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
+import titan.ccp.configuration.events.Event;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+import titan.ccp.model.sensorregistry.SensorRegistry;
+
+
+public class Uc4PipelineFactory extends PipelineFactory {
+
+  private static final String SENSOR_PARENT_MAP_NAME = "SensorParentMap";
+
+  private final Properties kafkaConfigPropsForPipeline;
+  private final Properties kafkaFeedbackPropsForPipeline;
+
+  private final String kafkaConfigurationTopic;
+  private final String kafkaFeedbackTopic;
+
+  private final int windowSize;
+
+
+  /**
+   * Factory for UC4 pipelines.
+   *
+   * @param kafkaInputReadPropsForPipeline Properties object containing the necessary Kafka input
+   *        read attributes.
+   * @param kafkaConfigPropsForPipeline Properties object containing the necessary Kafka config
+   *        read attributes.
+   * @param kafkaFeedbackPropsForPipeline Properties object containing the necessary Kafka
+   *        aggregation read attributes.
+   * @param kafkaWritePropsForPipeline Properties object containing the necessary Kafka write
+   *        attributes.
+   * @param kafkaInputTopic The name of the input topic used for the pipeline.
+   * @param kafkaOutputTopic The name of the output topic used for the pipeline.
+   * @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline.
+   * @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline.
+   * @param windowSize The window size in milliseconds of the tumbling window used in this
+   *        pipeline.
+   */
+  public Uc4PipelineFactory(final Properties kafkaInputReadPropsForPipeline, // NOPMD
+                            final Properties kafkaConfigPropsForPipeline,
+                            final Properties kafkaFeedbackPropsForPipeline,
+                            final Properties kafkaWritePropsForPipeline,
+                            final String kafkaInputTopic,
+                            final String kafkaOutputTopic,
+                            final String kafkaConfigurationTopic,
+                            final String kafkaFeedbackTopic,
+                            final int windowSize) {
+
+    super(kafkaInputReadPropsForPipeline, kafkaInputTopic,
+        kafkaWritePropsForPipeline, kafkaOutputTopic);
+    this.kafkaConfigPropsForPipeline = kafkaConfigPropsForPipeline;
+    this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline;
+    this.kafkaConfigurationTopic = kafkaConfigurationTopic;
+    this.kafkaFeedbackTopic = kafkaFeedbackTopic;
+    this.windowSize = windowSize;
+  }
+
+  /**
+   * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
+   *
+   * @return a Pipeline which can be used in a Hazelcast Jet instance to process data
+   *         for UC4.
+   */
+  public Pipeline buildPipeline() {
+
+    // Sources for this use case
+    final StreamSource<Map.Entry<Event, String>> configSource =
+        KafkaSources.kafka(kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
+
+    final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource =
+        KafkaSources.kafka(kafkaReadPropsForPipeline, kafkaInputTopic);
+
+    final StreamSource<Map.Entry<String, AggregatedActivePowerRecord>> aggregationSource =
+        KafkaSources.kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
+
+    // Extend UC4 topology to pipeline
+    final StreamStage<Map.Entry<String, AggregatedActivePowerRecord>> uc4Aggregation =
+        this.extendUc4Topology(pipe, inputSource, aggregationSource, configSource);
+
+    // Add Sink 1: Write back to the Kafka feedback/aggregation topic
+    uc4Aggregation.writeTo(KafkaSinks.kafka(
+        kafkaWritePropsForPipeline, kafkaFeedbackTopic));
+
+    // Add Sink 2: Log the aggregation result
+    uc4Aggregation.writeTo(Sinks.logger());
+
+    // Add Sink 3: Write back to the Kafka output topic
+    uc4Aggregation.writeTo(KafkaSinks.kafka(
+        kafkaWritePropsForPipeline, kafkaOutputTopic));
+
+    // Return the pipeline
+    return pipe;
+  }
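+
+  // Note on the feedback loop: the results written to kafkaFeedbackTopic are
+  // consumed again via aggregationSource in extendUc4Topology(), so group
+  // aggregates can themselves be aggregated into higher-level groups.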
+
+
+  /**
+   * Extends a blank Hazelcast Jet pipeline with the UC4 topology defined by Theodolite.
+   *
+   * <p>
+   * UC4 takes {@code ActivePowerRecord} events from sensors and a {@code SensorRegistry} that
+   * maps sensor keys to groups, and assigns each value to its corresponding groups. A feedback
+   * stream allows aggregated group values to be consumed again and eventually be mapped to
+   * higher-level groups defined by the {@code SensorRegistry}.
+   * </p>
+   *
+   * <p>
+   * 6 Step topology: <br>
+   * (1) Inputs (Config, Values, Aggregations) <br>
+   * (2) Merge Input Values and Aggregations <br>
+   * (3) Join Configuration with Merged Input Stream <br>
+   * (4) Duplicate as flatmap per value and group <br>
+   * (5) Window (preparation for possible last values) <br>
+   * (6) Aggregate data over the window
+   * </p>
+   *
+   * @param pipe The blank pipeline to extend the logic to.
+   * @param inputSource A streaming source with {@code ActivePowerRecord} data.
+   * @param aggregationSource A streaming source with aggregated data.
+   * @param configurationSource A streaming source delivering a {@code SensorRegistry}.
+   * @return A {@code StreamStage<Map.Entry<String, AggregatedActivePowerRecord>>} with sensor
+   *         keys or group keys mapped to their aggregated values. The data can be further
+   *         modified or directly be linked to a Hazelcast Jet sink.
+   */
+  public StreamStage<Map.Entry<String, AggregatedActivePowerRecord>> extendUc4Topology(
+      final Pipeline pipe,
+      final StreamSource<Map.Entry<String, ActivePowerRecord>> inputSource,
+      final StreamSource<Map.Entry<String, AggregatedActivePowerRecord>> aggregationSource,
+      final StreamSource<Map.Entry<Event, String>> configurationSource) {
+
+    //////////////////////////////////
+    // (1) Configuration Stream
+    pipe.readFrom(configurationSource)
+        .withNativeTimestamps(0)
+        .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED
+            || entry.getKey() == Event.SENSOR_REGISTRY_STATUS)
+        .map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())))
+        .flatMapStateful(HashMap::new, new ConfigFlatMap())
+        .writeTo(Sinks.mapWithUpdating(
+            SENSOR_PARENT_MAP_NAME, // The addressed IMAP
+            Map.Entry::getKey, // The key to look for
+            (oldValue, newEntry) -> newEntry.getValue()));
+
+    //////////////////////////////////
+    // (1) Sensor Input Stream
+    final StreamStage<Map.Entry<String, ActivePowerRecord>> inputStream = pipe
+        .readFrom(inputSource)
+        .withNativeTimestamps(0);
+
+    //////////////////////////////////
+    // (1) Aggregation Stream
+    final StreamStage<Map.Entry<String, ActivePowerRecord>> aggregations = pipe
+        .readFrom(aggregationSource)
+        .withNativeTimestamps(0)
+        .map(entry -> { // Map Aggregated to ActivePowerRecord
+          final AggregatedActivePowerRecord agg = entry.getValue();
+          final ActivePowerRecord record = new ActivePowerRecord(
+              agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
+          return Util.entry(entry.getKey(), record);
+        });
+
+    //////////////////////////////////
+    // (2) UC4 Merge Input with aggregation stream
+    final StreamStageWithKey<Map.Entry<String, ActivePowerRecord>, String>
+        mergedInputAndAggregations = inputStream
+        .merge(aggregations)
+        .groupingKey(Map.Entry::getKey);
+
+    //////////////////////////////////
+    // (3) UC4 Join Configuration and Merged Input/Aggregation Stream
+    // [sensorKey , (value,Set<Groups>)]
+    final StreamStage<Map.Entry<String, ValueGroup>> joinedStage = mergedInputAndAggregations
+        .<Set<String>, Entry<String, ValueGroup>>mapUsingIMap(
+            SENSOR_PARENT_MAP_NAME,
+            (sensorEvent, sensorParentsSet) -> {
+              // Check whether a groupset exists for a key or not
+              if (sensorParentsSet == null) {
+                // No group set exists for this key: return valuegroup with default null group set.
+                final Set<String> nullSet = new HashSet<>();
+                nullSet.add("NULL-GROUPSET");
+                return Util.entry(sensorEvent.getKey(),
+                    new ValueGroup(sensorEvent.getValue(), nullSet));
+              } else {
+                // Group set exists for this key: return valuegroup with the groupset.
+                final ValueGroup valueParentsPair =
+                    new ValueGroup(sensorEvent.getValue(), sensorParentsSet);
+                // Return solution
+                return Util.entry(sensorEvent.getKey(), valueParentsPair);
+              }
+            });
+
+    //////////////////////////////////
+    // (4) UC4 Duplicate as flatmap joined Stream
+    // [(sensorKey, Group) , value]
+    final StreamStage<Map.Entry<SensorGroupKey, ActivePowerRecord>> dupliAsFlatmappedStage =
+        joinedStage.flatMap(entry -> {
+
+          // Supplied data
+          final String keyGroupId = entry.getKey();
+          final ActivePowerRecord record = entry.getValue().getRecord();
+          final Set<String> groups = entry.getValue().getGroups();
+
+          // Transformed Data
+          final String[] groupList = groups.toArray(String[]::new);
+          final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length];
+          final List<Map.Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>();
+          for (int i = 0; i < groupList.length; i++) {
+            newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
+            newEntryList.add(Util.entry(newKeyList[i], record));
+          }
+
+          // Return traversable list of new entry elements
+          return Traversers.traverseIterable(newEntryList);
+        });
+
+    //////////////////////////////////
+    // (5) UC4 Last Value Map
+    // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
+    final StageWithWindow<Map.Entry<SensorGroupKey, ActivePowerRecord>>
+        windowedLastValues = dupliAsFlatmappedStage
+        .window(WindowDefinition.tumbling(windowSize));
+
+    final AggregateOperation1<Map.Entry<SensorGroupKey, ActivePowerRecord>,
+        AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp =
+        AggregateOperation
+            .withCreate(AggregatedActivePowerRecordAccumulator::new)
+            .<Map.Entry<SensorGroupKey, ActivePowerRecord>>andAccumulate((acc, rec) -> {
+              acc.setId(rec.getKey().getGroup());
+              acc.addInputs(rec.getValue());
+            })
+            .andCombine((acc, acc2) ->
+                acc.addInputs(acc2.getId(), acc2.getSumInW(), acc2.getCount(), acc.getTimestamp()))
+            .andDeduct((acc, acc2) -> acc.removeInputs(acc2.getSumInW(), acc2.getCount()))
+            .andExportFinish(acc ->
+                new AggregatedActivePowerRecord(acc.getId(),
+                    acc.getTimestamp(),
+                    acc.getCount(),
+                    acc.getSumInW(),
+                    acc.getAverageInW())
+            );
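+
+    // The accumulator (from the uc4specifics package) is assumed to keep a
+    // running id, count, sum, and timestamp; removeInputs() makes the
+    // operation deductible, which Jet can exploit for sliding windows.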
+
+    // Aggregate the windowed values per group and map the result to entries.
+
+    return windowedLastValues
+        .groupingKey(entry -> entry.getKey().getGroup())
+        .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue()));
+  }
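+
+  // Illustrative shape of the emitted entries above: one aggregate per group
+  // key and window, e.g. (field order: id, timestamp, count, sumInW, avgInW):
+  //   "level1" -> AggregatedActivePowerRecord("level1", 1653486772000L, 2, 100.0, 50.0)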
+
+
+  /**
+   * FlatMap function used to process the configuration input for UC4.
+   */
+  private static class ConfigFlatMap implements
+      BiFunctionEx<Map<String, Set<String>>, Map.Entry<Event, SensorRegistry>, Traverser<Map.Entry<String, Set<String>>>> { // NOCS
+
+    private static final long serialVersionUID = -6769931374907428699L;
+
+    @Override
+    public Traverser<Map.Entry<String, Set<String>>> applyEx(
+        final Map<String, Set<String>> flatMapStage,
+        final Map.Entry<Event, SensorRegistry> eventItem) {
+      // Transform new Input
+      final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name");
+      final Map<String, Set<String>> mapFromRegistry =
+          transformer.constructChildParentsPairs(eventItem.getValue());
+
+      // Compare both tables
+      final Map<String, Set<String>> updates = new HashMap<>();
+      for (final String key : mapFromRegistry.keySet()) {
+        if (flatMapStage.containsKey(key)) {
+          if (!mapFromRegistry.get(key).equals(flatMapStage.get(key))) {
+            updates.put(key, mapFromRegistry.get(key));
+          }
+        } else {
+          updates.put(key, mapFromRegistry.get(key));
+        }
+      }
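+
+      // Worked example (hypothetical values): if flatMapStage = {s1=[g1]} and
+      // the new registry yields {s1=[g1], s2=[g2]}, only (s2, [g2]) is put
+      // into updates, since s1's parent set is unchanged.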
+
+      // Create an updates list to pass on to the next pipeline stage.
+      final List<Map.Entry<String, Set<String>>> updatesList = new ArrayList<>(updates.entrySet());
+
+      // Return traverser with updates list.
+      return Traversers.traverseIterable(updatesList)
+          .map(e -> Util.entry(e.getKey(), e.getValue()));
+    }
+
+  }
+}
-- 
GitLab