diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java new file mode 100644 index 0000000000000000000000000000000000000000..033affa0f9d457c0bc1870724cec770e371461a4 --- /dev/null +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java @@ -0,0 +1,42 @@ +package theodolite.commons.beam.kafka; + +import java.util.Properties; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; + + +public class KafkaWriterTransformation<K> extends + PTransform<PCollection<KV<String, K>>, PDone> { + + private static final long serialVersionUID = 3171423303843174723L; + private final PTransform<PCollection<KV<String, K>>, PDone> writer; + + public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic, + final java.lang + .Class<? extends org.apache.kafka + .common.serialization + .Serializer<K>> valueSerializer) { + super(); + // Check if boostrap server and outputTopic are defined + if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) { + throw new IllegalArgumentException("bootstrapServer or outputTopic missing"); + } + + this.writer = KafkaIO.<String, K>write() + .withBootstrapServers(bootstrapServer) + .withTopic(outputTopic) + .withKeySerializer(StringSerializer.class) + .withValueSerializer(valueSerializer); + + } + + @Override + public PDone expand(PCollection<KV<String, K>> input) { + return input.apply(this.writer); + } +}