From f84271098143f96369527cb730ee636c4023c2c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Sun, 26 Jul 2020 21:21:08 +0200 Subject: [PATCH] Use Avro instead of kieker records in Workload Generator. Replace the kieker records with the avro records from titan. Further use the builder pattern in the KafkaRecordSender to enable creation with different parameters. --- build.gradle | 19 ++-- .../kafka/KafkaRecordSender.java | 86 +++++++++++++------ .../functions/MessageGenerator.java | 6 +- .../generators/AbstractWorkloadGenerator.java | 14 +-- .../generators/KafkaWorkloadGenerator.java | 4 +- .../KafkaWorkloadGeneratorBuilder.java | 6 +- .../misc/WorkloadEntity.java | 3 +- 7 files changed, 74 insertions(+), 64 deletions(-) diff --git a/build.gradle b/build.gradle index 797d1039b..9311474c4 100644 --- a/build.gradle +++ b/build.gradle @@ -78,19 +78,15 @@ configure(useCaseApplications) { configure(useCaseGenerators) { dependencies { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. - // implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } - // implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' - + // These dependencies are used for the workload-generator-commmon implementation project(':workload-generator-commons') - // maintain build of generators - implementation 'net.kieker-monitoring:kieker:1.14' - implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } - // Use JUnit test framework testImplementation 'junit:junit:4.12' } @@ -99,15 +95,10 @@ configure(useCaseGenerators) { // Dependencies for all commons configure(commonProjects) { dependencies { - // These dependencies is exported to consumers, that is to say found on their compile classpath. - api 'org.apache.kafka:kafka-clients:2.4.0' - api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } - api 'net.kieker-monitoring:kieker:1.14' - // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.slf4j:slf4j-simple:1.6.1' - // implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } - // implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java index c420658b7..33818b510 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java @@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.communication.kafka; import java.util.Properties; import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -10,14 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.functions.Transport; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * * @param <T> {@link IMonitoringRecord} to send */ -public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> { +public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -29,40 +29,74 @@ public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport private final Producer<String, T> producer; - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - /** * Create a new {@link KafkaRecordSender}. */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; + private KafkaRecordSender(final Builder<T> builder) { + this.topic = builder.topic; + this.keyAccessor = builder.keyAccessor; + this.timestampAccessor = builder.timestampAccessor; final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); + properties.putAll(builder.defaultProperties); + properties.put("bootstrap.servers", builder.bootstrapServers); // properties.put("acks", this.acknowledges); // properties.put("batch.size", this.batchSize); // properties.put("linger.ms", this.lingerMs); // properties.put("buffer.memory", this.bufferMemory); + final SchemaRegistryAvroSerdeFactory avroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); + avroSerdeFactory.<T>forKeys().serializer()); + } + + /** + * Builder class to build a new {@link KafkaRecordSender}. + * + * @param <T> Type of the records that should later be send. + */ + public static class Builder<T extends SpecificRecord> { + + private final String bootstrapServers; + private final String topic; + private final String schemaRegistryUrl; + private Function<T, String> keyAccessor = x -> ""; // NOPMD + private Function<T, Long> timestampAccessor = x -> null; // NOPMD + private Properties defaultProperties = new Properties(); // NOPMD + + /** + * Creates a Builder object for a {@link KafkaRecordSender}. + * + * @param bootstrapServers The Server to for accessing Kafka. + * @param topic The topic where to write. + * @param schemaRegistryUrl URL to the schema registry for avro. + */ + public Builder(final String bootstrapServers, final String topic, + final String schemaRegistryUrl) { + this.bootstrapServers = bootstrapServers; + this.topic = topic; + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { + this.keyAccessor = keyAccessor; + return this; + } + + public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) { + this.timestampAccessor = timestampAccessor; + return this; + } + + public Builder<T> defaultProperties(final Properties defaultProperties) { + this.defaultProperties = defaultProperties; + return this; + } + + public KafkaRecordSender<T> build() { + return new KafkaRecordSender<>(this); + } } /** diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java index 8c59079dd..672b579eb 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java @@ -1,15 +1,13 @@ package theodolite.commons.workloadgeneration.functions; -import kieker.common.record.IMonitoringRecord; - /** * This interface describes a function that takes meta information from a string (e.g. an ID) and - * produces an {@link IMonitoringRecord}. + * produces an object of type T. * * @param <T> the type of the objects that will be generated by the function. */ @FunctionalInterface -public interface MessageGenerator<T extends IMonitoringRecord> { +public interface MessageGenerator<T> { T generateMessage(final String key); diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java index 889075bf8..8b03c224e 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -9,7 +9,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import kieker.common.record.IMonitoringRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; @@ -26,33 +25,22 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> The type of records the workload generator is dedicated for. */ -public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> +public abstract class AbstractWorkloadGenerator<T> implements WorkloadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); private final int instances; - private final ZooKeeper zooKeeper; - private final KeySpace keySpace; - private final int threads; - private final Duration period; - private final Duration duration; - private final BeforeAction beforeAction; - private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector; - private final MessageGenerator<T> generatorFunction; - private final Transport<T> transport; - private WorkloadDistributor workloadDistributor; - private final ScheduledExecutorService executor; /** diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java index 9cce7e563..944cec6a2 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java @@ -1,7 +1,7 @@ package theodolite.commons.workloadgeneration.generators; import java.time.Duration; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> The type of records the workload generator is dedicated for. */ -public class KafkaWorkloadGenerator<T extends IMonitoringRecord> +public class KafkaWorkloadGenerator<T extends SpecificRecord> extends AbstractWorkloadGenerator<T> { private final KafkaRecordSender<T> recordSender; diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java index 4ece341bc..246e83041 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java @@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.generators; import java.time.Duration; import java.util.Objects; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -14,7 +14,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> the record for which the builder is dedicated for. */ -public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { +public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { private int instances; @@ -43,7 +43,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * * @return the builder. */ - public static <T extends IMonitoringRecord> KafkaWorkloadGeneratorBuilder<T> builder() { + public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() { return new KafkaWorkloadGeneratorBuilder<>(); } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java index 1e3c55257..d8665b3fb 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java @@ -1,6 +1,5 @@ package theodolite.commons.workloadgeneration.misc; -import kieker.common.record.IMonitoringRecord; import theodolite.commons.workloadgeneration.functions.MessageGenerator; /** @@ -8,7 +7,7 @@ import theodolite.commons.workloadgeneration.functions.MessageGenerator; * * @param <T> The type of records the workload generator is dedicated for. */ -public class WorkloadEntity<T extends IMonitoringRecord> { +public class WorkloadEntity<T> { private final String key; private final MessageGenerator<T> generator; -- GitLab