Skip to content
Snippets Groups Projects
Commit f8427109 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Use Avro instead of kieker records in Workload Generator.

Replace the kieker records with the avro records from titan.
Further use the builder pattern in the KafkaRecordSender to enable
creation with different parameters.
parent 67423b5f
No related branches found
No related tags found
1 merge request!28Use Titan CC Avro Records in UC App and Workload Generator
......@@ -78,19 +78,15 @@ configure(useCaseApplications) {
configure(useCaseGenerators) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
// implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
// implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1'
// These dependencies are used for the workload-generator-commmon
implementation project(':workload-generator-commons')
// maintain build of generators
implementation 'net.kieker-monitoring:kieker:1.14'
implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true }
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
......@@ -99,15 +95,10 @@ configure(useCaseGenerators) {
// Dependencies for all commons
configure(commonProjects) {
dependencies {
// These dependencies is exported to consumers, that is to say found on their compile classpath.
api 'org.apache.kafka:kafka-clients:2.4.0'
api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true }
api 'net.kieker-monitoring:kieker:1.14'
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.slf4j:slf4j-simple:1.6.1'
// implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
// implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
......
......@@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.communication.kafka;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
......@@ -10,14 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.functions.Transport;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> {
public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
......@@ -29,40 +29,74 @@ public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
private KafkaRecordSender(final Builder<T> builder) {
this.topic = builder.topic;
this.keyAccessor = builder.keyAccessor;
this.timestampAccessor = builder.timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
properties.putAll(builder.defaultProperties);
properties.put("bootstrap.servers", builder.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
avroSerdeFactory.<T>forKeys().serializer());
}
/**
* Builder class to build a new {@link KafkaRecordSender}.
*
* @param <T> Type of the records that should later be send.
*/
public static class Builder<T extends SpecificRecord> {
private final String bootstrapServers;
private final String topic;
private final String schemaRegistryUrl;
private Function<T, String> keyAccessor = x -> ""; // NOPMD
private Function<T, Long> timestampAccessor = x -> null; // NOPMD
private Properties defaultProperties = new Properties(); // NOPMD
/**
* Creates a Builder object for a {@link KafkaRecordSender}.
*
* @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write.
* @param schemaRegistryUrl URL to the schema registry for avro.
*/
public Builder(final String bootstrapServers, final String topic,
final String schemaRegistryUrl) {
this.bootstrapServers = bootstrapServers;
this.topic = topic;
this.schemaRegistryUrl = schemaRegistryUrl;
}
public Builder<T> keyAccessor(final Function<T, String> keyAccessor) {
this.keyAccessor = keyAccessor;
return this;
}
public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) {
this.timestampAccessor = timestampAccessor;
return this;
}
public Builder<T> defaultProperties(final Properties defaultProperties) {
this.defaultProperties = defaultProperties;
return this;
}
public KafkaRecordSender<T> build() {
return new KafkaRecordSender<>(this);
}
}
/**
......
package theodolite.commons.workloadgeneration.functions;
import kieker.common.record.IMonitoringRecord;
/**
* This interface describes a function that takes meta information from a string (e.g. an ID) and
* produces an {@link IMonitoringRecord}.
* produces an object of type T.
*
* @param <T> the type of the objects that will be generated by the function.
*/
@FunctionalInterface
public interface MessageGenerator<T extends IMonitoringRecord> {
public interface MessageGenerator<T> {
T generateMessage(final String key);
......
......@@ -9,7 +9,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import kieker.common.record.IMonitoringRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor;
......@@ -26,33 +25,22 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
public abstract class AbstractWorkloadGenerator<T>
implements WorkloadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class);
private final int instances;
private final ZooKeeper zooKeeper;
private final KeySpace keySpace;
private final int threads;
private final Duration period;
private final Duration duration;
private final BeforeAction beforeAction;
private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector;
private final MessageGenerator<T> generatorFunction;
private final Transport<T> transport;
private WorkloadDistributor workloadDistributor;
private final ScheduledExecutorService executor;
/**
......
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import kieker.common.record.IMonitoringRecord;
import org.apache.avro.specific.SpecificRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
......@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
public class KafkaWorkloadGenerator<T extends SpecificRecord>
extends AbstractWorkloadGenerator<T> {
private final KafkaRecordSender<T> recordSender;
......
......@@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.Objects;
import kieker.common.record.IMonitoringRecord;
import org.apache.avro.specific.SpecificRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
......@@ -14,7 +14,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
*
* @param <T> the record for which the builder is dedicated for.
*/
public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
private int instances;
......@@ -43,7 +43,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
*
* @return the builder.
*/
public static <T extends IMonitoringRecord> KafkaWorkloadGeneratorBuilder<T> builder() {
public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() {
return new KafkaWorkloadGeneratorBuilder<>();
}
......
package theodolite.commons.workloadgeneration.misc;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
/**
......@@ -8,7 +7,7 @@ import theodolite.commons.workloadgeneration.functions.MessageGenerator;
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class WorkloadEntity<T extends IMonitoringRecord> {
public class WorkloadEntity<T> {
private final String key;
private final MessageGenerator<T> generator;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment