Skip to content
Snippets Groups Projects
Commit ae34b37c authored by Sören Henning's avatar Sören Henning
Browse files

Refactor KafkaRecordSender to support other types

`KafkaRecordSender` does not only support sending Avro values in combination
with the Confluent Schema Registry anymore, but also arbitrary other
types using a custom serializer.
parent 6290a003
No related branches found
No related tags found
No related merge requests found
Pipeline #6902 failed
...@@ -7,17 +7,18 @@ import org.apache.kafka.clients.producer.KafkaProducer; ...@@ -7,17 +7,18 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Sends monitoring records to Kafka. * Sends records to Kafka.
* *
* @param <T> {@link SpecificRecord} to send * @param <T> Record type to send.
*/ */
public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { public class KafkaRecordSender<T> implements RecordSender<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
...@@ -45,21 +46,24 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender ...@@ -45,21 +46,24 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
// properties.put("linger.ms", this.lingerMs); // properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory); // properties.put("buffer.memory", this.bufferMemory);
final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>( this.producer = new KafkaProducer<>(
properties, properties,
new StringSerializer(), new StringSerializer(),
avroSerdeFactory.<T>forKeys().serializer()); builder.serializer);
} }
/** public void terminate() {
* Write the passed monitoring record to Kafka. this.producer.close();
*/ }
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record = @Override
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), public void send(final T message) {
this.keyAccessor.apply(monitoringRecord), monitoringRecord); final ProducerRecord<String, T> record = new ProducerRecord<>(
this.topic,
null,
this.timestampAccessor.apply(message),
this.keyAccessor.apply(message),
message);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try { try {
...@@ -71,20 +75,35 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender ...@@ -71,20 +75,35 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
} }
} }
public void terminate() { /**
this.producer.close(); * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}.
} *
* @param bootstrapServers The server to for accessing Kafka.
@Override * @param topic The topic where to write.
public void send(final T message) { * @param serializer The {@link Serializer} for mapping a value to keys.
this.write(message); */
public static <T> Builder<T> builderWithSerializer(
final String bootstrapServers,
final String topic,
final Serializer<T> serializer) {
return new Builder<>(bootstrapServers, topic, serializer);
} }
public static <T extends SpecificRecord> Builder<T> builder( /**
* Creates a Builder object for a {@link KafkaRecordSender} based on a Confluent Schema Registry
* URL.
*
* @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write.
* @param serializer URL to the schema registry for avro.
*/
public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry(
final String bootstrapServers, final String bootstrapServers,
final String topic, final String topic,
final String schemaRegistryUrl) { final String schemaRegistryUrl) {
return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl);
return new Builder<>(bootstrapServers, topic, avroSerdeFactory.<T>forValues().serializer());
} }
/** /**
...@@ -92,27 +111,20 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender ...@@ -92,27 +111,20 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
* *
* @param <T> Type of the records that should later be send. * @param <T> Type of the records that should later be send.
*/ */
public static class Builder<T extends SpecificRecord> { public static class Builder<T> {
private final String bootstrapServers; private final String bootstrapServers;
private final String topic; private final String topic;
private final String schemaRegistryUrl; private final Serializer<T> serializer;
private Function<T, String> keyAccessor = x -> ""; // NOPMD private Function<T, String> keyAccessor = x -> ""; // NOPMD
private Function<T, Long> timestampAccessor = x -> null; // NOPMD private Function<T, Long> timestampAccessor = x -> null; // NOPMD
private Properties defaultProperties = new Properties(); // NOPMD private Properties defaultProperties = new Properties(); // NOPMD
/**
* Creates a Builder object for a {@link KafkaRecordSender}.
*
* @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write.
* @param schemaRegistryUrl URL to the schema registry for avro.
*/
private Builder(final String bootstrapServers, final String topic, private Builder(final String bootstrapServers, final String topic,
final String schemaRegistryUrl) { final Serializer<T> serializer) {
this.bootstrapServers = bootstrapServers; this.bootstrapServers = bootstrapServers;
this.topic = topic; this.topic = topic;
this.schemaRegistryUrl = schemaRegistryUrl; this.serializer = serializer;
} }
public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { public Builder<T> keyAccessor(final Function<T, String> keyAccessor) {
......
...@@ -31,7 +31,7 @@ public final class TitanKafkaSenderFactory { ...@@ -31,7 +31,7 @@ public final class TitanKafkaSenderFactory {
final String schemaRegistryUrl, final String schemaRegistryUrl,
final Properties properties) { final Properties properties) {
return KafkaRecordSender return KafkaRecordSender
.<ActivePowerRecord>builder( .<ActivePowerRecord>builderWithSchemaRegistry(
bootstrapServers, bootstrapServers,
topic, topic,
schemaRegistryUrl) schemaRegistryUrl)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment