From 0b8d9cc8bfd41abefe06d6ea2e2f6b4d1610259c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de>
Date: Tue, 21 Apr 2020 13:02:51 +0200
Subject: [PATCH] Compute summary statistics instead of just sum

---
 uc3-application/build.gradle                  |  1 +
 .../uc3/streamprocessing/TopologyBuilder.java | 83 +++++++++++--------
 .../streamprocessing/util/StatsFactory.java   | 23 +++++
 3 files changed, 73 insertions(+), 34 deletions(-)
 create mode 100644 uc3-application/src/main/java/uc3/streamprocessing/util/StatsFactory.java

diff --git a/uc3-application/build.gradle b/uc3-application/build.gradle
index b02792013..604c9f60d 100644
--- a/uc3-application/build.gradle
+++ b/uc3-application/build.gradle
@@ -15,6 +15,7 @@ targetCompatibility = "1.11"
 dependencies {
     compile project(':')
     
+    compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
     compile 'org.slf4j:slf4j-simple:1.6.1'
 
     // Use JUnit test framework
diff --git a/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
index 608c00940..fdaf6dd35 100644
--- a/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
+++ b/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
@@ -1,54 +1,69 @@
 package uc3.streamprocessing;
 
-import com.google.gson.Gson;
+import com.google.common.math.Stats;
 import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import titan.ccp.common.kafka.GenericSerde;
 import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
 import titan.ccp.models.records.ActivePowerRecordFactory;
+import uc3.streamprocessing.util.StatsFactory;
 
 /**
  * Builds Kafka Stream Topology for the History microservice.
  */
 public class TopologyBuilder {
 
-	private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
-
-	private final String inputTopic;
-	private final String outputTopic;
-	private final Duration duration;
-	private final Gson gson;
-
-	private final StreamsBuilder builder = new StreamsBuilder();
-
-	/**
-	 * Create a new {@link TopologyBuilder} using the given topics.
-	 */
-	public TopologyBuilder(final String inputTopic, final String outputTopic, final Duration duration) {
-		this.inputTopic = inputTopic;
-		this.outputTopic = outputTopic;
-		this.duration = duration;
-		this.gson = new Gson();
-	}
-
-	/**
-	 * Build the {@link Topology} for the History microservice.
-	 */
-	public Topology build() {
-		this.builder
-				.stream(this.inputTopic,
-						Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
-				.groupByKey().windowedBy(TimeWindows.of(this.duration))
-				.aggregate(() -> 0.0, (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
-						Materialized.with(Serdes.String(), Serdes.Double()))
-				.toStream().peek((k, v) -> System.out.printf("key %s, value %f \n", k, v)).to(this.outputTopic);
-
-		return this.builder.build();
-	}
+  private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final Duration duration;
+
+  private final StreamsBuilder builder = new StreamsBuilder();
+
+  /**
+   * Create a new {@link TopologyBuilder} using the given topics.
+   */
+  public TopologyBuilder(final String inputTopic, final String outputTopic,
+      final Duration duration) {
+    this.inputTopic = inputTopic;
+    this.outputTopic = outputTopic;
+    this.duration = duration;
+  }
+
+  /**
+   * Build the {@link Topology} for the History microservice.
+   */
+  public Topology build() {
+    this.builder
+        .stream(this.inputTopic,
+            Consumed.with(Serdes.String(),
+                IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+        .groupByKey().windowedBy(TimeWindows.of(this.duration))
+        // .aggregate(
+        // () -> 0.0,
+        // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
+        // Materialized.with(Serdes.String(), Serdes.Double()))
+        .aggregate(
+            () -> Stats.of(),
+            (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
+            Materialized.with(
+                Serdes.String(),
+                GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
+        .toStream()
+        .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
+        .peek((k, v) -> System.out.printf("key %s, value %f \n", k, v))
+        .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+    return this.builder.build();
+  }
 }
diff --git a/uc3-application/src/main/java/uc3/streamprocessing/util/StatsFactory.java b/uc3-application/src/main/java/uc3/streamprocessing/util/StatsFactory.java
new file mode 100644
index 000000000..030a1d8ae
--- /dev/null
+++ b/uc3-application/src/main/java/uc3/streamprocessing/util/StatsFactory.java
@@ -0,0 +1,23 @@
+package uc3.streamprocessing.util;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+
+/**
+ * Factory methods for working with {@link Stats}.
+ */
+public final class StatsFactory {
+
+  private StatsFactory() {}
+
+  /**
+   * Add a value to a {@link Stats} object.
+   */
+  public static Stats accumulate(final Stats stats, final double value) {
+    final StatsAccumulator statsAccumulator = new StatsAccumulator();
+    statsAccumulator.addAll(stats);
+    statsAccumulator.add(value);
+    return statsAccumulator.snapshot();
+  }
+
+}
-- 
GitLab