From 0612f203d8836c034a3221c475faf5c1a1c56ed4 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 12 Nov 2021 15:44:15 +0100
Subject: [PATCH] Add generic KafkaWriterTransformation

---
 .../beam/kafka/KafkaWriterTransformation.java | 42 +++++++++++++++++++
 1 file changed, 42 insertions(+)
 create mode 100644 theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
new file mode 100644
index 000000000..033affa0f
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
@@ -0,0 +1,42 @@
+package theodolite.commons.beam.kafka;
+
+import java.util.Properties;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+
+public class KafkaWriterTransformation<K> extends
+    PTransform<PCollection<KV<String, K>>, PDone> {
+
+  private static final long serialVersionUID = 3171423303843174723L;
+  private final PTransform<PCollection<KV<String, K>>, PDone> writer;
+
+  public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
+                                   final java.lang
+                                       .Class<? extends org.apache.kafka
+                                       .common.serialization
+                                       .Serializer<K>> valueSerializer) {
+    super();
+    // Check if boostrap server and outputTopic are defined
+    if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
+      throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
+    }
+
+    this.writer = KafkaIO.<String, K>write()
+        .withBootstrapServers(bootstrapServer)
+        .withTopic(outputTopic)
+        .withKeySerializer(StringSerializer.class)
+        .withValueSerializer(valueSerializer);
+
+  }
+
+  @Override
+  public PDone expand(PCollection<KV<String, K>> input) {
+    return input.apply(this.writer);
+  }
+}
-- 
GitLab