Skip to content
Snippets Groups Projects

Use Titan CC Avro Records in UC App and Workload Generator

Merged Björn Vonheiden requested to merge stu202077/theodolite:feature/app-wg-with-avro into master
7 files
+ 74
64
Compare changes
  • Side-by-side
  • Inline
Files
7
@@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.communication.kafka;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -10,14 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.functions.Transport;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> {
public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
@@ -29,40 +29,74 @@ public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
private KafkaRecordSender(final Builder<T> builder) {
this.topic = builder.topic;
this.keyAccessor = builder.keyAccessor;
this.timestampAccessor = builder.timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
properties.putAll(builder.defaultProperties);
properties.put("bootstrap.servers", builder.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
avroSerdeFactory.<T>forKeys().serializer());
}
/**
* Builder class to build a new {@link KafkaRecordSender}.
*
* @param <T> Type of the records that should later be send.
*/
public static class Builder<T extends SpecificRecord> {
private final String bootstrapServers;
private final String topic;
private final String schemaRegistryUrl;
private Function<T, String> keyAccessor = x -> ""; // NOPMD
private Function<T, Long> timestampAccessor = x -> null; // NOPMD
private Properties defaultProperties = new Properties(); // NOPMD
/**
* Creates a Builder object for a {@link KafkaRecordSender}.
*
* @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write.
* @param schemaRegistryUrl URL to the schema registry for avro.
*/
public Builder(final String bootstrapServers, final String topic,
final String schemaRegistryUrl) {
this.bootstrapServers = bootstrapServers;
this.topic = topic;
this.schemaRegistryUrl = schemaRegistryUrl;
}
public Builder<T> keyAccessor(final Function<T, String> keyAccessor) {
this.keyAccessor = keyAccessor;
return this;
}
public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) {
this.timestampAccessor = timestampAccessor;
return this;
}
public Builder<T> defaultProperties(final Properties defaultProperties) {
this.defaultProperties = defaultProperties;
return this;
}
public KafkaRecordSender<T> build() {
return new KafkaRecordSender<>(this);
}
}
/**
Loading