Skip to content
Snippets Groups Projects
Commit 9e0f3ef2 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Fix uc2 not sending output

Because of suppress and a compute intensive log in the stream
application the use case may not produce any output.
Further a log message is added for every computed aggregation.
parent 65aa5fd3
No related branches found
No related tags found
1 merge request!37Fix UC2 not sending output
...@@ -13,8 +13,6 @@ import org.apache.kafka.streams.kstream.KStream; ...@@ -13,8 +13,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.WindowedSerdes;
...@@ -46,8 +44,8 @@ public class TopologyBuilder { ...@@ -46,8 +44,8 @@ public class TopologyBuilder {
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
private final RecordAggregator recordAggregator = new RecordAggregator(); private final RecordAggregator recordAggregator = new RecordAggregator();
private StatsAccumulator latencyStats = new StatsAccumulator(); private final StatsAccumulator latencyStats = new StatsAccumulator();
private long lastTime = System.currentTimeMillis(); private final long lastTime = System.currentTimeMillis();
/** /**
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
...@@ -174,8 +172,6 @@ public class TopologyBuilder { ...@@ -174,8 +172,6 @@ public class TopologyBuilder {
Serdes.String(), Serdes.String(),
this.windowSize.toMillis()), this.windowSize.toMillis()),
this.srAvroSerdeFactory.forValues())) this.srAvroSerdeFactory.forValues()))
.suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded()))
// .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream() .toStream()
// TODO timestamp -1 indicates that this record is emitted by an substract event // TODO timestamp -1 indicates that this record is emitted by an substract event
.filter((k, record) -> record.getTimestamp() != -1) .filter((k, record) -> record.getTimestamp() != -1)
...@@ -184,31 +180,7 @@ public class TopologyBuilder { ...@@ -184,31 +180,7 @@ public class TopologyBuilder {
private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) { private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) {
aggregations aggregations
.peek((k, v) -> { .peek((k, v) -> LOGGER.info("Aggregated Record Key: " + k + " Value: " + v))
final long time = System.currentTimeMillis();
final long latency = time - v.getTimestamp();
this.latencyStats.add(latency);
if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("latency,"
+ time + ','
+ this.latencyStats.mean() + ','
+ (this.latencyStats.count() > 0
? this.latencyStats.populationStandardDeviation()
: Double.NaN)
+ ','
+ (this.latencyStats.count() > 1
? this.latencyStats.sampleStandardDeviation()
: Double.NaN)
+ ','
+ this.latencyStats.min() + ','
+ this.latencyStats.max() + ','
+ this.latencyStats.count());
}
this.latencyStats = new StatsAccumulator();
this.lastTime = time;
}
})
.to(this.outputTopic, Produced.with( .to(this.outputTopic, Produced.with(
Serdes.String(), Serdes.String(),
this.srAvroSerdeFactory.forValues())); this.srAvroSerdeFactory.forValues()));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment