Skip to content
Snippets Groups Projects
Commit 9e51d8e7 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Add generic KafkaWriterTransformation

parent cfab1593
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package theodolite.commons.beam.kafka;
import java.util.Properties;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaWriterTransformation<K> extends
PTransform<PCollection<KV<String, K>>, PDone> {
private static final long serialVersionUID = 3171423303843174723L;
private final PTransform<PCollection<KV<String, K>>, PDone> writer;
public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
final java.lang
.Class<? extends org.apache.kafka
.common.serialization
.Serializer<K>> valueSerializer) {
super();
// Check if boostrap server and outputTopic are defined
if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
}
this.writer = KafkaIO.<String, K>write()
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(valueSerializer);
}
@Override
public PDone expand(PCollection<KV<String, K>> input) {
return input.apply(this.writer);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment