From cb59dae2a1bac07eb28b7004e6c3bf56d8b6b3d2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Sat, 30 May 2020 09:23:14 +0200
Subject: [PATCH] Use avro records instead of kieker in uc2-application

Replace the kieker records with the avro records in uc2 application.
---
 .../uc2/application/AggregationService.java   |  5 ++-
 .../uc2/application/ConfigurationKeys.java    |  2 +
 .../JointFlatTransformer.java                 |  2 +-
 .../JointFlatTransformerFactory.java          |  2 +-
 .../streamprocessing/JointRecordParents.java  |  2 +-
 .../streamprocessing/RecordAggregator.java    |  9 ++---
 .../uc2/streamprocessing/TopologyBuilder.java | 39 ++++++++++---------
 .../Uc2KafkaStreamsBuilder.java               |  2 +
 8 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java
index 2f37bf757..74ab2c217 100644
--- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java
+++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java
@@ -5,7 +5,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.streams.KafkaStreams;
 import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
-import titan.ccp.common.configuration.Configurations;
+import titan.ccp.common.configuration.ServiceConfigurations;
 
 /**
  * A microservice that manages the history and, therefore, stores and aggregates incoming
@@ -14,7 +14,7 @@ import titan.ccp.common.configuration.Configurations;
  */
 public class AggregationService {
 
-  private final Configuration config = Configurations.create();
+  private final Configuration config = ServiceConfigurations.createWithDefaults();
 
   private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
 
@@ -47,6 +47,7 @@ public class AggregationService {
     // Configuration of the stream application
     final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder
         .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
+        .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
         .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
         .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
         .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
diff --git a/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
index b57f5c38e..5bfb3dc7c 100644
--- a/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
+++ b/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
@@ -23,6 +23,8 @@ public final class ConfigurationKeys {
 
   public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
 
+  public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
+
   private ConfigurationKeys() {}
 
 }
diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java
index 0555df96c..724c7f6e2 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java
@@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
-import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents
diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java
index b78eec51e..cf4362a21 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java
@@ -8,7 +8,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * Factory class configuration required by {@link JointFlatTransformerFactory}.
diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java
index 02b731858..fc8c077b0 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java
@@ -1,7 +1,7 @@
 package theodolite.uc2.streamprocessing;
 
 import java.util.Set;
-import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * A joined pair of an {@link ActivePowerRecord} and its associated parents. Both the record and the
diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java
index 10fb98c9c..9564e994d 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java
@@ -1,8 +1,8 @@
 package theodolite.uc2.streamprocessing;
 
 import org.apache.kafka.streams.kstream.Windowed;
-import titan.ccp.models.records.ActivePowerRecord;
-import titan.ccp.models.records.AggregatedActivePowerRecord;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
  * Updates an {@link AggregatedActivePowerRecord} by a new {@link ActivePowerRecord}.
@@ -19,7 +19,7 @@ public class RecordAggregator {
     final double average = count == 0 ? 0.0 : sum / count;
     return new AggregatedActivePowerRecord(
         identifier.key(), record.getTimestamp(),
-        0.0, 0.0, count, sum, average);
+        count, sum, average);
   }
 
   /**
@@ -32,8 +32,7 @@ public class RecordAggregator {
     final double average = count == 0 ? 0.0 : sum / count;
     return new AggregatedActivePowerRecord(
         // TODO timestamp -1 indicates that this record is emitted by an substract event
-        identifier.key(), -1,
-        0.0, 0.0, count, sum, average);
+        identifier.key(), -1L, count, sum, average);
   }
 
 }
diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
index b6c46fa3a..2249d066a 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
@@ -18,40 +18,44 @@ import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 import titan.ccp.configuration.events.Event;
 import titan.ccp.configuration.events.EventSerde;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
 import titan.ccp.model.sensorregistry.SensorRegistry;
-import titan.ccp.models.records.ActivePowerRecord;
-import titan.ccp.models.records.ActivePowerRecordFactory;
-import titan.ccp.models.records.AggregatedActivePowerRecord;
-import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
 
 /**
  * Builds Kafka Stream Topology for the History microservice.
  */
 public class TopologyBuilder {
 
+  private static final int LATENCY_OUTPOUT_THRESHOLD = 1000;
   // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
 
   private final String inputTopic;
   private final String outputTopic;
   private final String configurationTopic;
+  private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
   private final Duration windowSize;
   private final Duration gracePeriod;
 
   private final StreamsBuilder builder = new StreamsBuilder();
   private final RecordAggregator recordAggregator = new RecordAggregator();
 
+  private StatsAccumulator latencyStats = new StatsAccumulator();
+  private long lastTime = System.currentTimeMillis();
 
   /**
    * Create a new {@link TopologyBuilder} using the given topics.
    */
   public TopologyBuilder(final String inputTopic, final String outputTopic,
-      final String configurationTopic, final Duration windowSize, final Duration gracePeriod) {
+      final String configurationTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
+      final Duration windowSize, final Duration gracePeriod) {
     this.inputTopic = inputTopic;
     this.outputTopic = outputTopic;
     this.configurationTopic = configurationTopic;
+    this.srAvroSerdeFactory = srAvroSerdeFactory;
     this.windowSize = windowSize;
     this.gracePeriod = gracePeriod;
   }
@@ -84,11 +88,11 @@ public class TopologyBuilder {
     final KStream<String, ActivePowerRecord> values = this.builder
         .stream(this.inputTopic, Consumed.with(
             Serdes.String(),
-            IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())));
+            this.srAvroSerdeFactory.forValues()));
     final KStream<String, ActivePowerRecord> aggregationsInput = this.builder
         .stream(this.outputTopic, Consumed.with(
             Serdes.String(),
-            IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory())))
+            this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues()))
         .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()));
 
     final KTable<String, ActivePowerRecord> inputTable = values
