Skip to content
Snippets Groups Projects
Commit dbb420b1 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Clean beam-commons project

parent aa2706f9
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package theodolite.commons.beam.kafka; package theodolite.commons.beam.kafka;
import java.util.Properties;
import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
...@@ -9,25 +8,29 @@ import org.apache.beam.sdk.values.PDone; ...@@ -9,25 +8,29 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
/**
public class KafkaWriterTransformation<K> extends * Wrapper for a Kafka writing Transformation
PTransform<PCollection<KV<String, K>>, PDone> { * where the value type can be generic.
* @param <T> type of the value.
*/
public class KafkaWriterTransformation<T extends Serializer> extends
PTransform<PCollection<KV<String, T>>, PDone> {
private static final long serialVersionUID = 3171423303843174723L; private static final long serialVersionUID = 3171423303843174723L;
private final PTransform<PCollection<KV<String, K>>, PDone> writer; private final PTransform<PCollection<KV<String, T>>, PDone> writer;
/**
* Creates a new kafka writer transformation.
*/
public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic, public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
final java.lang final Class<? extends Serializer<T>> valueSerializer) {
.Class<? extends org.apache.kafka
.common.serialization
.Serializer<K>> valueSerializer) {
super(); super();
// Check if boostrap server and outputTopic are defined // Check if boostrap server and outputTopic are defined
if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) { if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or outputTopic missing"); throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
} }
this.writer = KafkaIO.<String, K>write() this.writer = KafkaIO.<String, T>write()
.withBootstrapServers(bootstrapServer) .withBootstrapServers(bootstrapServer)
.withTopic(outputTopic) .withTopic(outputTopic)
.withKeySerializer(StringSerializer.class) .withKeySerializer(StringSerializer.class)
...@@ -36,7 +39,7 @@ public class KafkaWriterTransformation<K> extends ...@@ -36,7 +39,7 @@ public class KafkaWriterTransformation<K> extends
} }
@Override @Override
public PDone expand(PCollection<KV<String, K>> input) { public PDone expand(final PCollection<KV<String, T>> input) {
return input.apply(this.writer); return input.apply(this.writer);
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment