diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index b2dfae12a0bd207b490086d8ca0767d5a6b9cb1d..1f03132cbcd4faa3964b309b08efee89a3bb8c40 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -13,8 +13,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Suppressed; -import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; @@ -46,8 +44,8 @@ public class TopologyBuilder { private final StreamsBuilder builder = new StreamsBuilder(); private final RecordAggregator recordAggregator = new RecordAggregator(); - private StatsAccumulator latencyStats = new StatsAccumulator(); - private long lastTime = System.currentTimeMillis(); + private final StatsAccumulator latencyStats = new StatsAccumulator(); + private final long lastTime = System.currentTimeMillis(); /** * Create a new {@link TopologyBuilder} using the given topics. @@ -174,8 +172,6 @@ public class TopologyBuilder { Serdes.String(), this.windowSize.toMillis()), this.srAvroSerdeFactory.forValues())) - .suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded())) - // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) .toStream() // TODO timestamp -1 indicates that this record is emitted by an substract event .filter((k, record) -> record.getTimestamp() != -1) @@ -184,31 +180,7 @@ public class TopologyBuilder { private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) { aggregations - .peek((k, v) -> { - final long time = System.currentTimeMillis(); - final long latency = time - v.getTimestamp(); - this.latencyStats.add(latency); - if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("latency," - + time + ',' - + this.latencyStats.mean() + ',' - + (this.latencyStats.count() > 0 - ? this.latencyStats.populationStandardDeviation() - : Double.NaN) - + ',' - + (this.latencyStats.count() > 1 - ? this.latencyStats.sampleStandardDeviation() - : Double.NaN) - + ',' - + this.latencyStats.min() + ',' - + this.latencyStats.max() + ',' - + this.latencyStats.count()); - } - this.latencyStats = new StatsAccumulator(); - this.lastTime = time; - } - }) + .peek((k, v) -> LOGGER.info("Aggregated Record Key: " + k + " Value: " + v)) .to(this.outputTopic, Produced.with( Serdes.String(), this.srAvroSerdeFactory.forValues()));