diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java index 45731e8bce79264252c55a61b9efa5245f610c3b..948672f5ccbc9b3d72c31f8c90042fd0a13f271b 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java @@ -7,17 +7,18 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** - * Sends monitoring records to Kafka. + * Sends records to Kafka. * - * @param <T> {@link SpecificRecord} to send + * @param <T> Record type to send. */ -public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { +public class KafkaRecordSender<T> implements RecordSender<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -45,21 +46,24 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender // properties.put("linger.ms", this.lingerMs); // properties.put("buffer.memory", this.bufferMemory); - final SchemaRegistryAvroSerdeFactory avroSerdeFactory = - new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); this.producer = new KafkaProducer<>( properties, new StringSerializer(), - avroSerdeFactory.<T>forKeys().serializer()); + builder.serializer); } - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); + public void terminate() { + this.producer.close(); + } + + @Override + public void send(final T message) { + final ProducerRecord<String, T> record = new ProducerRecord<>( + this.topic, + null, + this.timestampAccessor.apply(message), + this.keyAccessor.apply(message), + message); LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); try { @@ -71,20 +75,35 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender } } - public void terminate() { - this.producer.close(); - } - - @Override - public void send(final T message) { - this.write(message); + /** + * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}. + * + * @param bootstrapServers The server to for accessing Kafka. + * @param topic The topic where to write. + * @param serializer The {@link Serializer} for mapping a value to keys. + */ + public static <T> Builder<T> builderWithSerializer( + final String bootstrapServers, + final String topic, + final Serializer<T> serializer) { + return new Builder<>(bootstrapServers, topic, serializer); } - public static <T extends SpecificRecord> Builder<T> builder( + /** + * Creates a Builder object for a {@link KafkaRecordSender} based on a Confluent Schema Registry + * URL. + * + * @param bootstrapServers The Server to for accessing Kafka. + * @param topic The topic where to write. + * @param serializer URL to the schema registry for avro. + */ + public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry( final String bootstrapServers, final String topic, final String schemaRegistryUrl) { - return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); + final SchemaRegistryAvroSerdeFactory avroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); + return new Builder<>(bootstrapServers, topic, avroSerdeFactory.<T>forValues().serializer()); } /** @@ -92,27 +111,20 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender * * @param <T> Type of the records that should later be send. */ - public static class Builder<T extends SpecificRecord> { + public static class Builder<T> { private final String bootstrapServers; private final String topic; - private final String schemaRegistryUrl; + private final Serializer<T> serializer; private Function<T, String> keyAccessor = x -> ""; // NOPMD private Function<T, Long> timestampAccessor = x -> null; // NOPMD private Properties defaultProperties = new Properties(); // NOPMD - /** - * Creates a Builder object for a {@link KafkaRecordSender}. - * - * @param bootstrapServers The Server to for accessing Kafka. - * @param topic The topic where to write. - * @param schemaRegistryUrl URL to the schema registry for avro. - */ private Builder(final String bootstrapServers, final String topic, - final String schemaRegistryUrl) { + final Serializer<T> serializer) { this.bootstrapServers = bootstrapServers; this.topic = topic; - this.schemaRegistryUrl = schemaRegistryUrl; + this.serializer = serializer; } public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java index 063bbaaab4a24d9dd2d90ef744672e03ac852b8b..ee7d416513439a5d0ba7bad7bcdb09e1baf5e4c7 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java @@ -31,7 +31,7 @@ public final class TitanKafkaSenderFactory { final String schemaRegistryUrl, final Properties properties) { return KafkaRecordSender - .<ActivePowerRecord>builder( + .<ActivePowerRecord>builderWithSchemaRegistry( bootstrapServers, topic, schemaRegistryUrl)