Skip to content
Snippets Groups Projects
Commit 0612f203 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Add generic KafkaWriterTransformation

parent 96be66b8
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package theodolite.commons.beam.kafka;
import java.util.Properties;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaWriterTransformation<K> extends
PTransform<PCollection<KV<String, K>>, PDone> {
private static final long serialVersionUID = 3171423303843174723L;
private final PTransform<PCollection<KV<String, K>>, PDone> writer;
public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
final java.lang
.Class<? extends org.apache.kafka
.common.serialization
.Serializer<K>> valueSerializer) {
super();
// Check if boostrap server and outputTopic are defined
if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
}
this.writer = KafkaIO.<String, K>write()
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(valueSerializer);
}
@Override
public PDone expand(PCollection<KV<String, K>> input) {
return input.apply(this.writer);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment