From 9e0f3ef2875b2337036d1a2796eb476bd2313471 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Sun, 23 Aug 2020 18:43:48 +0200
Subject: [PATCH] Fix uc2 not sending output

Because of suppress and a compute intensive log in the stream
application the use case may not produce any output.
Further a log message is added for every computed aggregation.
---
 .../uc2/streamprocessing/TopologyBuilder.java | 34 ++-----------------
 1 file changed, 3 insertions(+), 31 deletions(-)

diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
index b2dfae12a..1f03132cb 100644
--- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
+++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java
@@ -13,8 +13,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Suppressed;
-import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
@@ -46,8 +44,8 @@ public class TopologyBuilder {
   private final StreamsBuilder builder = new StreamsBuilder();
   private final RecordAggregator recordAggregator = new RecordAggregator();
 
-  private StatsAccumulator latencyStats = new StatsAccumulator();
-  private long lastTime = System.currentTimeMillis();
+  private final StatsAccumulator latencyStats = new StatsAccumulator();
+  private final long lastTime = System.currentTimeMillis();
 
   /**
    * Create a new {@link TopologyBuilder} using the given topics.
@@ -174,8 +172,6 @@ public class TopologyBuilder {
                     Serdes.String(),
                     this.windowSize.toMillis()),
                 this.srAvroSerdeFactory.forValues()))
-        .suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded()))
-        // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
         .toStream()
         // TODO timestamp -1 indicates that this record is emitted by an substract event
         .filter((k, record) -> record.getTimestamp() != -1)
@@ -184,31 +180,7 @@ public class TopologyBuilder {
 
   private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) {
     aggregations
-        .peek((k, v) -> {
-          final long time = System.currentTimeMillis();
-          final long latency = time - v.getTimestamp();
-          this.latencyStats.add(latency);
-          if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) {
-            if (LOGGER.isInfoEnabled()) {
-              LOGGER.info("latency,"
-                  + time + ','
-                  + this.latencyStats.mean() + ','
-                  + (this.latencyStats.count() > 0
-                      ? this.latencyStats.populationStandardDeviation()
-                      : Double.NaN)
-                  + ','
-                  + (this.latencyStats.count() > 1
-                      ? this.latencyStats.sampleStandardDeviation()
-                      : Double.NaN)
-                  + ','
-                  + this.latencyStats.min() + ','
-                  + this.latencyStats.max() + ','
-                  + this.latencyStats.count());
-            }
-            this.latencyStats = new StatsAccumulator();
-            this.lastTime = time;
-          }
-        })
+        .peek((k, v) -> LOGGER.info("Aggregated Record Key: " + k + " Value: " + v))
         .to(this.outputTopic, Produced.with(
             Serdes.String(),
             this.srAvroSerdeFactory.forValues()));
-- 
GitLab