From ac0e3dc55b070aeb4236c539091b45670c83d537 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Mon, 31 Jan 2022 13:14:22 +0100
Subject: [PATCH] Code refactoring for Kafka Beam reader

---
 .../kafka/ActivePowerRecordDeserializer.java  |  6 ++
 .../kafka/KafkaActivePowerRecordReader.java   | 61 -------------------
 .../KafkaActivePowerTimestampReader.java      | 38 ++++++------
 .../kafka/TypedKafkaAvroDeserializer.java     | 49 +++++++++++++++
 4 files changed, 72 insertions(+), 82 deletions(-)
 create mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java
 delete mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
 create mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java
new file mode 100644
index 000000000..2ec0f464d
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java
@@ -0,0 +1,6 @@
+package theodolite.commons.beam.kafka;
+
+import titan.ccp.model.records.ActivePowerRecord;
+
+public class ActivePowerRecordDeserializer extends TypedKafkaAvroDeserializer<ActivePowerRecord> {
+}
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
deleted file mode 100644
index f102bee41..000000000
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package theodolite.commons.beam.kafka;
-
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.Map;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
- */
-public class KafkaActivePowerRecordReader extends
-    PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
-
-  private static final long serialVersionUID = 2603286150183186115L;
-  private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
-
-
-  /**
-   * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
-   */
-  public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
-                                      final Map<String, Object> consumerConfig) {
-    super();
-
-    if (bootstrapServer == null) {
-      throw new IllegalArgumentException("bootstrapServer is null");
-    }
-
-    if (inputTopic == null) {
-      throw new IllegalArgumentException("inputTopic is null");
-    }
-
-    // Check if boostrap server and inputTopic are defined
-    if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
-      throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
-    }
-
-
-    reader =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            .withoutMetadata();
-  }
-
-  @Override
-  public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
-    return input.apply(this.reader);
-  }
-
-}
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
index 732afe9a0..7a48bd71d 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
@@ -1,6 +1,5 @@
 package theodolite.commons.beam.kafka;
 
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -12,40 +11,37 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
- * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
- * Has additional a TimestampPolicy.
+ * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO} with event time.
  */
-public class KafkaActivePowerTimestampReader extends
-    PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
+public class KafkaActivePowerTimestampReader
+    extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
 
   private static final long serialVersionUID = 2603286150183186115L;
   private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
 
-
   /**
    * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
    */
-  public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic,
-                                         final Map<String, Object> consumerConfig) {
+  public KafkaActivePowerTimestampReader(
+      final String bootstrapServer,
+      final String inputTopic,
+      final Map<String, Object> consumerConfig) {
     super();
 
-    // Check if boostrap server and inputTopic are defined
+    // Check if bootstrap server and inputTopic are defined
     if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
     }
 
-    reader =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            // Set TimeStampPolicy for event time
-            .withTimestampPolicyFactory(
-                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
-            .withoutMetadata();
+    this.reader = KafkaIO.<String, ActivePowerRecord>read().withBootstrapServers(bootstrapServer)
+        .withTopic(inputTopic).withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializerAndCoder(
+            ActivePowerRecordDeserializer.class,
+            AvroCoder.of(ActivePowerRecord.class))
+        .withConsumerConfigUpdates(consumerConfig)
+        .withTimestampPolicyFactory(
+            (tp, previousWatermark) -> new EventTimePolicy(previousWatermark))
+        .withoutMetadata();
   }
 
   @Override
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java
new file mode 100644
index 000000000..b213aa0bc
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/TypedKafkaAvroDeserializer.java
@@ -0,0 +1,49 @@
+package theodolite.commons.beam.kafka;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> {
+
+	private final KafkaAvroDeserializer deserializer;
+
+	public TypedKafkaAvroDeserializer() {
+		this.deserializer = new KafkaAvroDeserializer();
+	}
+
+	public TypedKafkaAvroDeserializer(SchemaRegistryClient client) {
+		this.deserializer = new KafkaAvroDeserializer(client);
+	}
+
+	public TypedKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
+		this.deserializer = new KafkaAvroDeserializer(client, props);
+	}
+
+	@Override
+	public void configure(Map<String, ?> configs, boolean isKey) {
+
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public T deserialize(String s, byte[] bytes) {
+		return (T) this.deserializer.deserialize(s, bytes);
+	}
+
+	/**
+	 * Pass a reader schema to get an Avro projection
+	 */
+	@SuppressWarnings("unchecked")
+	public T deserialize(String topic, byte[] bytes, Schema readerSchema) {
+		return (T) this.deserializer.deserialize(topic, bytes, readerSchema);
+	}
+
+	@Override
+	public void close() {
+		this.deserializer.close();
+	}
+
+}
-- 
GitLab