From 8bd84de4f2b4a8e1bc5f4da48d64fc588fc42e77 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de>
Date: Tue, 21 Apr 2020 21:07:51 +0200
Subject: [PATCH] Remove need for schema registry

---
 .../java/uc4/application/HistoryService.java  |  1 -
 .../streamprocessing/KafkaStreamsBuilder.java |  8 +---
 .../java/uc4/streamprocessing/Serdes.java     | 45 -------------------
 .../uc4/streamprocessing/TopologyBuilder.java | 30 ++++++-------
 4 files changed, 16 insertions(+), 68 deletions(-)
 delete mode 100644 uc4-application/src/main/java/uc4/streamprocessing/Serdes.java

diff --git a/uc4-application/src/main/java/uc4/application/HistoryService.java b/uc4-application/src/main/java/uc4/application/HistoryService.java
index 46c6aa3cc..2e08a92d4 100644
--- a/uc4-application/src/main/java/uc4/application/HistoryService.java
+++ b/uc4-application/src/main/java/uc4/application/HistoryService.java
@@ -36,7 +36,6 @@ public class HistoryService {
   private void createKafkaStreamsApplication() {
     final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
         .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
-        .schemaRegistry(this.schemaRegistry)
         .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
         .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
         .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java
index a5e71ebb3..a66150462 100644
--- a/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java
+++ b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java
@@ -22,7 +22,6 @@ public class KafkaStreamsBuilder {
   private String inputTopic; // NOPMD
   private String outputTopic; // NOPMD
   private Duration windowDuration; // NOPMD
-  private String schemaRegistryUrl; // NOPMD
   private int numThreads = -1; // NOPMD
   private int commitIntervalMs = -1; // NOPMD
   private int cacheMaxBytesBuff = -1; // NOPMD
@@ -37,11 +36,6 @@ public class KafkaStreamsBuilder {
     return this;
   }
 
-  public KafkaStreamsBuilder schemaRegistry(final String url) {
-    this.schemaRegistryUrl = url;
-    return this;
-  }
-
   public KafkaStreamsBuilder outputTopic(final String outputTopic) {
     this.outputTopic = outputTopic;
     return this;
@@ -92,7 +86,7 @@ public class KafkaStreamsBuilder {
     Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
     // TODO log parameters
     final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
-        this.windowDuration, this.schemaRegistryUrl);
+        this.windowDuration);
     final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
         .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
         .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java b/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java
deleted file mode 100644
index dd7079734..000000000
--- a/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package uc4.streamprocessing;
-
-import com.google.common.math.Stats;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.kafka.common.serialization.Serde;
-import titan.ccp.common.kafka.GenericSerde;
-import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
-import titan.ccp.model.records.ActivePowerRecord;
-import titan.ccp.model.records.AggregatedActivePowerRecord;
-import titan.ccp.model.records.WindowedActivePowerRecord;
-
-final class Serdes {
-
-  private final SchemaRegistryAvroSerdeFactory avroSerdeFactory;
-
-  public Serdes(final String schemaRegistryUrl) {
-    this.avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl);
-  }
-
-  public Serde<String> string() {
-    return org.apache.kafka.common.serialization.Serdes.String();
-  }
-
-  public Serde<WindowedActivePowerRecord> windowedActivePowerValues() {
-    return this.avroSerdeFactory.forKeys();
-  }
-
-  public Serde<ActivePowerRecord> activePowerRecordValues() {
-    return this.avroSerdeFactory.forValues();
-  }
-
-  public Serde<AggregatedActivePowerRecord> aggregatedActivePowerRecordValues() {
-    return this.avroSerdeFactory.forValues();
-  }
-
-  public <T extends SpecificRecord> Serde<T> avroValues() {
-    return this.avroSerdeFactory.forValues();
-  }
-
-  public Serde<Stats> stats() {
-    return GenericSerde.from(Stats::toByteArray, Stats::fromByteArray);
-  }
-
-
-}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java
index 181514bb6..59cbfc3e2 100644
--- a/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java
+++ b/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java
@@ -6,6 +6,7 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
@@ -16,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import titan.ccp.common.kafka.GenericSerde;
 import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.records.HourOfDayActivePowerRecord;
 import titan.ccp.models.records.ActivePowerRecordFactory;
 import uc4.streamprocessing.util.StatsFactory;
@@ -34,7 +35,6 @@ public class TopologyBuilder {
 
   private final String inputTopic;
   private final String outputTopic;
-  private final Serdes serdes;
 
   private final StreamsBuilder builder = new StreamsBuilder();
 
@@ -42,10 +42,9 @@ public class TopologyBuilder {
    * Create a new {@link TopologyBuilder} using the given topics.
    */
   public TopologyBuilder(final String inputTopic, final String outputTopic,
-      final Duration duration, final String schemaRegistryUrl) {
+      final Duration duration) {
     this.inputTopic = inputTopic;
     this.outputTopic = outputTopic;
-    this.serdes = new Serdes(schemaRegistryUrl);
   }
 
   /**
@@ -59,27 +58,28 @@ public class TopologyBuilder {
         new HourOfDayRecordFactory();
     final TimeWindows timeWindows =
         TimeWindows.of(Duration.ofDays(30)).advanceBy(Duration.ofDays(1));
-    final String statsTopic = "output";
 
     this.builder
         .stream(this.inputTopic,
-            Consumed.with(this.serdes.string(),
+            Consumed.with(Serdes.String(),
                 IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
-        .mapValues(kieker -> new ActivePowerRecord(
-            kieker.getIdentifier(),
-            kieker.getTimestamp(),
-            kieker.getValueInW()))
+        // .mapValues(kieker -> new ActivePowerRecord(
+        // kieker.getIdentifier(),
+        // kieker.getTimestamp(),
+        // kieker.getValueInW()))
         .selectKey((key, value) -> {
           final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
           final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
           return keyFactory.createKey(value.getIdentifier(), dateTime);
         })
-        .groupByKey(Grouped.with(keySerde, this.serdes.activePowerRecordValues()))
+        .groupByKey(
+            Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
         .windowedBy(timeWindows)
         .aggregate(
             () -> Stats.of(),
             (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
-            Materialized.with(keySerde, this.serdes.stats()))
+            Materialized.with(keySerde,
+                GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
         .toStream()
         .map((key, stats) -> KeyValue.pair(
             keyFactory.getSensorId(key.key()),
@@ -88,10 +88,10 @@ public class TopologyBuilder {
         // statsRecordFactory.create(key, value)))
         // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging
         .to(
-            statsTopic,
+            this.outputTopic,
             Produced.with(
-                this.serdes.string(),
-                this.serdes.string()));
+                Serdes.String(),
+                Serdes.String()));
     // this.serdes.avroValues()));
 
     return this.builder.build();
-- 
GitLab