diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index a27ea6711a5ee734d4b925bde49cbba8837137f2..b119857c99a01a35032b6abbdd7da2dd653d5c9b 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -6,7 +6,7 @@ import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; +import titan.ccp.common.configuration.ServiceConfigurations; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -15,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; */ public class HistoryService { - private final Configuration config = Configurations.create(); + private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); @@ -44,6 +44,7 @@ public class HistoryService { // Configuration of the stream application final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index b4632aaf15ee5f2572c795458f4bfded5c8cfbcd..a92abae6e11c4bf66a5d8d8dee0f10b088e8274b 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -17,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import theodolite.uc4.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.ActivePowerRecordFactory; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * Builds Kafka Stream Topology for the History microservice. @@ -32,6 +32,7 @@ public class TopologyBuilder { private final String inputTopic; private final String outputTopic; + private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration aggregtionDuration; private final Duration aggregationAdvance; @@ -41,9 +42,11 @@ public class TopologyBuilder { * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, + final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final Duration aggregtionDuration, final Duration aggregationAdvance) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.srAvroSerdeFactory = srAvroSerdeFactory; this.aggregtionDuration = aggregtionDuration; this.aggregationAdvance = aggregationAdvance; } @@ -58,14 +61,14 @@ public class TopologyBuilder { this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), - IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .selectKey((key, value) -> { final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); return keyFactory.createKey(value.getIdentifier(), dateTime); }) .groupByKey( - Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 8220f4cd36b0639cd69ac102177a53b1ed90e5b6..7c9e2c4f790cf1fbb7dd34db573576d1e64077db 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.Objects; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Builder for the Kafka Streams configuration. @@ -45,6 +46,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { final TopologyBuilder topologyBuilder = new TopologyBuilder( this.inputTopic, this.outputTopic, + new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.aggregtionDuration, this.aggregationAdvance);