Skip to content
Snippets Groups Projects
Commit 0b8d9cc8 authored by Sören Henning's avatar Sören Henning
Browse files

Compute summary statistics instead of just sum

parent a1af3d50
No related branches found
No related tags found
No related merge requests found
Pipeline #371 failed
...@@ -15,6 +15,7 @@ targetCompatibility = "1.11" ...@@ -15,6 +15,7 @@ targetCompatibility = "1.11"
dependencies { dependencies {
compile project(':') compile project(':')
compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
compile 'org.slf4j:slf4j-simple:1.6.1' compile 'org.slf4j:slf4j-simple:1.6.1'
// Use JUnit test framework // Use JUnit test framework
......
package uc3.streamprocessing; package uc3.streamprocessing;
import com.google.gson.Gson; import com.google.common.math.Stats;
import java.time.Duration; import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.ActivePowerRecordFactory; import titan.ccp.models.records.ActivePowerRecordFactory;
import uc3.streamprocessing.util.StatsFactory;
/** /**
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
...@@ -23,18 +27,17 @@ public class TopologyBuilder { ...@@ -23,18 +27,17 @@ public class TopologyBuilder {
private final String inputTopic; private final String inputTopic;
private final String outputTopic; private final String outputTopic;
private final Duration duration; private final Duration duration;
private final Gson gson;
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
/**
 * Create a new {@link TopologyBuilder} using the given topics.
 *
 * @param inputTopic topic to consume the input records from
 * @param outputTopic topic the aggregation results are written to
 * @param duration size of the aggregation time window
 */
public TopologyBuilder(final String inputTopic, final String outputTopic,
    final Duration duration) {
  this.inputTopic = inputTopic;
  this.outputTopic = outputTopic;
  this.duration = duration;
}
/** /**
...@@ -43,11 +46,23 @@ public class TopologyBuilder { ...@@ -43,11 +46,23 @@ public class TopologyBuilder {
public Topology build() { public Topology build() {
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) Consumed.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.groupByKey().windowedBy(TimeWindows.of(this.duration)) .groupByKey().windowedBy(TimeWindows.of(this.duration))
.aggregate(() -> 0.0, (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), // .aggregate(
Materialized.with(Serdes.String(), Serdes.Double())) // () -> 0.0,
.toStream().peek((k, v) -> System.out.printf("key %s, value %f \n", k, v)).to(this.outputTopic); // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
// Materialized.with(Serdes.String(), Serdes.Double()))
.aggregate(
() -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
Materialized.with(
Serdes.String(),
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream()
.map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> System.out.printf("key %s, value %f \n", k, v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(); return this.builder.build();
} }
......
package uc3.streamprocessing.util;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
/**
* Factory methods for working with {@link Stats}.
*/
/**
 * Factory methods for working with {@link Stats}.
 */
public final class StatsFactory {

  private StatsFactory() {}

  /**
   * Return a new {@link Stats} covering every value of the given {@link Stats}
   * plus the additionally supplied value. The input object is not modified,
   * since {@link Stats} instances are immutable.
   */
  public static Stats accumulate(final Stats stats, final double value) {
    final StatsAccumulator accumulator = new StatsAccumulator();
    accumulator.addAll(stats);
    accumulator.add(value);
    return accumulator.snapshot();
  }

}
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment