From 4df71d16928793c0b803261f1c5aaeea6be9f5dd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Mon, 31 Jan 2022 18:01:26 +0100
Subject: [PATCH] Write Avro records with schema registry

---
 .../java/application/DuplicateAsFlatMap.java  | 21 +++---
 .../java/application/Uc4BeamPipeline.java     | 65 +++++++++++--------
 ...gregatedActivePowerRecordDeserializer.java | 25 +------
 ...AggregatedActivePowerRecordSerializer.java | 37 +----------
 4 files changed, 52 insertions(+), 96 deletions(-)

diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
index 7b66082c9..cf25f043e 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
@@ -16,11 +16,11 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * Duplicates the Kv containing the (Children,Parents) pair as a flat map.
  */
-public class DuplicateAsFlatMap extends DoFn
-    <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
+public class DuplicateAsFlatMap
+    extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
   private static final long serialVersionUID = -5132355515723961647L;
   @StateId("parents")
-  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();//NOPMD
+  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();// NOPMD
   private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
 
   public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
@@ -30,19 +30,20 @@ public class DuplicateAsFlatMap extends DoFn
 
 
   /**
-   *  Generate a KV-pair for every child-parent match.
+   * Generate a KV-pair for every child-parent match.
    */
   @ProcessElement
-  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
-                             final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
-                             @StateId("parents") final ValueState<Set<String>> state,
-                             final ProcessContext c) {
+  public void processElement(
+      @Element final KV<String, ActivePowerRecord> kv,
+      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+      @StateId("parents") final ValueState<Set<String>> state,
+      final ProcessContext c) {
 
     final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
     final Set<String> newParents =
-        c.sideInput(childParentPairMap).get(kv.getKey()) == null
+        c.sideInput(this.childParentPairMap).get(kv.getKey()) == null
             ? Collections.emptySet()
-            : c.sideInput(childParentPairMap).get(kv.getKey());
+            : c.sideInput(this.childParentPairMap).get(kv.getKey());
     final Set<String> oldParents =
         MoreObjects.firstNonNull(state.read(), Collections.emptySet());
     // Forward new Pairs if they exist
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
index 7179fe5da..0c63e6f93 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
@@ -66,8 +66,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     final Duration gracePeriod =
         Duration.standardSeconds(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
 
-    // Build kafka configuration
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
+    // Build Kafka configuration
+    final Map<String, Object> consumerConfig = super.buildConsumerConfig();
     final Map<String, Object> configurationConfig = this.configurationConfig(config);
 
     // Set Coders for Classes that will be distributed
@@ -77,25 +77,34 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     // Read from Kafka
     // ActivePowerRecords
     final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+        new KafkaActivePowerTimestampReader(
+            this.bootstrapServer,
+            this.inputTopic,
+            consumerConfig);
 
     // Configuration Events
     final KafkaGenericReader<Event, String> kafkaConfigurationReader =
         new KafkaGenericReader<>(
-            this.bootstrapServer, configurationTopic, configurationConfig,
-            EventDeserializer.class, StringDeserializer.class);
-
-    // Transform into AggregatedActivePowerRecords into ActivePowerRecords
-    final AggregatedToActive aggregatedToActive = new AggregatedToActive();
+            this.bootstrapServer,
+            configurationTopic,
+            configurationConfig,
+            EventDeserializer.class,
+            StringDeserializer.class);
 
     // Write to Kafka
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            outputTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());
 
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            feedbackTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());
 
     // Apply pipeline transformations
     final PCollection<KV<String, ActivePowerRecord>> values = this
@@ -115,7 +124,10 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
             .withBootstrapServers(this.bootstrapServer)
             .withTopic(feedbackTopic)
             .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
+            .withValueDeserializerAndCoder(
+                AggregatedActivePowerRecordDeserializer.class,
+                AvroCoder.of(AggregatedActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
             .withTimestampPolicyFactory(
                 (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
                     previousWaterMark))
@@ -123,11 +135,12 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
         .apply("Apply Windows", Window.into(FixedWindows.of(duration)))
         // Convert into the correct data format
         .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
-            MapElements.via(aggregatedToActive))
+            MapElements.via(new AggregatedToActive()))
         .apply("Set trigger for feedback", Window
             .<KV<String, ActivePowerRecord>>configure()
             .triggering(Repeatedly.forever(
-                AfterProcessingTime.pastFirstElementInPane()
+                AfterProcessingTime
+                    .pastFirstElementInPane()
                     .plusDelayOf(triggerDelay)))
             .withAllowedLateness(gracePeriod)
             .discardingFiredPanes());
@@ -170,17 +183,13 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
                 .accumulatingFiredPanes())
             .apply(View.asMap());
 
-    final FilterNullValues filterNullValues = new FilterNullValues();
-
     // Build pairs of every sensor reading and parent
     final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
         inputCollection.apply(
             "Duplicate as flatMap",
-            ParDo.of(new DuplicateAsFlatMap(childParentPairMap))
-                .withSideInputs(childParentPairMap))
+            ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
             .apply("Filter only latest changes", Latest.perKey())
-            .apply("Filter out null values",
-                Filter.by(filterNullValues));
+            .apply("Filter out null values", Filter.by(new FilterNullValues()));
 
     final SetIdForAggregated setIdForAggregated = new SetIdForAggregated();
     final SetKeyToGroup setKeyToGroup = new SetKeyToGroup();
@@ -204,8 +213,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
 
     aggregations.apply("Write to aggregation results", kafkaOutput);
 
-    aggregations
-        .apply("Write to feedback topic", kafkaFeedback);
+    aggregations.apply("Write to feedback topic", kafkaFeedback);
 
   }
 
@@ -217,14 +225,15 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
    */
   public Map<String, Object> configurationConfig(final Configuration config) {
     final Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config
-            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config
-        .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        config.getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
     return consumerConfig;
   }
 
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
index 876260f96..3076861a5 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
@@ -1,33 +1,12 @@
 package serialization;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
  * {@link Deserializer} for an {@link AggregatedActivePowerRecord}.
  */
 public class AggregatedActivePowerRecordDeserializer
-    implements Deserializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
-    try {
-      return this.avroEnCoder.decode(new ByteArrayInputStream(data));
-    } catch (final IOException e) {
-      LOGGER.error("Could not deserialize AggregatedActivePowerRecord.", e);
-      return null;
-    }
-  }
-
+    extends SpecificAvroDeserializer<AggregatedActivePowerRecord> {
 }
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
index 1c80f2817..26801d8a2 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
@@ -1,45 +1,12 @@
 package serialization;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
  * {@link Serializer} for an {@link AggregatedActivePowerRecord}.
  */
 public class AggregatedActivePowerRecordSerializer
-    implements Serializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
-    final ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try {
-      this.avroEnCoder.encode(data, out);
-    } catch (final IOException e) {
-      LOGGER.error("Could not serialize AggregatedActivePowerRecord.", e);
-    }
-    final byte[] result = out.toByteArray();
-    try {
-      out.close();
-    } catch (final IOException e) {
-      LOGGER.error(
-          "Could not close output stream after serialization of AggregatedActivePowerRecord.", e);
-    }
-    return result;
-  }
-
-  @Override
-  public void close() {
-    Serializer.super.close();
-  }
+    extends SpecificAvroSerializer<AggregatedActivePowerRecord> {
 }
-- 
GitLab