diff --git a/build.gradle b/build.gradle index 797d1039b2964e7fbe34a809126b937c25a9ecf9..9311474c4c23d8c3400768b1f7d2d538fd5597e6 100644 --- a/build.gradle +++ b/build.gradle @@ -78,19 +78,15 @@ configure(useCaseApplications) { configure(useCaseGenerators) { dependencies { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. - // implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } - // implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' - + // These dependencies are used for the workload-generator-commmon implementation project(':workload-generator-commons') - // maintain build of generators - implementation 'net.kieker-monitoring:kieker:1.14' - implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } - // Use JUnit test framework testImplementation 'junit:junit:4.12' } @@ -99,15 +95,10 @@ configure(useCaseGenerators) { // Dependencies for all commons configure(commonProjects) { dependencies { - // These dependencies is exported to consumers, that is to say found on their compile classpath. - api 'org.apache.kafka:kafka-clients:2.4.0' - api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } - api 'net.kieker-monitoring:kieker:1.14' - // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.slf4j:slf4j-simple:1.6.1' - // implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } - // implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java index c420658b7131ca5cc5d24e7d1b5d5a8069414cca..33818b51084ce33a564d6f30cefb26b481d0a859 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java @@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.communication.kafka; import java.util.Properties; import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -10,14 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.functions.Transport; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * * @param <T> {@link IMonitoringRecord} to send */ -public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> { +public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -29,40 +29,74 @@ public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport private final Producer<String, T> producer; - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - /** * Create a new {@link KafkaRecordSender}. */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; + private KafkaRecordSender(final Builder<T> builder) { + this.topic = builder.topic; + this.keyAccessor = builder.keyAccessor; + this.timestampAccessor = builder.timestampAccessor; final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); + properties.putAll(builder.defaultProperties); + properties.put("bootstrap.servers", builder.bootstrapServers); // properties.put("acks", this.acknowledges); // properties.put("batch.size", this.batchSize); // properties.put("linger.ms", this.lingerMs); // properties.put("buffer.memory", this.bufferMemory); + final SchemaRegistryAvroSerdeFactory avroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); + avroSerdeFactory.<T>forKeys().serializer()); + } + + /** + * Builder class to build a new {@link KafkaRecordSender}. + * + * @param <T> Type of the records that should later be send. + */ + public static class Builder<T extends SpecificRecord> { + + private final String bootstrapServers; + private final String topic; + private final String schemaRegistryUrl; + private Function<T, String> keyAccessor = x -> ""; // NOPMD + private Function<T, Long> timestampAccessor = x -> null; // NOPMD + private Properties defaultProperties = new Properties(); // NOPMD + + /** + * Creates a Builder object for a {@link KafkaRecordSender}. + * + * @param bootstrapServers The Server to for accessing Kafka. + * @param topic The topic where to write. + * @param schemaRegistryUrl URL to the schema registry for avro. + */ + public Builder(final String bootstrapServers, final String topic, + final String schemaRegistryUrl) { + this.bootstrapServers = bootstrapServers; + this.topic = topic; + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { + this.keyAccessor = keyAccessor; + return this; + } + + public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) { + this.timestampAccessor = timestampAccessor; + return this; + } + + public Builder<T> defaultProperties(final Properties defaultProperties) { + this.defaultProperties = defaultProperties; + return this; + } + + public KafkaRecordSender<T> build() { + return new KafkaRecordSender<>(this); + } } /** diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java index 8c59079ddabafa4fb1de398b7d58503362fa721e..672b579ebbdf3cbb08f3d05d9511c9077f9dac6b 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java @@ -1,15 +1,13 @@ package theodolite.commons.workloadgeneration.functions; -import kieker.common.record.IMonitoringRecord; - /** * This interface describes a function that takes meta information from a string (e.g. an ID) and - * produces an {@link IMonitoringRecord}. + * produces an object of type T. * * @param <T> the type of the objects that will be generated by the function. */ @FunctionalInterface -public interface MessageGenerator<T extends IMonitoringRecord> { +public interface MessageGenerator<T> { T generateMessage(final String key); diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java index 889075bf81df22847f93bfcfaeb00d762fe62dad..8b03c224ea132015225e54dfcb493b836920807e 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -9,7 +9,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import kieker.common.record.IMonitoringRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; @@ -26,33 +25,22 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> The type of records the workload generator is dedicated for. */ -public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> +public abstract class AbstractWorkloadGenerator<T> implements WorkloadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); private final int instances; - private final ZooKeeper zooKeeper; - private final KeySpace keySpace; - private final int threads; - private final Duration period; - private final Duration duration; - private final BeforeAction beforeAction; - private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector; - private final MessageGenerator<T> generatorFunction; - private final Transport<T> transport; - private WorkloadDistributor workloadDistributor; - private final ScheduledExecutorService executor; /** diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java index 9cce7e56390a2f5076e4030b25b1697db2179dae..944cec6a2dffed886f06fad1e36c9d35375fe15c 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java @@ -1,7 +1,7 @@ package theodolite.commons.workloadgeneration.generators; import java.time.Duration; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> The type of records the workload generator is dedicated for. */ -public class KafkaWorkloadGenerator<T extends IMonitoringRecord> +public class KafkaWorkloadGenerator<T extends SpecificRecord> extends AbstractWorkloadGenerator<T> { private final KafkaRecordSender<T> recordSender; diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java index 4ece341bcb1f8e7c3394f8d30e19dae9e166e01f..246e83041a268806a1b61dca44f916da096d5c74 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java @@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.generators; import java.time.Duration; import java.util.Objects; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -14,7 +14,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> the record for which the builder is dedicated for. */ -public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { +public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { private int instances; @@ -43,7 +43,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * * @return the builder. */ - public static <T extends IMonitoringRecord> KafkaWorkloadGeneratorBuilder<T> builder() { + public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() { return new KafkaWorkloadGeneratorBuilder<>(); } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java index 1e3c55257f7454a836e774f76019a261868c5a0a..d8665b3fb53e7d15ed61780e3b91fbfe56f709ba 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java @@ -1,6 +1,5 @@ package theodolite.commons.workloadgeneration.misc; -import kieker.common.record.IMonitoringRecord; import theodolite.commons.workloadgeneration.functions.MessageGenerator; /** @@ -8,7 +7,7 @@ import theodolite.commons.workloadgeneration.functions.MessageGenerator; * * @param <T> The type of records the workload generator is dedicated for. */ -public class WorkloadEntity<T extends IMonitoringRecord> { +public class WorkloadEntity<T> { private final String key; private final MessageGenerator<T> generator;