diff --git a/uc3-application/build.gradle b/uc3-application/build.gradle
index b0279201322e94d9bd9b14222a2dc218f18b4309..604c9f60d3a5adf02aa673db52ec05f074aa0342 100644
--- a/uc3-application/build.gradle
+++ b/uc3-application/build.gradle
@@ -15,6 +15,7 @@ targetCompatibility = "1.11"
 dependencies {
     compile project(':')
     
+    compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
     compile 'org.slf4j:slf4j-simple:1.6.1'
 
     // Use JUnit test framework
diff --git a/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
index 608c00940d9d5d8f2a210efdb98c54e385132405..fdaf6dd3577a2365db2994481c4ae4bee331fb87 100644
--- a/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
+++ b/uc3-application/src/main/java/uc3/streamprocessing/TopologyBuilder.java
@@ -1,54 +1,69 @@
 package uc3.streamprocessing;
 
-import com.google.gson.Gson;
+import com.google.common.math.Stats;
 import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import titan.ccp.common.kafka.GenericSerde;
 import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
 import titan.ccp.models.records.ActivePowerRecordFactory;
+import uc3.streamprocessing.util.StatsFactory;
 
 /**
  * Builds Kafka Stream Topology for the History microservice.
  */
 public class TopologyBuilder {
 
-	private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
-
-	private final String inputTopic;
-	private final String outputTopic;
-	private final Duration duration;
-	private final Gson gson;
-
-	private final StreamsBuilder builder = new StreamsBuilder();
-
-	/**
-	 * Create a new {@link TopologyBuilder} using the given topics.
-	 */
-	public TopologyBuilder(final String inputTopic, final String outputTopic, final Duration duration) {
-		this.inputTopic = inputTopic;
-		this.outputTopic = outputTopic;
-		this.duration = duration;
-		this.gson = new Gson();
-	}
-
-	/**
-	 * Build the {@link Topology} for the History microservice.
-	 */
-	public Topology build() {
-		this.builder
-				.stream(this.inputTopic,
-						Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
-				.groupByKey().windowedBy(TimeWindows.of(this.duration))
-				.aggregate(() -> 0.0, (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
-						Materialized.with(Serdes.String(), Serdes.Double()))
-				.toStream().peek((k, v) -> System.out.printf("key %s, value %f \n", k, v)).to(this.outputTopic);
-
-		return this.builder.build();
-	}
+  private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final Duration duration;
+
+  private final StreamsBuilder builder = new StreamsBuilder();
+
+  /**
+   * Create a new {@link TopologyBuilder} using the given topics.
+   */
+  public TopologyBuilder(final String inputTopic, final String outputTopic,
+      final Duration duration) {
+    this.inputTopic = inputTopic;
+    this.outputTopic = outputTopic;
+    this.duration = duration;
+  }
+
+  /**
+   * Build the {@link Topology} for the History microservice.
+   */
+  public Topology build() {
+    this.builder
+        .stream(this.inputTopic,
+            Consumed.with(Serdes.String(),
+                IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+        .groupByKey().windowedBy(TimeWindows.of(this.duration))
+        // .aggregate(
+        // () -> 0.0,
+        // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
+        // Materialized.with(Serdes.String(), Serdes.Double()))
+        .aggregate(
+            () -> Stats.of(),
+            (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
+            Materialized.with(
+                Serdes.String(),
+                GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
+        .toStream()
+        .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
+        .peek((k, v) -> System.out.printf("key %s, value %f \n", k, v))
+        .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+    return this.builder.build();
+  }
 }
diff --git a/uc3-application/src/main/java/uc3/streamprocessing/util/StatsFactory.java b/uc3-application/src/main/java/uc3/streamprocessing/util/StatsFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..030a1d8aefa617eb0cadd804b09fa2f10ba5a696
--- /dev/null
+++ b/uc3-application/src/main/java/uc3/streamprocessing/util/StatsFactory.java
@@ -0,0 +1,23 @@
+package uc3.streamprocessing.util;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+
+/**
+ * Factory methods for working with {@link Stats}.
+ */
+public final class StatsFactory {
+
+  private StatsFactory() {}
+
+  /**
+   * Add a value to a {@link Stats} object.
+   */
+  public static Stats accumulate(final Stats stats, final double value) {
+    final StatsAccumulator statsAccumulator = new StatsAccumulator();
+    statsAccumulator.addAll(stats);
+    statsAccumulator.add(value);
+    return statsAccumulator.snapshot();
+  }
+
+}