From dbb420b15bde2f7269bd73dee69b797d8d2f8323 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Fri, 12 Nov 2021 16:35:35 +0100
Subject: [PATCH] Clean beam-commons project

---
 .../beam/kafka/KafkaWriterTransformation.java | 25 +++++++++++--------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
index 033affa0f..1bdcd3207 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
@@ -1,6 +1,5 @@
 package theodolite.commons.beam.kafka;
 
-import java.util.Properties;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
@@ -9,25 +8,29 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
-
-public class KafkaWriterTransformation<K> extends
-    PTransform<PCollection<KV<String, K>>, PDone> {
+/**
+ * Wrapper for a Kafka writing Transformation
+ * where the value type can be generic.
+ * @param <T> type of the value.
+ */
+public class KafkaWriterTransformation<T extends Serializer> extends
+    PTransform<PCollection<KV<String, T>>, PDone> {
 
   private static final long serialVersionUID = 3171423303843174723L;
-  private final PTransform<PCollection<KV<String, K>>, PDone> writer;
+  private final PTransform<PCollection<KV<String, T>>, PDone> writer;
 
+  /**
+   * Creates a new kafka writer transformation.
+   */
   public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
-                                   final java.lang
-                                       .Class<? extends org.apache.kafka
-                                       .common.serialization
-                                       .Serializer<K>> valueSerializer) {
+                                   final Class<? extends Serializer<T>> valueSerializer) {
     super();
     // Check if boostrap server and outputTopic are defined
     if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
     }
 
-    this.writer = KafkaIO.<String, K>write()
+    this.writer = KafkaIO.<String, T>write()
         .withBootstrapServers(bootstrapServer)
         .withTopic(outputTopic)
         .withKeySerializer(StringSerializer.class)
@@ -36,7 +39,7 @@ public class KafkaWriterTransformation<K> extends
   }
 
   @Override
-  public PDone expand(PCollection<KV<String, K>> input) {
+  public PDone expand(final PCollection<KV<String, T>> input) {
     return input.apply(this.writer);
   }
 }
-- 
GitLab