diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java index 033affa0f9d457c0bc1870724cec770e371461a4..1bdcd3207f1b4edf31f083aa388421ca4812e3fd 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java @@ -1,6 +1,5 @@ package theodolite.commons.beam.kafka; -import java.util.Properties; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; @@ -9,25 +8,29 @@ import org.apache.beam.sdk.values.PDone; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; - -public class KafkaWriterTransformation<K> extends - PTransform<PCollection<KV<String, K>>, PDone> { +/** + * Wrapper for a Kafka writing Transformation + * where the value type can be generic. + * @param <T> type of the value. + */ +public class KafkaWriterTransformation<T extends Serializer> extends + PTransform<PCollection<KV<String, T>>, PDone> { private static final long serialVersionUID = 3171423303843174723L; - private final PTransform<PCollection<KV<String, K>>, PDone> writer; + private final PTransform<PCollection<KV<String, T>>, PDone> writer; + /** + * Creates a new kafka writer transformation. + */ public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic, - final java.lang - .Class<? extends org.apache.kafka - .common.serialization - .Serializer<K>> valueSerializer) { + final Class<? extends Serializer<T>> valueSerializer) { super(); // Check if boostrap server and outputTopic are defined if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) { throw new IllegalArgumentException("bootstrapServer or outputTopic missing"); } - this.writer = KafkaIO.<String, K>write() + this.writer = KafkaIO.<String, T>write() .withBootstrapServers(bootstrapServer) .withTopic(outputTopic) .withKeySerializer(StringSerializer.class) @@ -36,7 +39,7 @@ public class KafkaWriterTransformation<K> extends } @Override - public PDone expand(PCollection<KV<String, K>> input) { + public PDone expand(final PCollection<KV<String, T>> input) { return input.apply(this.writer); } }