diff --git a/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java similarity index 71% rename from uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java rename to application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java index bc5fee1f2cb4367284e9db60f575f2652b1bd05b..260dbba9c1f094ac14679b6c7c4637046a687eee 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java @@ -1,31 +1,44 @@ -package theodolite.uc4.application; +package theodolite.commons.kafkastreams; /** * Keys to access configuration parameters. */ public final class ConfigurationKeys { - + // Common keys public static final String APPLICATION_NAME = "application.name"; public static final String APPLICATION_VERSION = "application.version"; + public static final String NUM_THREADS = "num.threads"; + + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + // Additional topics public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + // UC2 + public static final String WINDOW_SIZE_MS = "window.size.ms"; - public static final String NUM_THREADS = "num.threads"; + public static final String WINDOW_GRACE_MS = "window.grace.ms"; - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + // UC3 + public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + // UC4 + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; private ConfigurationKeys() {} diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index ae2a6dafa3d36dada927d17a1ca00d2df63db78b..8c758c24444ea9c590c364063a397f9b7bfec8f9 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; public abstract class KafkaStreamsBuilder { // Kafkastreams application specific + protected String schemaRegistryUrl; // NOPMD for use in subclass + private String applicationName; // NOPMD private String applicationVersion; // NOPMD private String bootstrapServers; // NOPMD @@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder { return this; } + /** + * Sets the URL for the schema registry. + * + * @param url The URL of the schema registry. + * @return + */ + public KafkaStreamsBuilder schemaRegistry(final String url) { + this.schemaRegistryUrl = url; + return this; + } + /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. @@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder { */ public KafkaStreams build() { // Check for required attributes for building properties. - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); Objects.requireNonNull(this.applicationName, "Application name has not been set."); Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set."); // Create the Kafka streams instance. return new KafkaStreams(this.buildTopology(), this.buildProperties()); diff --git a/build.gradle b/build.gradle index 99bd2d2c895065792d99ccd084e8957994e36726..9311474c4c23d8c3400768b1f7d2d538fd5597e6 100644 --- a/build.gradle +++ b/build.gradle @@ -12,9 +12,10 @@ buildscript { // Variables used to distinct different subprojects def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')} +def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')} +def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')} def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')} - // Plugins allprojects { apply plugin: 'eclipse' @@ -51,22 +52,38 @@ allprojects { maven { url "https://oss.sonatype.org/content/repositories/snapshots/" } + maven { + url 'https://packages.confluent.io/maven/' + } } } -// Dependencies for all use cases -configure(useCaseProjects) { +// Dependencies for all use case applications +configure(useCaseApplications) { dependencies { - // These dependencies are exported to consumers, that is to say found on their compile classpath. - api('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } - // These dependencies are used internally, and not exposed to consumers on their own compile classpath. - implementation 'org.apache.kafka:kafka-clients:2.1.0' + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' implementation project(':application-kafkastreams-commons') - + + // Use JUnit test framework + testImplementation 'junit:junit:4.12' + } +} + +// Dependencies for all use case generators +configure(useCaseGenerators) { + dependencies { + // These dependencies are used internally, and not exposed to consumers on their own compile classpath. + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation 'com.google.guava:guava:24.1-jre' + implementation 'org.jctools:jctools-core:2.1.1' + implementation 'org.slf4j:slf4j-simple:1.6.1' + // These dependencies are used for the workload-generator-commmon implementation project(':workload-generator-commons') @@ -78,14 +95,10 @@ configure(useCaseProjects) { // Dependencies for all commons configure(commonProjects) { dependencies { - // These dependencies is exported to consumers, that is to say found on their compile classpath. - api 'org.apache.kafka:kafka-clients:2.4.0' - api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } - api 'net.kieker-monitoring:kieker:1.14-SNAPSHOT' - // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.slf4j:slf4j-simple:1.6.1' - implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/execution/uc1-application/aggregation-deployment.yaml b/execution/uc1-application/aggregation-deployment.yaml index 67e8cbc6b87680afc43d1578891ed28a026066a1..bcb0a955de0d5ce64fe6bdcba1e537468c833e5b 100644 --- a/execution/uc1-application/aggregation-deployment.yaml +++ b/execution/uc1-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" - name: JAVA_OPTS @@ -50,4 +52,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc1-workload-generator/deployment.yaml b/execution/uc1-workload-generator/deployment.yaml index d2091aa723f47a475204a0e68f944dcdc67266ba..63ade4391e0505c6e86acdc6442f238292b03250 100644 --- a/execution/uc1-workload-generator/deployment.yaml +++ b/execution/uc1-workload-generator/deployment.yaml @@ -24,6 +24,8 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: NUM_SENSORS value: "{{NUM_SENSORS}}" - name: POD_NAME @@ -32,4 +34,3 @@ spec: fieldPath: metadata.name - name: INSTANCES value: "{{INSTANCES}}" - \ No newline at end of file diff --git a/execution/uc2-application/aggregation-deployment.yaml b/execution/uc2-application/aggregation-deployment.yaml index 701ea0b9028a6c2d5d1cc338c40e28732d7ec9d9..199966a31d0ccac1f5bb8e3b1c0e17e1cae1f8c9 100644 --- a/execution/uc2-application/aggregation-deployment.yaml +++ b/execution/uc2-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" - name: JAVA_OPTS @@ -50,4 +52,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc2-workload-generator/deployment.yaml b/execution/uc2-workload-generator/deployment.yaml index 4d3459ddab6e9547ca575202ecea33af2eb81e3a..6a3d0cfe7184f6e7807210316272d3290536e760 100644 --- a/execution/uc2-workload-generator/deployment.yaml +++ b/execution/uc2-workload-generator/deployment.yaml @@ -23,10 +23,11 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: HIERARCHY value: "full" - name: NUM_SENSORS value: "4" - name: NUM_NESTED_GROUPS value: "{{NUM_NESTED_GROUPS}}" - \ No newline at end of file diff --git a/execution/uc3-application/aggregation-deployment.yaml b/execution/uc3-application/aggregation-deployment.yaml index 872fc513571f4a0280ce0ce3dc7c26ed875dc2fe..a535b5b6443e89564d4bb0cbe17593c60dc289dc 100644 --- a/execution/uc3-application/aggregation-deployment.yaml +++ b/execution/uc3-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: KAFKA_WINDOW_DURATION_MINUTES value: "1" - name: COMMIT_INTERVAL_MS @@ -52,4 +54,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc3-workload-generator/deployment.yaml b/execution/uc3-workload-generator/deployment.yaml index b546e54018ebaa93d5a826cb35c4b5f3dd0cd088..0dc01297276846c10e2bbf5f0d076a54b7716c86 100644 --- a/execution/uc3-workload-generator/deployment.yaml +++ b/execution/uc3-workload-generator/deployment.yaml @@ -24,6 +24,8 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: NUM_SENSORS value: "{{NUM_SENSORS}}" - name: POD_NAME @@ -32,4 +34,3 @@ spec: fieldPath: metadata.name - name: INSTANCES value: "{{INSTANCES}}" - \ No newline at end of file diff --git a/execution/uc4-application/aggregation-deployment.yaml b/execution/uc4-application/aggregation-deployment.yaml index 729899446d9c89232fa29f8562ec33dd9292a414..5f71737046e12b7f0116d59c4b55f0c0de39bbd2 100644 --- a/execution/uc4-application/aggregation-deployment.yaml +++ b/execution/uc4-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: AGGREGATION_DURATION_DAYS value: "3" #AGGREGATION_DURATION_DAYS - name: AGGREGATION_DURATION_ADVANCE @@ -54,4 +56,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc4-workload-generator/deployment.yaml b/execution/uc4-workload-generator/deployment.yaml index 7176f5a1008a80eabeb59bf204d1a2e532e96ff7..312cbfc259a5faa85690aa4d4870562460c90e7d 100644 --- a/execution/uc4-workload-generator/deployment.yaml +++ b/execution/uc4-workload-generator/deployment.yaml @@ -23,6 +23,7 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: NUM_SENSORS value: "{{NUM_SENSORS}}" - \ No newline at end of file diff --git a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java deleted file mode 100644 index ee4113c3088629fe01988721e32d9704f5d30da5..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java +++ /dev/null @@ -1,25 +0,0 @@ -package theodolite.uc1.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - - private ConfigurationKeys() {} - -} diff --git a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java index b551fb7f8ff74f5ddc7e3aad901c1412075c6da6..a35cc37b36fb906e5c5495006126374d4de4656c 100644 --- a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java @@ -3,8 +3,9 @@ package theodolite.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -13,7 +14,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -40,6 +41,7 @@ public class HistoryService { .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 824a8dadd4d80dd29d09b21543fa6da6aedf5365..1c30e0c2c83b3d8a2f3dca4df0c7aec99cc4f450 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -18,14 +18,19 @@ public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); + /** * Create a new {@link TopologyBuilder} using the given topics. */ - public TopologyBuilder(final String inputTopic) { + public TopologyBuilder(final String inputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { this.inputTopic = inputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; } /** @@ -35,7 +40,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .mapValues(v -> this.gson.toJson(v)) .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java index 4af3f130373d0596232921b9c5cc0b48df573b72..7699ecb48369a2041777b901931c46072a10d99f 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { @Override protected Topology buildTopology() { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - return new TopologyBuilder(this.inputTopic).build(); + return new TopologyBuilder(this.inputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); } } diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index 9dcbb9a64be111c2ea1db006081b983c9007b140..3fb301516daa4c7e14875d3d9ca9df9c770eb69e 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8091 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index 381aef34dd42cc3cac9908480719a98fc55f3a27..a7b27dfdb25760f0b96c930c9705c2eed0402442 100644 --- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Load Generator for UC1. @@ -41,11 +41,14 @@ public final class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -60,25 +63,29 @@ public final class LoadGenerator { kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>( - kafkaBootstrapServers, - kafkaInputTopic, - r -> r.getIdentifier(), - r -> r.getTimestamp(), - kafkaProperties); + + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setGeneratorFunction( + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java index 06a6d9ccbf6750290335cd7389391eb613b1569a..a193fe134311e656f1010c738675210689e1b9d6 100644 --- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java @@ -4,8 +4,9 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; */ public class AggregationService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -49,6 +50,7 @@ public class AggregationService { .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java index 0555df96c153065ecf9be2bf2ead10de60d55cbf..724c7f6e2eaebc7be53f03b89d143d885c4a055c 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java @@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java index b78eec51e1cd9e717f79b075e5e27230af56dbe7..cf4362a21ebd0e7b3bb9c4cad4ca871d0b3f2ea8 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java @@ -8,7 +8,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Factory class configuration required by {@link JointFlatTransformerFactory}. diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java index 71505ad30b49eaa975ad461412b438ed7ccfc8d0..cba05f1ed8e585d5c31aaa92207e0d2854436736 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java @@ -2,7 +2,7 @@ package theodolite.uc2.streamprocessing; import java.util.Objects; import java.util.Set; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * A joined pair of an {@link ActivePowerRecord} and its associated parents. Both the record and the diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java index 10fb98c9c575bde508a7e24c9e825b25475eff76..9564e994da8fc909147bec76097c737f14247868 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java @@ -1,8 +1,8 @@ package theodolite.uc2.streamprocessing; import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; /** * Updates an {@link AggregatedActivePowerRecord} by a new {@link ActivePowerRecord}. @@ -19,7 +19,7 @@ public class RecordAggregator { final double average = count == 0 ? 0.0 : sum / count; return new AggregatedActivePowerRecord( identifier.key(), record.getTimestamp(), - 0.0, 0.0, count, sum, average); + count, sum, average); } /** @@ -32,8 +32,7 @@ public class RecordAggregator { final double average = count == 0 ? 0.0 : sum / count; return new AggregatedActivePowerRecord( // TODO timestamp -1 indicates that this record is emitted by an substract event - identifier.key(), -1, - 0.0, 0.0, count, sum, average); + identifier.key(), -1L, count, sum, average); } } diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index b6c46fa3a1822cbf1a11e3a8399aa7a061283952..b2dfae12a0bd207b490086d8ca0767d5a6b9cb1d 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -18,40 +18,47 @@ import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.EventSerde; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.sensorregistry.SensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; -import titan.ccp.models.records.AggregatedActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecordFactory; /** * Builds Kafka Stream Topology for the History microservice. */ public class TopologyBuilder { - // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + + private static final int LATENCY_OUTPOUT_THRESHOLD = 1000; + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; private final String outputTopic; private final String configurationTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration windowSize; private final Duration gracePeriod; private final StreamsBuilder builder = new StreamsBuilder(); private final RecordAggregator recordAggregator = new RecordAggregator(); + private StatsAccumulator latencyStats = new StatsAccumulator(); + private long lastTime = System.currentTimeMillis(); /** * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final String configurationTopic, final Duration windowSize, final Duration gracePeriod) { + final String configurationTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, + final Duration windowSize, final Duration gracePeriod) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; this.configurationTopic = configurationTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.windowSize = windowSize; this.gracePeriod = gracePeriod; } @@ -84,11 +91,11 @@ public class TopologyBuilder { final KStream<String, ActivePowerRecord> values = this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); final KStream<String, ActivePowerRecord> aggregationsInput = this.builder .stream(this.outputTopic, Consumed.with( Serdes.String(), - IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); final KTable<String, ActivePowerRecord> inputTable = values @@ -96,9 +103,9 @@ public class TopologyBuilder { .mapValues((k, v) -> new ActivePowerRecord(v.getIdentifier(), System.currentTimeMillis(), v.getValueInW())) .groupByKey(Grouped.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .reduce((aggr, value) -> value, Materialized.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); return inputTable; } @@ -140,13 +147,13 @@ public class TopologyBuilder { jointFlatMapTransformerFactory.getStoreName()) .groupByKey(Grouped.with( SensorParentKeySerde.serde(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .windowedBy(TimeWindows.of(this.windowSize).grace(this.gracePeriod)) .reduce( // TODO Configurable window aggregation function (aggValue, newValue) -> newValue, Materialized.with(SensorParentKeySerde.serde(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); } @@ -159,14 +166,14 @@ public class TopologyBuilder { new WindowedSerdes.TimeWindowedSerde<>( Serdes.String(), this.windowSize.toMillis()), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .aggregate( () -> null, this.recordAggregator::add, this.recordAggregator::substract, Materialized.with( new WindowedSerdes.TimeWindowedSerde<>( Serdes.String(), this.windowSize.toMillis()), - IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded())) // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) .toStream() @@ -175,36 +182,35 @@ public class TopologyBuilder { .map((k, v) -> KeyValue.pair(k.key(), v)); // TODO compute Timestamp } - private StatsAccumulator latencyStats = new StatsAccumulator(); - private long lastTime = System.currentTimeMillis(); - private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) { aggregations .peek((k, v) -> { final long time = System.currentTimeMillis(); final long latency = time - v.getTimestamp(); this.latencyStats.add(latency); - if (time - this.lastTime >= 1000) { - System.out.println("latency," - + time + ',' - + this.latencyStats.mean() + ',' - + (this.latencyStats.count() > 0 - ? this.latencyStats.populationStandardDeviation() - : Double.NaN) - + ',' - + (this.latencyStats.count() > 1 - ? this.latencyStats.sampleStandardDeviation() - : Double.NaN) - + ',' - + this.latencyStats.min() + ',' - + this.latencyStats.max() + ',' - + this.latencyStats.count()); + if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("latency," + + time + ',' + + this.latencyStats.mean() + ',' + + (this.latencyStats.count() > 0 + ? this.latencyStats.populationStandardDeviation() + : Double.NaN) + + ',' + + (this.latencyStats.count() > 1 + ? this.latencyStats.sampleStandardDeviation() + : Double.NaN) + + ',' + + this.latencyStats.min() + ',' + + this.latencyStats.max() + ',' + + this.latencyStats.count()); + } this.latencyStats = new StatsAccumulator(); this.lastTime = time; } }) .to(this.outputTopic, Produced.with( Serdes.String(), - IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); } } diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index ce7d5e90b476a9d8b8508ea2356f4a2da1d856f3..2f3e5c7e994a3d194810016c4664a5a83c4cc21b 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -54,6 +55,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build this.inputTopic, this.outputTopic, this.configurationTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); diff --git a/uc2-application/src/main/resources/META-INF/application.properties b/uc2-application/src/main/resources/META-INF/application.properties index f9a5225680f638239e637e99bf8d65152d15764d..74f47163d0fa02d1e3b582aab53bc8907a7855af 100644 --- a/uc2-application/src/main/resources/META-INF/application.properties +++ b/uc2-application/src/main/resources/META-INF/application.properties @@ -8,8 +8,12 @@ configuration.kafka.topic=configuration kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output + +schema.registry.url=http://localhost:8091 + window.size.ms=1000 window.grace.ms=0 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java index 49ed674bc4442f01de1cf51e4510f2079524933d..54e8c460e642d53bb013ef6888570d6fc36ff614 100644 --- a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java @@ -3,7 +3,6 @@ package theodolite.uc2.streamprocessing; import java.util.Optional; import java.util.Set; import org.junit.Test; -import theodolite.uc2.streamprocessing.OptionalParentsSerde; public class OptionalParentsSerdeTest { diff --git a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java index 15872798698ceffcdbaddb689d4179afd7d67a01..f12604d6a19ca36e9c151210005c910b37908307 100644 --- a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java @@ -2,7 +2,6 @@ package theodolite.uc2.streamprocessing; import java.util.Set; import org.junit.Test; -import theodolite.uc2.streamprocessing.ParentsSerde; public class ParentsSerdeTest { diff --git a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java index 7d9fe3a6eb83b82d85913f212fe9a930f194b220..7ca99bcb79baeb5f95a8270b99a559f2f108867e 100644 --- a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java @@ -1,8 +1,6 @@ package theodolite.uc2.streamprocessing; import org.junit.Test; -import theodolite.uc2.streamprocessing.SensorParentKey; -import theodolite.uc2.streamprocessing.SensorParentKeySerde; public class SensorParentKeySerdeTest { diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java index c8b3a1846254603c8690bf395c24c6d6f9fb2166..ad24e8e4bc8f86b7ed4d5dc2822622f8da22d6d1 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java @@ -10,8 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer; import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.EventSerde; +/** + * Class to publish a configuration to Kafka. + * + */ public class ConfigPublisher { + private static final String MEMORY_CONFIG = "134217728"; // 128 MB + private final String topic; private final Producer<Event, String> producer; @@ -20,6 +26,13 @@ public class ConfigPublisher { this(bootstrapServers, topic, new Properties()); } + /** + * Creates a new {@link ConfigPublisher} object. + * + * @param bootstrapServers Zoo Keeper server. + * @param topic where to write the configuration. + * @param defaultProperties default properties. + */ public ConfigPublisher(final String bootstrapServers, final String topic, final Properties defaultProperties) { this.topic = topic; @@ -27,13 +40,19 @@ public class ConfigPublisher { final Properties properties = new Properties(); properties.putAll(defaultProperties); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB - properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB + properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, MEMORY_CONFIG); + properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, MEMORY_CONFIG); this.producer = new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer()); } + /** + * Publish an event with given value to the kafka topic. + * + * @param event Which {@link Event} happened. + * @param value Configuration value. + */ public void publish(final Event event, final String value) { final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value); try { diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 6cf7d81af21545b288c9bc24177575e6966b95de..a33fba0ea5688a2673b193d45a57693da56b1db4 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -14,22 +14,41 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.configuration.events.Event; +import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; -public class LoadGenerator { +/** + * The {@code LoadGenerator} creates a load in Kafka. + */ +public final class LoadGenerator { + + private static final int SLEEP_PERIOD = 30_000; private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + // Constants + private static final String DEEP = "deep"; private static final long MAX_DURATION_IN_DAYS = 30L; + // Make this a utility class, because all methods are static. + private LoadGenerator() { + throw new UnsupportedOperationException(); + } + + /** + * Main method. + * + * @param args CLI arguments + * @throws InterruptedException Interrupt happened + * @throws IOException happened. + */ public static void main(final String[] args) throws InterruptedException, IOException { // uc2 LOGGER.info("Start workload generator for use case UC2."); // get environment variables - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); + final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), DEEP); final int numNestedGroups = Integer .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); @@ -39,13 +58,16 @@ public class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final boolean sendRegistry = Boolean .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -64,19 +86,26 @@ public class LoadGenerator { kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setBeforeAction(() -> { + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .beforeAction(() -> { if (sendRegistry) { final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration"); @@ -86,18 +115,18 @@ public class LoadGenerator { LOGGER.info("Now wait 30 seconds"); try { - Thread.sleep(30_000); + Thread.sleep(SLEEP_PERIOD); } catch (final InterruptedException e) { // TODO Auto-generated catch block - e.printStackTrace(); + LOGGER.error(e.getMessage(), e); } LOGGER.info("And woke up again :)"); } }) - .setGeneratorFunction( + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start @@ -109,7 +138,7 @@ public class LoadGenerator { final int numNestedGroups, final int numSensors) { final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { + if (DEEP.equals(hierarchy)) { MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); for (int lvl = 1; lvl < numNestedGroups; lvl++) { lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); @@ -117,7 +146,7 @@ public class LoadGenerator { for (int s = 0; s < numSensors; s++) { lastSensor.addChildMachineSensor("sensor_" + s); } - } else if (hierarchy.equals("full")) { + } else if ("full".equals(hierarchy)) { addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0); } else { throw new IllegalStateException(); @@ -126,8 +155,8 @@ public class LoadGenerator { } private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, - final int maxLvl, int nextId) { + final int lvl, final int maxLvl, final int startId) { + int nextId = startId; for (int c = 0; c < numChildren; c++) { if (lvl == maxLvl) { parent.addChildMachineSensor("s_" + nextId); diff --git a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index 18aae8c3499643c29901c3ca7461ec707d59c280..b245b1645c9e5ee68df3f108802c9b91d70cf017 100644 --- a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -5,8 +5,9 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -15,7 +16,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final int windowDurationMinutes = Integer @@ -45,6 +46,7 @@ public class HistoryService { .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index 0ad1845f656bcbd11b61c0e0affa9b6bcfabd2f7..74eed74c52a78df229c02542bc6e66d7f796c2c7 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -14,8 +14,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.uc3.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -26,6 +26,7 @@ public class TopologyBuilder { private final String inputTopic; private final String outputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration duration; private final StreamsBuilder builder = new StreamsBuilder(); @@ -34,9 +35,11 @@ public class TopologyBuilder { * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final Duration duration) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.duration = duration; } @@ -47,7 +50,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .groupByKey() .windowedBy(TimeWindows.of(this.duration)) // .aggregate( @@ -62,7 +65,7 @@ public class TopologyBuilder { GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() .map((k, s) -> KeyValue.pair(k.key(), s.toString())) - .peek((k, v) -> System.out.println(k + ": " + v)) + .peek((k, v) -> LOGGER.info(k + ": " + v)) .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); return this.builder.build(); diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index 63841361b06bb054fee203a894fba0c11c249d16..e74adf7c87673cc0e6ea4004dbcb1c0a6fc907ac 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -36,7 +37,7 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration); + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); return topologyBuilder.build(); } diff --git a/uc3-application/src/main/resources/META-INF/application.properties b/uc3-application/src/main/resources/META-INF/application.properties index 96e2d8b6ff46f3b3ce878b1fec011e9315e118bc..2ceaf37224b0bff54b09beaabe29210216e11671 100644 --- a/uc3-application/src/main/resources/META-INF/application.properties +++ b/uc3-application/src/main/resources/META-INF/application.properties @@ -4,6 +4,10 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +kafka.window.duration.minutes=1 + +schema.registry.url=http://localhost:8091 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 80e3810f9d9a8e872d44f794e7a3f29ce8a3b2e0..85f6a94036c53b48973ba2200212fc8e5dfd663d 100644 --- a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -13,14 +13,30 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; -public class LoadGenerator { +/** + * The {@code LoadGenerator} creates a load in Kafka. + */ +public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + // constants private static final long MAX_DURATION_IN_DAYS = 30L; + // Make this a utility class, because all methods are static. + private LoadGenerator() { + throw new UnsupportedOperationException(); + } + + /** + * Main method. + * + * @param args CLI arguments + * @throws InterruptedException Interrupt happened + * @throws IOException happened. + */ public static void main(final String[] args) throws InterruptedException, IOException { // uc2 LOGGER.info("Start workload generator for use case UC3."); @@ -33,11 +49,14 @@ public class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -53,21 +72,27 @@ public class LoadGenerator { kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setGeneratorFunction( + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index 3e3073fdeed682ae09e345d9f315585e960a3440..23af805733de2bb3f6384fa924a2322490ee58d9 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -4,8 +4,9 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -45,6 +46,7 @@ public class HistoryService { .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index b4632aaf15ee5f2572c795458f4bfded5c8cfbcd..a92abae6e11c4bf66a5d8d8dee0f10b088e8274b 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -17,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import theodolite.uc4.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -32,6 +32,7 @@ public class TopologyBuilder { private final String inputTopic; private final String outputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration aggregtionDuration; private final Duration aggregationAdvance; @@ -41,9 +42,11 @@ public class TopologyBuilder { * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final Duration aggregtionDuration, final Duration aggregationAdvance) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.aggregtionDuration = aggregtionDuration; this.aggregationAdvance = aggregationAdvance; } @@ -58,14 +61,14 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .selectKey((key, value) -> { final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); return keyFactory.createKey(value.getIdentifier(), dateTime); }) .groupByKey( - Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 8220f4cd36b0639cd69ac102177a53b1ed90e5b6..7c9e2c4f790cf1fbb7dd34db573576d1e64077db 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -45,6 +46,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.aggregtionDuration, this.aggregationAdvance); diff --git a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index 84df87a6a7a55b3b001db8037ca156d9b28fd39c..ff551e7ef423633137d122dfed7d6e03d362e7ff 100644 --- a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -13,14 +13,30 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.misc.ZooKeeper; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; -public class LoadGenerator { +/** + * The {@code LoadGenerator} creates a load in Kafka. + */ +public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + // constants private static final long MAX_DURATION_IN_DAYS = 30L; + // Make this a utility class, because all methods are static. + private LoadGenerator() { + throw new UnsupportedOperationException(); + } + + /** + * Main method. + * + * @param args CLI arguments + * @throws InterruptedException Interrupt happened + * @throws IOException happened. + */ public static void main(final String[] args) throws InterruptedException, IOException { // uc4 LOGGER.info("Start workload generator for use case UC4."); @@ -33,11 +49,14 @@ public class LoadGenerator { Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final double value = + Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -52,22 +71,29 @@ public class LoadGenerator { kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = - new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + new KafkaRecordSender.Builder<ActivePowerRecord>( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .defaultProperties(kafkaProperties) + .build(); // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setGeneratorFunction( + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java index c420658b7131ca5cc5d24e7d1b5d5a8069414cca..33818b51084ce33a564d6f30cefb26b481d0a859 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java @@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.communication.kafka; import java.util.Properties; import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -10,14 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.functions.Transport; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * * @param <T> {@link IMonitoringRecord} to send */ -public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> { +public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -29,40 +29,74 @@ public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport private final Producer<String, T> producer; - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - /** * Create a new {@link KafkaRecordSender}. */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; + private KafkaRecordSender(final Builder<T> builder) { + this.topic = builder.topic; + this.keyAccessor = builder.keyAccessor; + this.timestampAccessor = builder.timestampAccessor; final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); + properties.putAll(builder.defaultProperties); + properties.put("bootstrap.servers", builder.bootstrapServers); // properties.put("acks", this.acknowledges); // properties.put("batch.size", this.batchSize); // properties.put("linger.ms", this.lingerMs); // properties.put("buffer.memory", this.bufferMemory); + final SchemaRegistryAvroSerdeFactory avroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); + avroSerdeFactory.<T>forKeys().serializer()); + } + + /** + * Builder class to build a new {@link KafkaRecordSender}. + * + * @param <T> Type of the records that should later be send. + */ + public static class Builder<T extends SpecificRecord> { + + private final String bootstrapServers; + private final String topic; + private final String schemaRegistryUrl; + private Function<T, String> keyAccessor = x -> ""; // NOPMD + private Function<T, Long> timestampAccessor = x -> null; // NOPMD + private Properties defaultProperties = new Properties(); // NOPMD + + /** + * Creates a Builder object for a {@link KafkaRecordSender}. + * + * @param bootstrapServers The Server to for accessing Kafka. + * @param topic The topic where to write. + * @param schemaRegistryUrl URL to the schema registry for avro. + */ + public Builder(final String bootstrapServers, final String topic, + final String schemaRegistryUrl) { + this.bootstrapServers = bootstrapServers; + this.topic = topic; + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { + this.keyAccessor = keyAccessor; + return this; + } + + public Builder<T> timestampAccessor(final Function<T, Long> timestampAccessor) { + this.timestampAccessor = timestampAccessor; + return this; + } + + public Builder<T> defaultProperties(final Properties defaultProperties) { + this.defaultProperties = defaultProperties; + return this; + } + + public KafkaRecordSender<T> build() { + return new KafkaRecordSender<>(this); + } } /** diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java index 6ad61ae9ced4eda35b6828677efc267cb56aaf19..2249abcbcb1071cf880b2ee80f5d41f2b3dab463 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java @@ -44,10 +44,10 @@ public class WorkloadDistributor { private final BiConsumer<WorkloadDefinition, Integer> workerAction; private final int instances; - private final ZooKeeper zooKeeper; + private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable private final CuratorFramework client; - private boolean workloadGenerationStarted = false; + private boolean workloadGenerationStarted = false; // NOPMD explicit intention that false /** * Create a new workload distributor. @@ -79,8 +79,8 @@ public class WorkloadDistributor { try { this.client.blockUntilConnected(); } catch (final InterruptedException e) { - LOGGER.error("", e); - throw new IllegalStateException(); + LOGGER.error(e.getMessage(), e); + throw new IllegalStateException(e); } this.counter = @@ -142,9 +142,9 @@ public class WorkloadDistributor { if (!this.workloadGenerationStarted) { LOGGER.warn("No workload definition retrieved for 20 s. Terminating now.."); } - } catch (final Exception e) { - LOGGER.error("", e); - throw new IllegalStateException("Error when starting the distribution of the workload."); + } catch (final Exception e) { // NOPMD need to catch exception because of external framework + LOGGER.error(e.getMessage(), e); + throw new IllegalStateException("Error when starting the distribution of the workload.", e); } } @@ -154,7 +154,9 @@ public class WorkloadDistributor { * @param workerId the ID of this worker * @throws Exception when an error occurs */ - private synchronized void startWorkloadGeneration(final int workerId) throws Exception { + // NOPMD because exception thrown from used framework + private synchronized void startWorkloadGeneration(final int workerId) throws Exception { // NOPMD + if (!this.workloadGenerationStarted) { this.workloadGenerationStarted = true; @@ -181,9 +183,9 @@ public class WorkloadDistributor { if (event.getType() == EventType.NodeChildrenChanged) { try { WorkloadDistributor.this.startWorkloadGeneration(workerId); - } catch (final Exception e) { - LOGGER.error("", e); - throw new IllegalStateException("Error starting workload generation."); + } catch (final Exception e) { // NOPMD external framework throws exception + LOGGER.error(e.getMessage(), e); + throw new IllegalStateException("Error starting workload generation.", e); } } } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java index 8c59079ddabafa4fb1de398b7d58503362fa721e..672b579ebbdf3cbb08f3d05d9511c9077f9dac6b 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java @@ -1,15 +1,13 @@ package theodolite.commons.workloadgeneration.functions; -import kieker.common.record.IMonitoringRecord; - /** * This interface describes a function that takes meta information from a string (e.g. an ID) and - * produces an {@link IMonitoringRecord}. + * produces an object of type T. * * @param <T> the type of the objects that will be generated by the function. */ @FunctionalInterface -public interface MessageGenerator<T extends IMonitoringRecord> { +public interface MessageGenerator<T> { T generateMessage(final String key); diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java index 7c95e24f2b97d6259ec8c1bb4c75a356ef477287..7e5100a4e99f13a98156311a9d892c9626b2318a 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java @@ -1,15 +1,13 @@ package theodolite.commons.workloadgeneration.functions; -import kieker.common.record.IMonitoringRecord; - /** - * This interface describes a function that consumes a {@link IMonitoringRecord}. This function is - * dedicated to be used to transport individual messages to the messaging system. + * This interface describes a function that consumes a message {@code T}. This function is dedicated + * to be used to transport individual messages to the messaging system. * * @param <T> the type of records to send as messages. */ @FunctionalInterface -public interface Transport<T extends IMonitoringRecord> { +public interface Transport<T> { void transport(final T message); diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java index 889075bf81df22847f93bfcfaeb00d762fe62dad..82bb5a951087cd9958a2e33247174e10fe4f3335 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -9,7 +9,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import kieker.common.record.IMonitoringRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; @@ -26,33 +25,19 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> The type of records the workload generator is dedicated for. */ -public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> +public abstract class AbstractWorkloadGenerator<T> implements WorkloadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); - private final int instances; - - private final ZooKeeper zooKeeper; - - private final KeySpace keySpace; - - private final int threads; - - private final Duration period; - - private final Duration duration; - - private final BeforeAction beforeAction; - + private final int instances; // NOPMD keep instance variable instead of local variable + private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable + private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable + private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector; - private final MessageGenerator<T> generatorFunction; - private final Transport<T> transport; - - private WorkloadDistributor workloadDistributor; - + private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local private final ScheduledExecutorService executor; /** @@ -80,10 +65,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> final Transport<T> transport) { this.instances = instances; this.zooKeeper = zooKeeper; - this.period = period; - this.threads = threads; this.keySpace = keySpace; - this.duration = duration; this.beforeAction = beforeAction; this.generatorFunction = generatorFunction; this.workloadSelector = (workloadDefinition, workerId) -> { @@ -105,7 +87,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> final int periodMs = (int) period.toMillis(); - LOGGER.info("Period: " + periodMs); + LOGGER.info("Period: " + periodMs); // NOPMD no computational intensive logger call final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> { @@ -132,7 +114,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> this.stop(); } catch (final InterruptedException e) { LOGGER.error("", e); - throw new IllegalStateException("Error when terminating the workload generation."); + throw new IllegalStateException("Error when terminating the workload generation.", e); } }; diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java index 9cce7e56390a2f5076e4030b25b1697db2179dae..944cec6a2dffed886f06fad1e36c9d35375fe15c 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java @@ -1,7 +1,7 @@ package theodolite.commons.workloadgeneration.generators; import java.time.Duration; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> The type of records the workload generator is dedicated for. */ -public class KafkaWorkloadGenerator<T extends IMonitoringRecord> +public class KafkaWorkloadGenerator<T extends SpecificRecord> extends AbstractWorkloadGenerator<T> { private final KafkaRecordSender<T> recordSender; diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java index 4ece341bcb1f8e7c3394f8d30e19dae9e166e01f..785087c13480b7149a5726dfce8bbf4307b57933 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java @@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.generators; import java.time.Duration; import java.util.Objects; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -14,25 +14,17 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> the record for which the builder is dedicated for. */ -public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { - - private int instances; - - private ZooKeeper zooKeeper; - - private KeySpace keySpace; - - private int threads; - - private Duration period; - - private Duration duration; - - private BeforeAction beforeAction; - - private MessageGenerator<T> generatorFunction; - - private KafkaRecordSender<T> kafkaRecordSender; +public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD + + private int instances; // NOPMD + private ZooKeeper zooKeeper; // NOPMD + private KeySpace keySpace; // NOPMD + private int threads; // NOPMD + private Duration period; // NOPMD + private Duration duration; // NOPMD + private BeforeAction beforeAction; // NOPMD + private MessageGenerator<T> generatorFunction; // NOPMD + private KafkaRecordSender<T> kafkaRecordSender; // NOPMD private KafkaWorkloadGeneratorBuilder() { @@ -43,7 +35,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * * @return the builder. */ - public static <T extends IMonitoringRecord> KafkaWorkloadGeneratorBuilder<T> builder() { + public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() { return new KafkaWorkloadGeneratorBuilder<>(); } @@ -53,7 +45,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param instances the number of instances. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) { + public KafkaWorkloadGeneratorBuilder<T> instances(final int instances) { this.instances = instances; return this; } @@ -64,7 +56,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param zooKeeper a reference to the ZooKeeper instance. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setZooKeeper(final ZooKeeper zooKeeper) { + public KafkaWorkloadGeneratorBuilder<T> zooKeeper(final ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; return this; } @@ -75,7 +67,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param beforeAction the {@link BeforeAction}. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setBeforeAction(final BeforeAction beforeAction) { + public KafkaWorkloadGeneratorBuilder<T> beforeAction(final BeforeAction beforeAction) { this.beforeAction = beforeAction; return this; } @@ -86,7 +78,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param keySpace the {@link KeySpace}. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setKeySpace(final KeySpace keySpace) { + public KafkaWorkloadGeneratorBuilder<T> keySpace(final KeySpace keySpace) { this.keySpace = keySpace; return this; } @@ -97,7 +89,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param threads the number of threads. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) { + public KafkaWorkloadGeneratorBuilder<T> threads(final int threads) { this.threads = threads; return this; } @@ -108,7 +100,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param period the {@link Period} * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Duration period) { + public KafkaWorkloadGeneratorBuilder<T> period(final Duration period) { this.period = period; return this; } @@ -119,7 +111,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param duration the {@link Duration}. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setDuration(final Duration duration) { + public KafkaWorkloadGeneratorBuilder<T> duration(final Duration duration) { this.duration = duration; return this; } @@ -130,7 +122,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param generatorFunction the generator function. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setGeneratorFunction( + public KafkaWorkloadGeneratorBuilder<T> generatorFunction( final MessageGenerator<T> generatorFunction) { this.generatorFunction = generatorFunction; return this; @@ -142,7 +134,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @param kafkaRecordSender the record sender to use. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setKafkaRecordSender( + public KafkaWorkloadGeneratorBuilder<T> kafkaRecordSender( final KafkaRecordSender<T> kafkaRecordSender) { this.kafkaRecordSender = kafkaRecordSender; return this; @@ -163,9 +155,14 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @return the built instance of the {@link KafkaWorkloadGenerator}. */ public KafkaWorkloadGenerator<T> build() { - Objects.requireNonNull(this.instances, "Please specify the number of instances."); + if (this.instances < 1) { // NOPMD + throw new IllegalArgumentException( + "Please specify a valid number of instances. Currently: " + this.instances); + } Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance."); - this.threads = Objects.requireNonNullElse(this.threads, 1); + if (this.threads < 1) { // NOPMD + this.threads = 1; + } Objects.requireNonNull(this.keySpace, "Please specify the key space."); Objects.requireNonNull(this.period, "Please specify the period."); Objects.requireNonNull(this.duration, "Please specify the duration."); diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java index 1e3c55257f7454a836e774f76019a261868c5a0a..d8665b3fb53e7d15ed61780e3b91fbfe56f709ba 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java @@ -1,6 +1,5 @@ package theodolite.commons.workloadgeneration.misc; -import kieker.common.record.IMonitoringRecord; import theodolite.commons.workloadgeneration.functions.MessageGenerator; /** @@ -8,7 +7,7 @@ import theodolite.commons.workloadgeneration.functions.MessageGenerator; * * @param <T> The type of records the workload generator is dedicated for. */ -public class WorkloadEntity<T extends IMonitoringRecord> { +public class WorkloadEntity<T> { private final String key; private final MessageGenerator<T> generator;