@@ -96,9 +100,9 @@ public class TopologyBuilder {
         .mapValues((k, v) -> new ActivePowerRecord(v.getIdentifier(), System.currentTimeMillis(),
             v.getValueInW()))
         .groupByKey(Grouped.with(Serdes.String(),
-            IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+            this.srAvroSerdeFactory.forValues()))
         .reduce((aggr, value) -> value, Materialized.with(Serdes.String(),
-            IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())));
+            this.srAvroSerdeFactory.forValues()));
     return inputTable;
   }
 
@@ -140,13 +144,13 @@ public class TopologyBuilder {
             jointFlatMapTransformerFactory.getStoreName())
         .groupByKey(Grouped.with(
             SensorParentKeySerde.serde(),
-            IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+            this.srAvroSerdeFactory.forValues()))
         .windowedBy(TimeWindows.of(this.windowSize).grace(this.gracePeriod))
         .reduce(
             // TODO Configurable window aggregation function
             (aggValue, newValue) -> newValue,
             Materialized.with(SensorParentKeySerde.serde(),
-                IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())));
+                this.srAvroSerdeFactory.forValues()));
 
   }
 
@@ -159,14 +163,14 @@ public class TopologyBuilder {
                 new WindowedSerdes.TimeWindowedSerde<>(
                     Serdes.String(),
                     this.windowSize.toMillis()),
-                IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+                this.srAvroSerdeFactory.forValues()))
         .aggregate(
             () -> null, this.recordAggregator::add, this.recordAggregator::substract,
             Materialized.with(
                 new WindowedSerdes.TimeWindowedSerde<>(
                     Serdes.String(),
                     this.windowSize.toMillis()),
-                IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory())))
+                this.srAvroSerdeFactory.forValues()))
         .suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded()))
         // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
         .toStream()
@@ -175,16 +179,13 @@ public class TopologyBuilder {
         .map((k, v) -> KeyValue.pair(k.key(), v)); // TODO compute Timestamp
   }
 
-  private StatsAccumulator latencyStats = new StatsAccumulator();
-  private long lastTime = System.currentTimeMillis();
-
   private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) {
     aggregations
         .peek((k, v) -> {
           final long time = System.currentTimeMillis();
           final long latency = time - v.getTimestamp();
           this.latencyStats.add(latency);
-          if (time - this.lastTime >= 1000) {
+          if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) {
             System.out.println("latency,"
                 + time + ','
                 + this.latencyStats.mean() + ','
@@ -205,6 +206,6 @@ public class TopologyBuilder {
         })
         .to(this.outputTopic, Produced.with(
             Serdes.String(),
-            IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory())));
+            this.srAvroSerdeFactory.forValues()));
   }
 }
diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
index ce7d5e90b..2f3e5c7e9 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java
@@ -4,6 +4,7 @@ import java.time.Duration;
 import java.util.Objects;
 import org.apache.kafka.streams.Topology;
 import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 
 /**
  * Builder for the Kafka Streams configuration.
@@ -54,6 +55,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
         this.inputTopic,
         this.outputTopic,
         this.configurationTopic,
+        new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl),
         this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize,
         this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod);
 
-- 
GitLab