Skip to content
Snippets Groups Projects
Commit 3eac7ae1 authored by Sören Henning's avatar Sören Henning
Browse files

Refactor KafkaRecordSender to prepare for PubSubLite extension

parent ae34b37c
No related branches found
No related tags found
No related merge requests found
Pipeline #6904 passed
...@@ -4,13 +4,10 @@ import java.util.Properties; ...@@ -4,13 +4,10 @@ import java.util.Properties;
import java.util.function.Function; import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord; import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
...@@ -18,62 +15,9 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; ...@@ -18,62 +15,9 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
* *
* @param <T> Record type to send. * @param <T> Record type to send.
*/ */
public class KafkaRecordSender<T> implements RecordSender<T> { public interface KafkaRecordSender<T> extends RecordSender<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); public void terminate();
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
/**
* Create a new {@link KafkaRecordSender}.
*/
private KafkaRecordSender(final Builder<T> builder) {
this.topic = builder.topic;
this.keyAccessor = builder.keyAccessor;
this.timestampAccessor = builder.timestampAccessor;
final Properties properties = new Properties();
properties.putAll(builder.defaultProperties);
properties.put("bootstrap.servers", builder.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(
properties,
new StringSerializer(),
builder.serializer);
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
final ProducerRecord<String, T> record = new ProducerRecord<>(
this.topic,
null,
this.timestampAccessor.apply(message),
this.keyAccessor.apply(message),
message);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
/** /**
* Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}. * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}.
...@@ -95,7 +39,7 @@ public class KafkaRecordSender<T> implements RecordSender<T> { ...@@ -95,7 +39,7 @@ public class KafkaRecordSender<T> implements RecordSender<T> {
* *
* @param bootstrapServers The Server to for accessing Kafka. * @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write. * @param topic The topic where to write.
* @param serializer URL to the schema registry for avro. * @param schemaRegistryUrl URL to the schema registry for avro.
*/ */
public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry( public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry(
final String bootstrapServers, final String bootstrapServers,
...@@ -107,7 +51,7 @@ public class KafkaRecordSender<T> implements RecordSender<T> { ...@@ -107,7 +51,7 @@ public class KafkaRecordSender<T> implements RecordSender<T> {
} }
/** /**
* Builder class to build a new {@link KafkaRecordSender}. * Builder class to build a new {@link KafkaRecordSenderImpl}.
* *
* @param <T> Type of the records that should later be send. * @param <T> Type of the records that should later be send.
*/ */
...@@ -142,9 +86,51 @@ public class KafkaRecordSender<T> implements RecordSender<T> { ...@@ -142,9 +86,51 @@ public class KafkaRecordSender<T> implements RecordSender<T> {
return this; return this;
} }
/**
* Create a {@link KafkaRecordSender} from this builder.
*/
public KafkaRecordSender<T> build() { public KafkaRecordSender<T> build() {
return new KafkaRecordSender<>(this); final Properties properties = new Properties();
properties.putAll(this.defaultProperties);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
return new KafkaRecordSenderImpl<>(
new KafkaProducer<>(
properties,
new StringSerializer(),
this.serializer),
new DefaultRecordFactory<>(),
this.topic,
this.keyAccessor,
this.timestampAccessor);
}
private static class DefaultRecordFactory<T> implements KafkaRecordFactory<T, String, T> {
@Override
public ProducerRecord<String, T> create(final String topic, final String key, final T value,
final long timestamp) {
return new ProducerRecord<>(topic, null, timestamp, key, value);
}
} }
} }
/**
* Create Kafka {@link ProducerRecord}s from a topic, a key, a value and a timestamp.
*
* @param <T> type the records should be created from.
* @param <K> key type of the {@link ProducerRecord}s.
* @param <V> value type of the {@link ProducerRecord}s.
*/
public static interface KafkaRecordFactory<T, K, V> {
ProducerRecord<K, V> create(String topic, String key, T value, long timestamp);
}
} }
package rocks.theodolite.benchmarks.loadgenerator;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sends records to Kafka.
*
* @param <T> Record type to send.
* @param <K> Internal key type for Kafka records.
* @param <V> Internal value type for Kafka records.
*/
/* default */ class KafkaRecordSenderImpl<T, K, V> implements KafkaRecordSender<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSenderImpl.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<K, V> producer;
private final KafkaRecordFactory<T, K, V> recordFactory;
/**
* Create a new {@link KafkaRecordSenderImpl}.
*/
protected KafkaRecordSenderImpl(
final Producer<K, V> producer,
final KafkaRecordFactory<T, K, V> recordFactory,
final String topic,
final Function<T, String> keyAccessor,
final Function<T, Long> timestampAccessor) {
this.topic = topic;
this.producer = producer;
this.recordFactory = recordFactory;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
}
@Override
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
final ProducerRecord<K, V> record = this.recordFactory.create(
this.topic,
this.keyAccessor.apply(message),
message,
this.timestampAccessor.apply(message));
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment