From f2e3645f9014167477d77677b80f20be998eb396 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 28 Oct 2021 16:24:45 +0200
Subject: [PATCH] Externalize DuplicateAsFlatMap uc4-beam-samza

---
 .../java/application/DuplicateAsFlatMap.java  | 68 +++++++++++++++++++
 .../java/application/Uc4ApplicationBeam.java  | 50 +-------------
 2 files changed, 71 insertions(+), 47 deletions(-)
 create mode 100644 theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java

diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java
new file mode 100644
index 000000000..ac5206db5
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java
@@ -0,0 +1,68 @@
+package application;
+
+import com.google.common.base.MoreObjects;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import titan.ccp.model.records.ActivePowerRecord;
+
+
+/**
+ * Duplicates the data as a flat map.
+ */
+public class DuplicateAsFlatMap extends DoFn
+    <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
+  @StateId("parents")
+  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
+  private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
+
+  public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
+    super();
+    this.childParentPairMap = childParentPairMap;
+  }
+
+
+  /**
+   *  Generate a KV-pair for every child-parent match.
+   */
+  @ProcessElement
+  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
+                             final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+                             @StateId("parents") final ValueState<Set<String>> state,
+                             final ProcessContext c) {
+
+    final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
+    final Set<String> newParents =
+        c.sideInput(childParentPairMap).get(kv.getKey()) == null
+            ? Collections.emptySet()
+            : c.sideInput(childParentPairMap).get(kv.getKey());
+    final Set<String> oldParents =
+        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
+    // Forward new Pairs if they exist
+    if (!newParents.isEmpty()) {
+      for (final String parent : newParents) {
+
+        // Forward flat mapped record
+        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
+        out.output(KV.of(key, record));
+      }
+    }
+    if (!newParents.equals(oldParents)) {
+      for (final String oldParent : oldParents) {
+        if (!newParents.contains(oldParent)) {
+          // Forward Delete
+          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
+          out.output(KV.of(key, null));
+        }
+      }
+      state.write(newParents);
+    }
+  }
+}
+
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
index 2bef6aab9..7a9a8ca20 100644
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
@@ -1,9 +1,7 @@
 package application;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.math.StatsAccumulator;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -17,11 +15,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Latest;
@@ -66,7 +60,7 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
  * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
  * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public class Uc4ApplicationBeam {
+final public class Uc4ApplicationBeam {
   private static final String JOB_NAME = "Uc4Application";
   private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
   private static final String INPUT = "INPUT";
@@ -285,46 +279,7 @@ public class Uc4ApplicationBeam {
     final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
         inputCollection.apply(
             "Duplicate as flatMap",
-            ParDo.of(
-                new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
-                  @StateId("parents")
-                  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
-
-                  // Generate a KV-pair for every child-parent match
-                  @ProcessElement
-                  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
-                      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
-                      @StateId("parents") final ValueState<Set<String>> state,
-                      final ProcessContext c) {
-                    final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
-                    final Set<String> newParents =
-                        c.sideInput(childParentPairMap).get(kv.getKey()) == null
-                            ? Collections.emptySet()
-                            : c.sideInput(childParentPairMap).get(kv.getKey());
-                    final Set<String> oldParents =
-                        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
-                    // Forward new Pairs if they exist
-                    if (!newParents.isEmpty()) {
-                      for (final String parent : newParents) {
-
-                        // Forward flat mapped record
-                        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
-                        out.output(KV.of(key, record));
-                      }
-                    }
-                    if (!newParents.equals(oldParents)) {
-                      for (final String oldParent : oldParents) {
-                        if (!newParents.contains(oldParent)) {
-                          // Forward Delete
-                          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
-                          out.output(KV.of(key, null));
-                        }
-                      }
-                      state.write(newParents);
-                    }
-                  }
-                }).withSideInputs(childParentPairMap))
-
+            ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
             .apply("Filter only latest changes", Latest.perKey())
             .apply("Filter out null values",
                 Filter.by(
@@ -334,6 +289,7 @@ public class Uc4ApplicationBeam {
                         return kv.getValue() != null;
                       }
                     }));
+
     // Aggregate for every sensor group of the current level
     final PCollection<KV<String, AggregatedActivePowerRecord>>
         aggregations = flatMappedValues
-- 
GitLab