From b0218c468bfd10de8ff38a0983d7a4e79d213b8d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Sat, 30 May 2020 10:24:09 +0200
Subject: [PATCH] Use avro records instead of kieker in uc3-application

Replace the kieker records with the avro records in uc3 application.
---
 .../commons/kafkastreams/ConfigurationKeys.java       |  3 +++
 .../main/resources/META-INF/application.properties    |  4 ++++
 .../theodolite/uc3/application/HistoryService.java    |  4 ++--
 .../uc3/streamprocessing/TopologyBuilder.java         | 11 +++++++----
 .../uc3/streamprocessing/Uc3KafkaStreamsBuilder.java  |  3 ++-
 .../main/resources/META-INF/application.properties    |  4 ++++
 6 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java
index af05c51bd..260dbba9c 100644
--- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java
+++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java
@@ -31,6 +31,9 @@ public final class ConfigurationKeys {
 
   public static final String WINDOW_GRACE_MS = "window.grace.ms";
 
+  // UC3
+  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+
   // UC4
   public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
 
diff --git a/uc2-application/src/main/resources/META-INF/application.properties b/uc2-application/src/main/resources/META-INF/application.properties
index 3aa519c50..21d0c0f20 100644
--- a/uc2-application/src/main/resources/META-INF/application.properties
+++ b/uc2-application/src/main/resources/META-INF/application.properties
@@ -5,8 +5,12 @@ configuration.kafka.topic=configuration
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
+
+schema.registry.url=http://localhost:8091
+
 window.size.ms=1000
 window.grace.ms=0
+
 num.threads=1
 commit.interval.ms=100
 cache.max.bytes.buffering=-1
diff --git a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java
index db7ec3ba6..bfb28896b 100644
--- a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java
+++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java
@@ -7,7 +7,7 @@ import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.KafkaStreams;
 import theodolite.commons.kafkastreams.ConfigurationKeys;
 import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder;
-import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.configuration.ServiceConfigurations;
 
 /**
  * A microservice that manages the history and, therefore, stores and aggregates incoming
@@ -16,7 +16,7 @@ import titan.ccp.common.configuration.Configurations;
  */
 public class HistoryService {
 
-  private final Configuration config = Configurations.create();
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
 
   private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
   private final int windowDurationMinutes = Integer
diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
index 0ad1845f6..eec3793d8 100644
--- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
+++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java
@@ -14,8 +14,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import theodolite.uc3.streamprocessing.util.StatsFactory;
 import titan.ccp.common.kafka.GenericSerde;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-import titan.ccp.models.records.ActivePowerRecordFactory;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
+import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * Builds Kafka Stream Topology for the History microservice.
@@ -26,6 +26,7 @@ public class TopologyBuilder {
 
   private final String inputTopic;
   private final String outputTopic;
+  private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
   private final Duration duration;
 
   private final StreamsBuilder builder = new StreamsBuilder();
@@ -34,9 +35,11 @@ public class TopologyBuilder {
    * Create a new {@link TopologyBuilder} using the given topics.
    */
   public TopologyBuilder(final String inputTopic, final String outputTopic,
+      final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
       final Duration duration) {
     this.inputTopic = inputTopic;
     this.outputTopic = outputTopic;
+    this.srAvroSerdeFactory = srAvroSerdeFactory;
     this.duration = duration;
   }
 
@@ -47,7 +50,7 @@ public class TopologyBuilder {
     this.builder
         .stream(this.inputTopic,
             Consumed.with(Serdes.String(),
-                IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+                this.srAvroSerdeFactory.<ActivePowerRecord>forKeys()))
         .groupByKey()
         .windowedBy(TimeWindows.of(this.duration))
         // .aggregate(
@@ -62,7 +65,7 @@ public class TopologyBuilder {
                 GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
         .toStream()
         .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
-        .peek((k, v) -> System.out.println(k + ": " + v))
+        .peek((k, v) -> LOGGER.info(k + ": " + v))
         .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
     return this.builder.build();
diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
index 63841361b..e74adf7c8 100644
--- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
+++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java
@@ -4,6 +4,7 @@ import java.time.Duration;
 import java.util.Objects;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 
 /**
  * Builder for the Kafka Streams configuration.
@@ -36,7 +37,7 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
     Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
 
     final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
-        this.windowDuration);
+        new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
     return topologyBuilder.build();
   }
 
diff --git a/uc3-application/src/main/resources/META-INF/application.properties b/uc3-application/src/main/resources/META-INF/application.properties
index ef279332f..1ad3be2cf 100644
--- a/uc3-application/src/main/resources/META-INF/application.properties
+++ b/uc3-application/src/main/resources/META-INF/application.properties
@@ -1,6 +1,10 @@
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
+kafka.window.duration.minutes=1
+
+schema.registry.url=http://localhost:8091
+
 num.threads=1
 commit.interval.ms=100
 cache.max.bytes.buffering=-1
-- 
GitLab