diff --git a/build.gradle b/build.gradle index 694a127ca58774bbe8c243e74996e412488adbf0..5c6dab67f541e4df1037334052680214b9f99c50 100644 --- a/build.gradle +++ b/build.gradle @@ -12,9 +12,10 @@ buildscript { // Variables used to distinct different subprojects def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')} +def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')} +def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')} def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')} - // Plugins allprojects { apply plugin: 'eclipse' @@ -51,19 +52,18 @@ allprojects { maven { url "https://oss.sonatype.org/content/repositories/snapshots/" } + maven { + url 'https://packages.confluent.io/maven/' + } } } -// Dependencies for all use cases -configure(useCaseProjects) { +// Dependencies for all use case applications +configure(useCaseApplications) { dependencies { - // These dependencies is exported to consumers, that is to say found on their compile classpath. - api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } - api 'net.kieker-monitoring:kieker:1.14-SNAPSHOT' - api 'net.sourceforge.teetime:teetime:3.0' - // These dependencies are used internally, and not exposed to consumers on their own compile classpath. - implementation 'org.apache.kafka:kafka-clients:2.1.0' + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' @@ -74,6 +74,25 @@ configure(useCaseProjects) { } } +// Dependencies for all use case generators +configure(useCaseGenerators) { + dependencies { + // These dependencies are used internally, and not exposed to consumers on their own compile classpath. + // implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation 'com.google.guava:guava:24.1-jre' + implementation 'org.jctools:jctools-core:2.1.1' + implementation 'org.slf4j:slf4j-simple:1.6.1' + + // maintain build of generators + implementation 'net.kieker-monitoring:kieker:1.14-SNAPSHOT' + implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } + + // Use JUnit test framework + testImplementation 'junit:junit:4.12' + } +} + // Dependencies for all commons configure(commonProjects) { dependencies { @@ -82,7 +101,8 @@ configure(commonProjects) { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.slf4j:slf4j-simple:1.6.1' - implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/execution/uc1-application/aggregation-deployment.yaml b/execution/uc1-application/aggregation-deployment.yaml index d5bccca4a72f6a47a855ed8a7ca47fac4a8a19ca..519004df32e7853d47b00d44a63652a93363b14c 100644 --- a/execution/uc1-application/aggregation-deployment.yaml +++ b/execution/uc1-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" - name: JAVA_OPTS @@ -50,4 +52,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc1-workload-generator/deployment.yaml b/execution/uc1-workload-generator/deployment.yaml index a0fde4bbf9765b2bb56bd36acde430d97169f34b..97a9600564d29eb0ff4f5240922fbf25a6406da6 100644 --- a/execution/uc1-workload-generator/deployment.yaml +++ b/execution/uc1-workload-generator/deployment.yaml @@ -16,10 +16,12 @@ spec: terminationGracePeriodSeconds: 0 containers: - name: workload-generator - image: soerenhenning/uc1-wg:latest + image: soerenhenning/uc1-wg:latest env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: NUM_SENSORS value: "{{NUM_SENSORS}}" - name: POD_NAME @@ -28,4 +30,3 @@ spec: fieldPath: metadata.name - name: INSTANCES value: "{{INSTANCES}}" - \ No newline at end of file diff --git a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java index ee4113c3088629fe01988721e32d9704f5d30da5..ec2e023e1af8641daed738d371b4497ccabbfb84 100644 --- a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java @@ -17,6 +17,8 @@ public final class ConfigurationKeys { public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; diff --git a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java index b551fb7f8ff74f5ddc7e3aad901c1412075c6da6..8f54267e8e726e9717479e2e415e69051089901e 100644 --- a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java @@ -4,7 +4,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -13,7 +13,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -40,6 +40,7 @@ public class HistoryService { .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 824a8dadd4d80dd29d09b21543fa6da6aedf5365..6d2e4fc054c2ab1c38a1377c5170925cd2e8cd98 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -18,14 +18,19 @@ public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); + /** * Create a new {@link TopologyBuilder} using the given topics. */ - public TopologyBuilder(final String inputTopic) { + public TopologyBuilder(final String inputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { this.inputTopic = inputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; } /** @@ -35,7 +40,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forKeys())) .mapValues(v -> this.gson.toJson(v)) .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java index 4af3f130373d0596232921b9c5cc0b48df573b72..7699ecb48369a2041777b901931c46072a10d99f 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { @Override protected Topology buildTopology() { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - return new TopologyBuilder(this.inputTopic).build(); + return new TopologyBuilder(this.inputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); } } diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index 8f029be66f9decadc87c8e88f58698d1422d596d..1fcfb010f3de11615f764cbbc8de01128982d370 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8091 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java index bf562d86ac913138f48da79c4542d9583b1c8390..f9c50f96339c27d70783213c4d3f8c218ad939c8 100644 --- a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java +++ b/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java @@ -2,14 +2,14 @@ package theodolite.kafkasender; import java.util.Properties; import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** @@ -17,7 +17,7 @@ import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; * * @param <T> {@link IMonitoringRecord} to send */ -public class KafkaRecordSender<T extends IMonitoringRecord> { +public class KafkaRecordSender<T extends SpecificRecord> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -29,24 +29,30 @@ public class KafkaRecordSender<T extends IMonitoringRecord> { private final Producer<String, T> producer; - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); + public KafkaRecordSender(final String bootstrapServers, final String schemaRegistryUrl, + final String topic) { + this(bootstrapServers, schemaRegistryUrl, topic, x -> "", x -> null, new Properties()); } - public KafkaRecordSender(final String bootstrapServers, final String topic, + public KafkaRecordSender(final String bootstrapServers, final String schemaRegistryUrl, + final String topic, final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); + this(bootstrapServers, schemaRegistryUrl, topic, keyAccessor, x -> null, new Properties()); } - public KafkaRecordSender(final String bootstrapServers, final String topic, + public KafkaRecordSender(final String bootstrapServers, final String schemaRegistryUrl, + final String topic, final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); + this(bootstrapServers, schemaRegistryUrl, topic, keyAccessor, timestampAccessor, + new Properties()); } /** * Create a new {@link KafkaRecordSender}. */ - public KafkaRecordSender(final String bootstrapServers, final String topic, + public KafkaRecordSender(final String bootstrapServers, + final String schemaRegistryUrl, + final String topic, final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, final Properties defaultProperties) { this.topic = topic; @@ -61,8 +67,12 @@ public class KafkaRecordSender<T extends IMonitoringRecord> { // properties.put("linger.ms", this.lingerMs); // properties.put("buffer.memory", this.bufferMemory); - this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); + + this.producer = new KafkaProducer<>(properties, + new StringSerializer(), + srAvroSerdeFactory.<T>forKeys().serializer()); } /** diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index bcff74b9a5a4efc72ce1f206f5f10c13557eafd7..515289302eb591ba50538d9c99106e75b21147b6 100644 --- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -15,7 +15,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.kafkasender.KafkaRecordSender; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord.Builder; public class LoadGenerator { @@ -35,6 +36,8 @@ public class LoadGenerator { final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); + final String schemaRegistryUrl = + Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091"); final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); @@ -55,6 +58,7 @@ public class LoadGenerator { kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>( kafkaBootstrapServers, + schemaRegistryUrl, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), @@ -63,10 +67,14 @@ public class LoadGenerator { final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); final Random random = new Random(); + final Builder aprBuilder = ActivePowerRecord.newBuilder(); + for (final String sensor : sensors) { final int initialDelay = random.nextInt(periodMs); executor.scheduleAtFixedRate(() -> { - kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); + aprBuilder.setIdentifier(sensor).setTimestamp(System.currentTimeMillis()) + .setValueInW(value); + kafkaRecordSender.write(aprBuilder.build()); }, initialDelay, periodMs, TimeUnit.MILLISECONDS); }