From 3bdc9c14603e9c6db2a19882226527cfaf223270 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Mon, 31 Jan 2022 18:02:01 +0100
Subject: [PATCH] Write Avro records with schema registry

---
 .../commons/beam/AbstractPipeline.java        | 50 +++++++++++++------
 .../beam/kafka/KafkaWriterTransformation.java | 32 ++++++++----
 2 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
index c936ce918..e67c5f60b 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
@@ -12,6 +12,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
  */
 public class AbstractPipeline extends Pipeline {
 
+  private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader";
+  private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url";
   protected final String inputTopic;
   protected final String bootstrapServer;
   // Application Configurations
@@ -21,8 +23,8 @@ public class AbstractPipeline extends Pipeline {
     super(options);
     this.config = config;
 
-    inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
   }
 
   /**
@@ -32,19 +34,37 @@ public class AbstractPipeline extends Pipeline {
    */
   public Map<String, Object> buildConsumerConfig() {
     final Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config
-            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-    consumerConfig.put("schema.registry.url",
-        config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
-
-    consumerConfig.put("specific.avro.reader",
-        config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
-
-    final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    consumerConfig.put(
+        KAFKA_CONFIG_SPECIFIC_AVRO_READER,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        this.config.getString(ConfigurationKeys.APPLICATION_NAME));
     return consumerConfig;
   }
+
+  /**
+   * Builds a simple configuration for a Kafka producer transformation.
+   *
+   * @return the build configuration.
+   */
+  public Map<String, Object> buildProducerConfig() {
+    final Map<String, Object> config = new HashMap<>();
+    config.put(
+        KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    config.put(
+        KAFKA_CONFIG_SPECIFIC_AVRO_READER,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    return config;
+  }
 }
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
index 0a3867e71..6d33f6f01 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
@@ -1,5 +1,6 @@
 package theodolite.commons.beam.kafka;
 
+import java.util.Map;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
@@ -9,23 +10,35 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 /**
- * Wrapper for a Kafka writing Transformation
- * where the value type can be generic.
+ * Wrapper for a Kafka writing Transformation where the value type can be generic.
+ *
  * @param <T> type of the value.
  */
-public class KafkaWriterTransformation<T> extends
-    PTransform<PCollection<KV<String, T>>, PDone> {
+public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> {
 
   private static final long serialVersionUID = 3171423303843174723L;
   private final PTransform<PCollection<KV<String, T>>, PDone> writer;
 
   /**
-   * Creates a new kafka writer transformation.
+   * Creates a new Kafka writer transformation.
    */
-  public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
-                                   final Class<? extends Serializer<T>> valueSerializer) {
+  public KafkaWriterTransformation(
+      final String bootstrapServer,
+      final String outputTopic,
+      final Class<? extends Serializer<T>> valueSerializer) {
+    this(bootstrapServer, outputTopic, valueSerializer, Map.of());
+  }
+
+  /**
+   * Creates a new Kafka writer transformation.
+   */
+  public KafkaWriterTransformation(
+      final String bootstrapServer,
+      final String outputTopic,
+      final Class<? extends Serializer<T>> valueSerializer,
+      final Map<String, Object> producerConfig) {
     super();
-    // Check if boostrap server and outputTopic are defined
+    // Check if bootstrap server and outputTopic are defined
     if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
     }
@@ -34,7 +47,8 @@ public class KafkaWriterTransformation<T> extends
         .withBootstrapServers(bootstrapServer)
         .withTopic(outputTopic)
         .withKeySerializer(StringSerializer.class)
-        .withValueSerializer(valueSerializer);
+        .withValueSerializer(valueSerializer)
+        .withProducerConfigUpdates(producerConfig);
 
   }
 
-- 
GitLab