From b78c13269f8a1ef4d2bb5a3828f32838c20cbb28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Sat, 30 May 2020 10:59:47 +0200 Subject: [PATCH] Use avro records instead of kieker in uc4-application Replace the kieker records with the avro records in uc4 application. --- .../theodolite/uc4/application/HistoryService.java | 5 +++-- .../uc4/streamprocessing/TopologyBuilder.java | 11 +++++++---- .../uc4/streamprocessing/Uc4KafkaStreamsBuilder.java | 2 ++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index a27ea6711..b119857c9 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -6,7 +6,7 @@ import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -15,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -44,6 +44,7 @@ public class HistoryService { // Configuration of the stream application final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index b4632aaf1..a92abae6e 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -17,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import theodolite.uc4.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -32,6 +32,7 @@ public class TopologyBuilder { private final String inputTopic; private final String outputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration aggregtionDuration; private final Duration aggregationAdvance; @@ -41,9 +42,11 @@ public class TopologyBuilder { * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final Duration aggregtionDuration, final Duration aggregationAdvance) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.aggregtionDuration = aggregtionDuration; this.aggregationAdvance = aggregationAdvance; } @@ -58,14 +61,14 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .selectKey((key, value) -> { final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); return keyFactory.createKey(value.getIdentifier(), dateTime); }) .groupByKey( - Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 8220f4cd3..7c9e2c4f7 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -45,6 +46,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.aggregtionDuration, this.aggregationAdvance); -- GitLab