diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java index 948672f5ccbc9b3d72c31f8c90042fd0a13f271b..3f3b05c9bbf4c361f25f856a711265dc267c0365 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java @@ -4,13 +4,10 @@ import java.util.Properties; import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** @@ -18,62 +15,9 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; * * @param <T> Record type to send. */ -public class KafkaRecordSender<T> implements RecordSender<T> { +public interface KafkaRecordSender<T> extends RecordSender<T> { - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; - - /** - * Create a new {@link KafkaRecordSender}. - */ - private KafkaRecordSender(final Builder<T> builder) { - this.topic = builder.topic; - this.keyAccessor = builder.keyAccessor; - this.timestampAccessor = builder.timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(builder.defaultProperties); - properties.put("bootstrap.servers", builder.bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - this.producer = new KafkaProducer<>( - properties, - new StringSerializer(), - builder.serializer); - } - - public void terminate() { - this.producer.close(); - } - - @Override - public void send(final T message) { - final ProducerRecord<String, T> record = new ProducerRecord<>( - this.topic, - null, - this.timestampAccessor.apply(message), - this.keyAccessor.apply(message), - message); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - try { - this.producer.send(record); - } catch (final SerializationException e) { - LOGGER.warn( - "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS - e); - } - } + public void terminate(); /** * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}. @@ -95,7 +39,7 @@ public class KafkaRecordSender<T> implements RecordSender<T> { * * @param bootstrapServers The Server to for accessing Kafka. * @param topic The topic where to write. - * @param serializer URL to the schema registry for avro. + * @param schemaRegistryUrl URL to the schema registry for avro. */ public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry( final String bootstrapServers, @@ -107,7 +51,7 @@ public class KafkaRecordSender<T> implements RecordSender<T> { } /** - * Builder class to build a new {@link KafkaRecordSender}. + * Builder class to build a new {@link KafkaRecordSenderImpl}. * * @param <T> Type of the records that should later be send. */ @@ -142,9 +86,51 @@ public class KafkaRecordSender<T> implements RecordSender<T> { return this; } + /** + * Create a {@link KafkaRecordSender} from this builder. + */ public KafkaRecordSender<T> build() { - return new KafkaRecordSender<>(this); + final Properties properties = new Properties(); + properties.putAll(this.defaultProperties); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + // properties.put("acks", this.acknowledges); + // properties.put("batch.size", this.batchSize); + // properties.put("linger.ms", this.lingerMs); + // properties.put("buffer.memory", this.bufferMemory); + + return new KafkaRecordSenderImpl<>( + new KafkaProducer<>( + properties, + new StringSerializer(), + this.serializer), + new DefaultRecordFactory<>(), + this.topic, + this.keyAccessor, + this.timestampAccessor); + } + + private static class DefaultRecordFactory<T> implements KafkaRecordFactory<T, String, T> { + + @Override + public ProducerRecord<String, T> create(final String topic, final String key, final T value, + final long timestamp) { + return new ProducerRecord<>(topic, null, timestamp, key, value); + } + } } + /** + * Create Kafka {@link ProducerRecord}s from a topic, a key, a value and a timestamp. + * + * @param <T> type the records should be created from. + * @param <K> key type of the {@link ProducerRecord}s. + * @param <V> value type of the {@link ProducerRecord}s. + */ + public static interface KafkaRecordFactory<T, K, V> { + + ProducerRecord<K, V> create(String topic, String key, T value, long timestamp); + + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..289651b417aa715f8f528bb6547121bcfcc3155d --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java @@ -0,0 +1,70 @@ +package rocks.theodolite.benchmarks.loadgenerator; + +import java.util.function.Function; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends records to Kafka. + * + * @param <T> Record type to send. + * @param <K> Internal key type for Kafka records. + * @param <V> Internal value type for Kafka records. + */ +/* default */ class KafkaRecordSenderImpl<T, K, V> implements KafkaRecordSender<T> { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSenderImpl.class); + + private final String topic; + + private final Function<T, String> keyAccessor; + + private final Function<T, Long> timestampAccessor; + + private final Producer<K, V> producer; + + private final KafkaRecordFactory<T, K, V> recordFactory; + + /** + * Create a new {@link KafkaRecordSenderImpl}. + */ + protected KafkaRecordSenderImpl( + final Producer<K, V> producer, + final KafkaRecordFactory<T, K, V> recordFactory, + final String topic, + final Function<T, String> keyAccessor, + final Function<T, Long> timestampAccessor) { + this.topic = topic; + this.producer = producer; + this.recordFactory = recordFactory; + this.keyAccessor = keyAccessor; + this.timestampAccessor = timestampAccessor; + } + + @Override + public void terminate() { + this.producer.close(); + } + + @Override + public void send(final T message) { + final ProducerRecord<K, V> record = this.recordFactory.create( + this.topic, + this.keyAccessor.apply(message), + message, + this.timestampAccessor.apply(message)); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } + } + +}