Skip to content
Snippets Groups Projects
Commit 3bdc9c14 authored by Sören Henning's avatar Sören Henning
Browse files

Write Avro records with schema registry

parent 4df71d16
No related branches found
No related tags found
No related merge requests found
Pipeline #6343 failed
......@@ -12,6 +12,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
*/
public class AbstractPipeline extends Pipeline {
private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader";
private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url";
protected final String inputTopic;
protected final String bootstrapServer;
// Application Configurations
......@@ -21,8 +23,8 @@ public class AbstractPipeline extends Pipeline {
super(options);
this.config = config;
inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
}
/**
......@@ -32,19 +34,37 @@ public class AbstractPipeline extends Pipeline {
*/
public Map<String, Object> buildConsumerConfig() {
final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
config
.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
consumerConfig.put("schema.registry.url",
config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
consumerConfig.put("specific.avro.reader",
config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
consumerConfig.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
consumerConfig.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
consumerConfig.put(
ConsumerConfig.GROUP_ID_CONFIG,
this.config.getString(ConfigurationKeys.APPLICATION_NAME));
return consumerConfig;
}
/**
* Builds a simple configuration for a Kafka producer transformation.
*
* @return the build configuration.
*/
public Map<String, Object> buildProducerConfig() {
final Map<String, Object> config = new HashMap<>();
config.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
config.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
return config;
}
}
package theodolite.commons.beam.kafka;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
......@@ -9,23 +10,35 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
/**
* Wrapper for a Kafka writing Transformation
* where the value type can be generic.
* Wrapper for a Kafka writing Transformation where the value type can be generic.
*
* @param <T> type of the value.
*/
public class KafkaWriterTransformation<T> extends
PTransform<PCollection<KV<String, T>>, PDone> {
public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> {
private static final long serialVersionUID = 3171423303843174723L;
private final PTransform<PCollection<KV<String, T>>, PDone> writer;
/**
* Creates a new kafka writer transformation.
* Creates a new Kafka writer transformation.
*/
public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
final Class<? extends Serializer<T>> valueSerializer) {
public KafkaWriterTransformation(
final String bootstrapServer,
final String outputTopic,
final Class<? extends Serializer<T>> valueSerializer) {
this(bootstrapServer, outputTopic, valueSerializer, Map.of());
}
/**
* Creates a new Kafka writer transformation.
*/
public KafkaWriterTransformation(
final String bootstrapServer,
final String outputTopic,
final Class<? extends Serializer<T>> valueSerializer,
final Map<String, Object> producerConfig) {
super();
// Check if boostrap server and outputTopic are defined
// Check if bootstrap server and outputTopic are defined
if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
}
......@@ -34,7 +47,8 @@ public class KafkaWriterTransformation<T> extends
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(valueSerializer);
.withValueSerializer(valueSerializer)
.withProducerConfigUpdates(producerConfig);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment