diff --git a/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java similarity index 64% rename from uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java rename to application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java index bc5fee1f2cb4367284e9db60f575f2652b1bd05b..61fb135f095c6cb2fa1df6decd5a1765fee0cacf 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java @@ -1,9 +1,19 @@ -package theodolite.uc4.application; +package theodolite.commons.kafkastreams; /** * Keys to access configuration parameters. */ public final class ConfigurationKeys { + // Common keys + public static final String APPLICATION_NAME = "application.name"; + + public static final String APPLICATION_VERSION = "application.version"; + + public static final String NUM_THREADS = "num.threads"; + + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; public static final String APPLICATION_NAME = "application.name"; @@ -11,21 +21,28 @@ public final class ConfigurationKeys { public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + // Additional topics public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + // UC2 + public static final String WINDOW_SIZE_MS = "window.size.ms"; - public static final String NUM_THREADS = "num.threads"; + public static final String WINDOW_GRACE_MS = "window.grace.ms"; - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + // UC3 + public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + // UC4 + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; private ConfigurationKeys() {} diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index ae2a6dafa3d36dada927d17a1ca00d2df63db78b..8c758c24444ea9c590c364063a397f9b7bfec8f9 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; public abstract class KafkaStreamsBuilder { // Kafkastreams application specific + protected String schemaRegistryUrl; // NOPMD for use in subclass + private String applicationName; // NOPMD private String applicationVersion; // NOPMD private String bootstrapServers; // NOPMD @@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder { return this; } + /** + * Sets the URL for the schema registry. + * + * @param url The URL of the schema registry. + * @return + */ + public KafkaStreamsBuilder schemaRegistry(final String url) { + this.schemaRegistryUrl = url; + return this; + } + /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. @@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder { */ public KafkaStreams build() { // Check for required attributes for building properties. - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); Objects.requireNonNull(this.applicationName, "Application name has not been set."); Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set."); // Create the Kafka streams instance. return new KafkaStreams(this.buildTopology(), this.buildProperties()); diff --git a/build.gradle b/build.gradle index 99bd2d2c895065792d99ccd084e8957994e36726..1caf1b61688949ac5fa44660759d92430aa3d1a2 100644 --- a/build.gradle +++ b/build.gradle @@ -12,9 +12,10 @@ buildscript { // Variables used to distinct different subprojects def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')} +def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')} +def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')} def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')} - // Plugins allprojects { apply plugin: 'eclipse' @@ -51,22 +52,23 @@ allprojects { maven { url "https://oss.sonatype.org/content/repositories/snapshots/" } + maven { + url 'https://packages.confluent.io/maven/' + } } } -// Dependencies for all use cases -configure(useCaseProjects) { +// Dependencies for all use case applications +configure(useCaseApplications) { dependencies { - // These dependencies are exported to consumers, that is to say found on their compile classpath. - api('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } - // These dependencies are used internally, and not exposed to consumers on their own compile classpath. - implementation 'org.apache.kafka:kafka-clients:2.1.0' + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' implementation project(':application-kafkastreams-commons') - + // These dependencies are used for the workload-generator-commmon implementation project(':workload-generator-commons') @@ -75,6 +77,25 @@ configure(useCaseProjects) { } } +// Dependencies for all use case generators +configure(useCaseGenerators) { + dependencies { + // These dependencies are used internally, and not exposed to consumers on their own compile classpath. + // implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation 'com.google.guava:guava:24.1-jre' + implementation 'org.jctools:jctools-core:2.1.1' + implementation 'org.slf4j:slf4j-simple:1.6.1' + + // maintain build of generators + implementation 'net.kieker-monitoring:kieker:1.14' + implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } + + // Use JUnit test framework + testImplementation 'junit:junit:4.12' + } +} + // Dependencies for all commons configure(commonProjects) { dependencies { @@ -85,7 +106,8 @@ configure(commonProjects) { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.slf4j:slf4j-simple:1.6.1' - implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/execution/uc1-application/aggregation-deployment.yaml b/execution/uc1-application/aggregation-deployment.yaml index d5bccca4a72f6a47a855ed8a7ca47fac4a8a19ca..519004df32e7853d47b00d44a63652a93363b14c 100644 --- a/execution/uc1-application/aggregation-deployment.yaml +++ b/execution/uc1-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" - name: JAVA_OPTS @@ -50,4 +52,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc1-workload-generator/deployment.yaml b/execution/uc1-workload-generator/deployment.yaml index f79d763068f3f70401acd11263b598415ed7781d..0a8b947bd3e901c379dd2ff18f8825c5cedae2eb 100644 --- a/execution/uc1-workload-generator/deployment.yaml +++ b/execution/uc1-workload-generator/deployment.yaml @@ -16,7 +16,7 @@ spec: terminationGracePeriodSeconds: 0 containers: - name: workload-generator - image: soerenhenning/uc1-wg:latest + image: soerenhenning/uc1-wg:latest env: - name: ZK_HOST value: "my-confluent-cp-zookeeper" @@ -24,6 +24,8 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: NUM_SENSORS value: "{{NUM_SENSORS}}" - name: POD_NAME @@ -32,4 +34,3 @@ spec: fieldPath: metadata.name - name: INSTANCES value: "{{INSTANCES}}" - \ No newline at end of file diff --git a/execution/uc2-application/aggregation-deployment.yaml b/execution/uc2-application/aggregation-deployment.yaml index ce52421731ea5fc044c435ad10adb311e7e7e878..4804cc7604537b47ea5084b15665c96cf6284ccb 100644 --- a/execution/uc2-application/aggregation-deployment.yaml +++ b/execution/uc2-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: COMMIT_INTERVAL_MS value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" - name: JAVA_OPTS @@ -50,4 +52,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc2-workload-generator/deployment.yaml b/execution/uc2-workload-generator/deployment.yaml index 416802004dcb69833765e59a19ca8337110e666a..8c3b86e0d9497652885a580ce066c13078388c7a 100644 --- a/execution/uc2-workload-generator/deployment.yaml +++ b/execution/uc2-workload-generator/deployment.yaml @@ -15,7 +15,7 @@ spec: terminationGracePeriodSeconds: 0 containers: - name: workload-generator - image: benediktwetzel/uc2-wg:latest + image: benediktwetzel/uc2-wg:latest env: - name: ZK_HOST value: "my-confluent-cp-zookeeper" @@ -23,10 +23,11 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: HIERARCHY value: "full" - name: NUM_SENSORS value: "4" - name: NUM_NESTED_GROUPS value: "{{NUM_NESTED_GROUPS}}" - \ No newline at end of file diff --git a/execution/uc3-application/aggregation-deployment.yaml b/execution/uc3-application/aggregation-deployment.yaml index 0f3327af3119df125e3431574e3e406183abc132..f6f789381d98362c5287dea85e03a6b4f9856bb6 100644 --- a/execution/uc3-application/aggregation-deployment.yaml +++ b/execution/uc3-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: KAFKA_WINDOW_DURATION_MINUTES value: "1" - name: COMMIT_INTERVAL_MS @@ -52,4 +54,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc3-workload-generator/deployment.yaml b/execution/uc3-workload-generator/deployment.yaml index 2171c31a81d64765f65d5b76a2137d1151f63859..615a437886be7b0153bc6cb950ac3daa3ddce3fa 100644 --- a/execution/uc3-workload-generator/deployment.yaml +++ b/execution/uc3-workload-generator/deployment.yaml @@ -16,7 +16,7 @@ spec: terminationGracePeriodSeconds: 0 containers: - name: workload-generator - image: soerenhenning/uc3-wg:latest + image: soerenhenning/uc3-wg:latest env: - name: ZK_HOST value: "my-confluent-cp-zookeeper" @@ -24,6 +24,8 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: NUM_SENSORS value: "{{NUM_SENSORS}}" - name: POD_NAME @@ -32,4 +34,3 @@ spec: fieldPath: metadata.name - name: INSTANCES value: "{{INSTANCES}}" - \ No newline at end of file diff --git a/execution/uc4-application/aggregation-deployment.yaml b/execution/uc4-application/aggregation-deployment.yaml index f7a750c790b6a9eab8453fa91e05176de665104e..390c17230d0b092534e9c600772e3af04611fb02 100644 --- a/execution/uc4-application/aggregation-deployment.yaml +++ b/execution/uc4-application/aggregation-deployment.yaml @@ -22,6 +22,8 @@ spec: env: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: AGGREGATION_DURATION_DAYS value: "3" #AGGREGATION_DURATION_DAYS - name: AGGREGATION_DURATION_ADVANCE @@ -54,4 +56,4 @@ spec: volumes: - name: jmx-config configMap: - name: aggregation-jmx-configmap \ No newline at end of file + name: aggregation-jmx-configmap diff --git a/execution/uc4-workload-generator/deployment.yaml b/execution/uc4-workload-generator/deployment.yaml index 516051b2a574a8307f20f551e2be87e753f21658..c6a84f56f3733cb4e8c1e0aa46255c391b290dd0 100644 --- a/execution/uc4-workload-generator/deployment.yaml +++ b/execution/uc4-workload-generator/deployment.yaml @@ -15,7 +15,7 @@ spec: terminationGracePeriodSeconds: 0 containers: - name: workload-generator - image: soerenhenning/uc4-wg:latest + image: soerenhenning/uc4-wg:latest env: - name: ZK_HOST value: "my-confluent-cp-zookeeper" @@ -23,6 +23,7 @@ spec: value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://my-confluent-cp-schema-registry:8081" - name: NUM_SENSORS value: "{{NUM_SENSORS}}" - \ No newline at end of file diff --git a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java deleted file mode 100644 index ee4113c3088629fe01988721e32d9704f5d30da5..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java +++ /dev/null @@ -1,25 +0,0 @@ -package theodolite.uc1.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - - private ConfigurationKeys() {} - -} diff --git a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java index b551fb7f8ff74f5ddc7e3aad901c1412075c6da6..a35cc37b36fb906e5c5495006126374d4de4656c 100644 --- a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java @@ -3,8 +3,9 @@ package theodolite.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -13,7 +14,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -40,6 +41,7 @@ public class HistoryService { .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 824a8dadd4d80dd29d09b21543fa6da6aedf5365..1c30e0c2c83b3d8a2f3dca4df0c7aec99cc4f450 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -18,14 +18,19 @@ public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; + private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); + /** * Create a new {@link TopologyBuilder} using the given topics. */ - public TopologyBuilder(final String inputTopic) { + public TopologyBuilder(final String inputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { this.inputTopic = inputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; } /** @@ -35,7 +40,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .mapValues(v -> this.gson.toJson(v)) .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java index 4af3f130373d0596232921b9c5cc0b48df573b72..7699ecb48369a2041777b901931c46072a10d99f 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { @Override protected Topology buildTopology() { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - return new TopologyBuilder(this.inputTopic).build(); + return new TopologyBuilder(this.inputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); } } diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index 9dcbb9a64be111c2ea1db006081b983c9007b140..3fb301516daa4c7e14875d3d9ca9df9c770eb69e 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +schema.registry.url=http://localhost:8091 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java index 06a6d9ccbf6750290335cd7389391eb613b1569a..a193fe134311e656f1010c738675210689e1b9d6 100644 --- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java @@ -4,8 +4,9 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; */ public class AggregationService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -49,6 +50,7 @@ public class AggregationService { .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java index 0555df96c153065ecf9be2bf2ead10de60d55cbf..724c7f6e2eaebc7be53f03b89d143d885c4a055c 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java @@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java index b78eec51e1cd9e717f79b075e5e27230af56dbe7..cf4362a21ebd0e7b3bb9c4cad4ca871d0b3f2ea8 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java @@ -8,7 +8,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * Factory class configuration required by {@link JointFlatTransformerFactory}. diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java index 02b7318587a77228e7fb2f7dc1b3350bac532c89..fc8c077b0414dc2b4079639bb34c5d966e4de0d3 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java @@ -1,7 +1,7 @@ package theodolite.uc2.streamprocessing; import java.util.Set; -import titan.ccp.models.records.ActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; /** * A joined pair of an {@link ActivePowerRecord} and its associated parents. Both the record and the diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java index 10fb98c9c575bde508a7e24c9e825b25475eff76..9564e994da8fc909147bec76097c737f14247868 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java @@ -1,8 +1,8 @@ package theodolite.uc2.streamprocessing; import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecord; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; /** * Updates an {@link AggregatedActivePowerRecord} by a new {@link ActivePowerRecord}. @@ -19,7 +19,7 @@ public class RecordAggregator { final double average = count == 0 ? 0.0 : sum / count; return new AggregatedActivePowerRecord( identifier.key(), record.getTimestamp(), - 0.0, 0.0, count, sum, average); + count, sum, average); } /** @@ -32,8 +32,7 @@ public class RecordAggregator { final double average = count == 0 ? 0.0 : sum / count; return new AggregatedActivePowerRecord( // TODO timestamp -1 indicates that this record is emitted by an substract event - identifier.key(), -1, - 0.0, 0.0, count, sum, average); + identifier.key(), -1L, count, sum, average); } } diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index b6c46fa3a1822cbf1a11e3a8399aa7a061283952..2249d066a1af173266e55f83cd384b60c82e5b3b 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -18,40 +18,44 @@ import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.EventSerde; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.sensorregistry.SensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; -import titan.ccp.models.records.ActivePowerRecordFactory; -import titan.ccp.models.records.AggregatedActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecordFactory; /** * Builds Kafka Stream Topology for the History microservice. */ public class TopologyBuilder { + private static final int LATENCY_OUTPOUT_THRESHOLD = 1000; // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; private final String outputTopic; private final String configurationTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration windowSize; private final Duration gracePeriod; private final StreamsBuilder builder = new StreamsBuilder(); private final RecordAggregator recordAggregator = new RecordAggregator(); + private StatsAccumulator latencyStats = new StatsAccumulator(); + private long lastTime = System.currentTimeMillis(); /** * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final String configurationTopic, final Duration windowSize, final Duration gracePeriod) { + final String configurationTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, + final Duration windowSize, final Duration gracePeriod) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; this.configurationTopic = configurationTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.windowSize = windowSize; this.gracePeriod = gracePeriod; } @@ -84,11 +88,11 @@ public class TopologyBuilder { final KStream<String, ActivePowerRecord> values = this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); final KStream<String, ActivePowerRecord> aggregationsInput = this.builder .stream(this.outputTopic, Consumed.with( Serdes.String(), - IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); final KTable<String, ActivePowerRecord> inputTable = values @@ -96,9 +100,9 @@ public class TopologyBuilder { .mapValues((k, v) -> new ActivePowerRecord(v.getIdentifier(), System.currentTimeMillis(), v.getValueInW())) .groupByKey(Grouped.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .reduce((aggr, value) -> value, Materialized.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); return inputTable; } @@ -140,13 +144,13 @@ public class TopologyBuilder { jointFlatMapTransformerFactory.getStoreName()) .groupByKey(Grouped.with( SensorParentKeySerde.serde(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .windowedBy(TimeWindows.of(this.windowSize).grace(this.gracePeriod)) .reduce( // TODO Configurable window aggregation function (aggValue, newValue) -> newValue, Materialized.with(SensorParentKeySerde.serde(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); } @@ -159,14 +163,14 @@ public class TopologyBuilder { new WindowedSerdes.TimeWindowedSerde<>( Serdes.String(), this.windowSize.toMillis()), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .aggregate( () -> null, this.recordAggregator::add, this.recordAggregator::substract, Materialized.with( new WindowedSerdes.TimeWindowedSerde<>( Serdes.String(), this.windowSize.toMillis()), - IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()))) + this.srAvroSerdeFactory.forValues())) .suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded())) // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) .toStream() @@ -175,16 +179,13 @@ public class TopologyBuilder { .map((k, v) -> KeyValue.pair(k.key(), v)); // TODO compute Timestamp } - private StatsAccumulator latencyStats = new StatsAccumulator(); - private long lastTime = System.currentTimeMillis(); - private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) { aggregations .peek((k, v) -> { final long time = System.currentTimeMillis(); final long latency = time - v.getTimestamp(); this.latencyStats.add(latency); - if (time - this.lastTime >= 1000) { + if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) { System.out.println("latency," + time + ',' + this.latencyStats.mean() + ',' @@ -205,6 +206,6 @@ public class TopologyBuilder { }) .to(this.outputTopic, Produced.with( Serdes.String(), - IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory()))); + this.srAvroSerdeFactory.forValues())); } } diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index ce7d5e90b476a9d8b8508ea2356f4a2da1d856f3..2f3e5c7e994a3d194810016c4664a5a83c4cc21b 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -54,6 +55,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build this.inputTopic, this.outputTopic, this.configurationTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); diff --git a/uc2-application/src/main/resources/META-INF/application.properties b/uc2-application/src/main/resources/META-INF/application.properties index f9a5225680f638239e637e99bf8d65152d15764d..74f47163d0fa02d1e3b582aab53bc8907a7855af 100644 --- a/uc2-application/src/main/resources/META-INF/application.properties +++ b/uc2-application/src/main/resources/META-INF/application.properties @@ -8,8 +8,12 @@ configuration.kafka.topic=configuration kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output + +schema.registry.url=http://localhost:8091 + window.size.ms=1000 window.grace.ms=0 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index 18aae8c3499643c29901c3ca7461ec707d59c280..b245b1645c9e5ee68df3f108802c9b91d70cf017 100644 --- a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -5,8 +5,9 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -15,7 +16,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final int windowDurationMinutes = Integer @@ -45,6 +46,7 @@ public class HistoryService { .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index 0ad1845f656bcbd11b61c0e0affa9b6bcfabd2f7..74eed74c52a78df229c02542bc6e66d7f796c2c7 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -14,8 +14,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.uc3.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -26,6 +26,7 @@ public class TopologyBuilder { private final String inputTopic; private final String outputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration duration; private final StreamsBuilder builder = new StreamsBuilder(); @@ -34,9 +35,11 @@ public class TopologyBuilder { * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final Duration duration) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.duration = duration; } @@ -47,7 +50,7 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .groupByKey() .windowedBy(TimeWindows.of(this.duration)) // .aggregate( @@ -62,7 +65,7 @@ public class TopologyBuilder { GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() .map((k, s) -> KeyValue.pair(k.key(), s.toString())) - .peek((k, v) -> System.out.println(k + ": " + v)) + .peek((k, v) -> LOGGER.info(k + ": " + v)) .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); return this.builder.build(); diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index 63841361b06bb054fee203a894fba0c11c249d16..e74adf7c87673cc0e6ea4004dbcb1c0a6fc907ac 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -36,7 +37,7 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration); + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); return topologyBuilder.build(); } diff --git a/uc3-application/src/main/resources/META-INF/application.properties b/uc3-application/src/main/resources/META-INF/application.properties index 96e2d8b6ff46f3b3ce878b1fec011e9315e118bc..2ceaf37224b0bff54b09beaabe29210216e11671 100644 --- a/uc3-application/src/main/resources/META-INF/application.properties +++ b/uc3-application/src/main/resources/META-INF/application.properties @@ -4,6 +4,10 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +kafka.window.duration.minutes=1 + +schema.registry.url=http://localhost:8091 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index 3e3073fdeed682ae09e345d9f315585e960a3440..23af805733de2bb3f6384fa924a2322490ee58d9 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -4,8 +4,9 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; +import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -45,6 +46,7 @@ public class HistoryService { .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index b4632aaf15ee5f2572c795458f4bfded5c8cfbcd..a92abae6e11c4bf66a5d8d8dee0f10b088e8274b 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -17,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import theodolite.uc4.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -32,6 +32,7 @@ public class TopologyBuilder { private final String inputTopic; private final String outputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration aggregtionDuration; private final Duration aggregationAdvance; @@ -41,9 +42,11 @@ public class TopologyBuilder { * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final Duration aggregtionDuration, final Duration aggregationAdvance) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.aggregtionDuration = aggregtionDuration; this.aggregationAdvance = aggregationAdvance; } @@ -58,14 +61,14 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .selectKey((key, value) -> { final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); return keyFactory.createKey(value.getIdentifier(), dateTime); }) .groupByKey( - Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 8220f4cd36b0639cd69ac102177a53b1ed90e5b6..7c9e2c4f790cf1fbb7dd34db573576d1e64077db 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -45,6 +46,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.aggregtionDuration, this.aggregationAdvance);