From 40b904ee70a670828d85cba2f2130073220591ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 8 Dec 2022 17:40:14 +0100
Subject: [PATCH] Clean up code

---
 .../uc4/beam/AggregatedToActive.java          |  8 +-
 .../uc4/flink/AggregationServiceFlinkJob.java |  2 +-
 .../hazelcastjet/ChildParentsTransformer.java | 77 +------------------
 .../kstreams/MockedSchemaRegistrySerdes.java  |  1 -
 4 files changed, 9 insertions(+), 79 deletions(-)

diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java
index 4d1c2241e..09d59a83b 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java
@@ -16,7 +16,11 @@ public class AggregatedToActive
   @Override
   public KV<String, ActivePowerRecord> apply(
       final KV<String, AggregatedActivePowerRecord> kv) {
-    return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(),
-        kv.getValue().getTimestamp(), kv.getValue().getSumInW()));
+    return KV.of(
+        kv.getKey(),
+        new ActivePowerRecord(
+            kv.getValue().getIdentifier(),
+            kv.getValue().getTimestamp(),
+            kv.getValue().getSumInW()));
   }
 }
diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
index abdb9aaed..f893c0b40 100644
--- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java
@@ -131,7 +131,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
                 || tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
             .name("[Filter] SensorRegistry changed")
             .map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry")
-            .keyBy(sr -> 1)
+            // .keyBy(sr -> 1)
             .flatMap(new ChildParentsFlatMapFunction())
             .name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])");
 
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java
index b9b599ed8..47bd04a12 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java
@@ -1,16 +1,9 @@
 package rocks.theodolite.benchmarks.uc4.hazelcastjet;
 
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import rocks.theodolite.benchmarks.commons.configuration.events.Event;
 import rocks.theodolite.benchmarks.commons.model.sensorregistry.AggregatedSensor;
 import rocks.theodolite.benchmarks.commons.model.sensorregistry.Sensor;
 import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry;
@@ -23,46 +16,9 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry;
  * does not longer exists in the sensor registry.
  *
  */
-public class ChildParentsTransformer implements
-    Transformer<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> {
+public class ChildParentsTransformer {
 
-  private final String stateStoreName;
-  // private ProcessorContext context;
-  private KeyValueStore<String, Set<String>> state;
-
-  public ChildParentsTransformer(final String stateStoreName) {
-    this.stateStoreName = stateStoreName;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void init(final ProcessorContext context) {
-    // this.context = context;
-    this.state = (KeyValueStore<String, Set<String>>) context.getStateStore(this.stateStoreName);
-  }
-
-  @Override
-  public Iterable<KeyValue<String, Optional<Set<String>>>> transform(final Event event,
-      final SensorRegistry registry) {
-
-    // Values may later be null for deleting a sensor
-    final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(registry);
-
-    this.updateChildParentsPairs(childParentsPairs);
-
-    this.updateState(childParentsPairs);
-
-    return childParentsPairs
-        .entrySet()
-        .stream()
-        .map(e -> KeyValue.pair(e.getKey(), Optional.ofNullable(e.getValue())))
-        .collect(Collectors.toList());
-  }
-
-  @Override
-  public void close() {
-    // Do nothing
-  }
+  public ChildParentsTransformer() {}
 
   /**
    * Constructs a map of keys to their set of parents out of a SensorRegistry.
@@ -87,33 +43,4 @@ public class ChildParentsTransformer implements
                 : Stream.empty()));
   }
 
-  private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs) {
-    final KeyValueIterator<String, Set<String>> oldChildParentsPairs = this.state.all();
-    while (oldChildParentsPairs.hasNext()) {
-      final KeyValue<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next();
-      final String identifier = oldChildParentPair.key;
-      final Set<String> oldParents = oldChildParentPair.value;
-      final Set<String> newParents = childParentsPairs.get(identifier); // null if not exists
-      if (newParents == null) {
-        // Sensor was deleted
-        childParentsPairs.put(identifier, null);
-      } else if (newParents.equals(oldParents)) {
-        // No changes
-        childParentsPairs.remove(identifier);
-      }
-      // Else: Later Perhaps: Mark changed parents
-    }
-    oldChildParentsPairs.close();
-  }
-
-  private void updateState(final Map<String, Set<String>> childParentsPairs) {
-    for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) {
-      if (childParentPair.getValue() == null) {
-        this.state.delete(childParentPair.getKey());
-      } else {
-        this.state.put(childParentPair.getKey(), childParentPair.getValue());
-      }
-    }
-  }
-
 }
diff --git a/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java
index c9fb98a70..8a725b691 100644
--- a/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java
+++ b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java
@@ -9,7 +9,6 @@ import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.common.serialization.Serde;
 import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
 
-
 public class MockedSchemaRegistrySerdes extends SchemaRegistryAvroSerdeFactory {
 
   private static final String URL_KEY = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
-- 
GitLab