From 9bf5feaa0f8f0c6cc1a59c6eb1a7ec3cee155426 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de>
Date: Thu, 26 Mar 2020 14:58:41 +0100
Subject: [PATCH] Add implementation for use case 4

---
 build.gradle                                  |   2 +-
 uc4-application/Dockerfile                    |   8 +-
 uc4-application/build.gradle                  |   5 +-
 .../ccp/kiekerbridge/KafkaRecordSender.java   |  84 ---------
 .../expbigdata19/ConfigPublisher.java         |  50 ------
 .../expbigdata19/ExperimentorBigData.java     |  21 ---
 .../expbigdata19/LoadCounter.java             |  84 ---------
 .../expbigdata19/LoadGenerator.java           | 124 -------------
 .../expbigdata19/LoadGeneratorExtrem.java     | 165 ------------------
 .../uc4/application/ConfigurationKeys.java    |  24 +++
 .../java/uc4/application/HistoryService.java  |  55 ++++++
 .../uc4/streamprocessing/DayOfWeekKey.java    |  31 ++++
 .../streamprocessing/DayOfWeekKeyFactory.java |  22 +++
 .../streamprocessing/DayOfWeekKeySerde.java   |  33 ++++
 .../DayOfWeekRecordFactory.java               |  28 +++
 .../uc4/streamprocessing/HourOfDayKey.java    |  29 +++
 .../streamprocessing/HourOfDayKeyFactory.java |  21 +++
 .../streamprocessing/HourOfDayKeySerde.java   |  32 ++++
 .../HourOfDayRecordFactory.java               |  28 +++
 .../uc4/streamprocessing/HourOfWeekKey.java   |  40 +++++
 .../HourOfWeekKeyFactory.java                 |  23 +++
 .../streamprocessing/HourOfWeekKeySerde.java  |  35 ++++
 .../HourOfWeekRecordFactory.java              |  29 +++
 .../streamprocessing/KafkaStreamsBuilder.java | 104 +++++++++++
 .../RecordDatabaseAdapter.java                |  85 +++++++++
 .../java/uc4/streamprocessing/Serdes.java     |  45 +++++
 .../uc4/streamprocessing/StatsKeyFactory.java |  17 ++
 .../streamprocessing/StatsRecordFactory.java  |  22 +++
 .../uc4/streamprocessing/TopologyBuilder.java |  99 +++++++++++
 .../streamprocessing/util/StatsFactory.java   |  23 +++
 .../resources/META-INF/application.properties |   6 +
 31 files changed, 840 insertions(+), 534 deletions(-)
 delete mode 100644 uc4-application/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java
 delete mode 100644 uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java
 delete mode 100644 uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java
 delete mode 100644 uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java
 delete mode 100644 uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java
 delete mode 100644 uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java
 create mode 100644 uc4-application/src/main/java/uc4/application/ConfigurationKeys.java
 create mode 100644 uc4-application/src/main/java/uc4/application/HistoryService.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKey.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeyFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeySerde.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekRecordFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKey.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeyFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeySerde.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfDayRecordFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKey.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeyFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeySerde.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekRecordFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/RecordDatabaseAdapter.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/Serdes.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/StatsKeyFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/StatsRecordFactory.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java
 create mode 100644 uc4-application/src/main/java/uc4/streamprocessing/util/StatsFactory.java
 create mode 100644 uc4-application/src/main/resources/META-INF/application.properties

diff --git a/build.gradle b/build.gradle
index 26fd99f79..c73a8582d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -34,7 +34,7 @@ allprojects {
 
 dependencies {
     // These dependencies are exported to consumers, that is to say found on their compile classpath.
-    api('org.industrial-devops:titan-ccp-common:0.0.1-SNAPSHOT') { changing = true }
+    api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true }
     api 'net.kieker-monitoring:kieker:1.14-SNAPSHOT'
     api 'net.sourceforge.teetime:teetime:3.0'
 
diff --git a/uc4-application/Dockerfile b/uc4-application/Dockerfile
index 9b17de3af..8cb65188a 100644
--- a/uc4-application/Dockerfile
+++ b/uc4-application/Dockerfile
@@ -1,6 +1,8 @@
 FROM openjdk:11-slim
 
-ADD build/distributions/exp-bigdata19-bridge.tar /
 
-CMD export JAVA_OPTS=-Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL \
-    && /exp-bigdata19-bridge/bin/exp-bigdata19-bridge
\ No newline at end of file
+ADD build/distributions/uc4-application.tar /
+
+
+CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
+    /uc4-application/bin/uc4-application
diff --git a/uc4-application/build.gradle b/uc4-application/build.gradle
index 12e597b37..e61af74b1 100644
--- a/uc4-application/build.gradle
+++ b/uc4-application/build.gradle
@@ -16,16 +16,17 @@ dependencies {
     compile project(':')
     
     compile 'org.slf4j:slf4j-simple:1.6.1'
+    compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
 
     // Use JUnit test framework
     testCompile 'junit:junit:4.12'
 }
 
-mainClassName = "titan.ccp.kiekerbridge.expbigdata19.ExperimentorBigData"
+mainClassName = "uc4.application.HistoryService"
 
 eclipse {
     classpath {
        downloadSources=true
        downloadJavadoc=true
     }
-}
\ No newline at end of file
+}
diff --git a/uc4-application/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java b/uc4-application/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java
deleted file mode 100644
index b46128c8e..000000000
--- a/uc4-application/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package titan.ccp.kiekerbridge;
-
-import java.util.Properties;
-import java.util.function.Function;
-import kieker.common.record.IMonitoringRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-
-
-/**
- * Sends monitoring records to Kafka.
- *
- * @param <T> {@link IMonitoringRecord} to send
- */
-public class KafkaRecordSender<T extends IMonitoringRecord> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
-
-  private final String topic;
-
-  private final Function<T, String> keyAccessor;
-
-  private final Function<T, Long> timestampAccessor;
-
-  private final Producer<String, T> producer;
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic) {
-    this(bootstrapServers, topic, x -> "", x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor) {
-    this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
-    this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
-  }
-
-  /**
-   * Create a new {@link KafkaRecordSender}.
-   */
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
-      final Properties defaultProperties) {
-    this.topic = topic;
-    this.keyAccessor = keyAccessor;
-    this.timestampAccessor = timestampAccessor;
-
-    final Properties properties = new Properties();
-    properties.putAll(defaultProperties);
-    properties.put("bootstrap.servers", bootstrapServers);
-    // properties.put("acks", this.acknowledges);
-    // properties.put("batch.size", this.batchSize);
-    // properties.put("linger.ms", this.lingerMs);
-    // properties.put("buffer.memory", this.bufferMemory);
-
-    this.producer = new KafkaProducer<>(properties, new StringSerializer(),
-        IMonitoringRecordSerde.serializer());
-  }
-
-  /**
-   * Write the passed monitoring record to Kafka.
-   */
-  public void write(final T monitoringRecord) {
-    final ProducerRecord<String, T> record =
-        new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
-            this.keyAccessor.apply(monitoringRecord), monitoringRecord);
-
-    LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
-    this.producer.send(record);
-  }
-
-  public void terminate() {
-    this.producer.close();
-  }
-
-}
diff --git a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java b/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java
deleted file mode 100644
index d5f55a4ab..000000000
--- a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package titan.ccp.kiekerbridge.expbigdata19;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import titan.ccp.configuration.events.Event;
-import titan.ccp.configuration.events.EventSerde;
-
-public class ConfigPublisher {
-
-  private final String topic;
-
-  private final Producer<Event, String> producer;
-
-  public ConfigPublisher(final String bootstrapServers, final String topic) {
-    this(bootstrapServers, topic, new Properties());
-  }
-
-  public ConfigPublisher(final String bootstrapServers, final String topic,
-      final Properties defaultProperties) {
-    this.topic = topic;
-
-    final Properties properties = new Properties();
-    properties.putAll(defaultProperties);
-    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB
-    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB
-
-    this.producer =
-        new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer());
-  }
-
-  public void publish(final Event event, final String value) {
-    final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value);
-    try {
-      this.producer.send(record).get();
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  public void close() {
-    this.producer.close();
-  }
-
-}
diff --git a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java b/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java
deleted file mode 100644
index a50bbd942..000000000
--- a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package titan.ccp.kiekerbridge.expbigdata19;
-
-import java.io.IOException;
-import java.util.Objects;
-
-public class ExperimentorBigData {
-
-  public static void main(final String[] args) throws InterruptedException, IOException {
-
-    final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter");
-
-    if (modus.equals("LoadGenerator")) {
-      LoadGenerator.main(args);
-    } else if (modus.equals("LoadGeneratorExtrem")) {
-      LoadGeneratorExtrem.main(args);
-    } else if (modus.equals("LoadCounter")) {
-      LoadCounter.main(args);
-    }
-
-  }
-}
diff --git a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java b/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java
deleted file mode 100644
index 798f30144..000000000
--- a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package titan.ccp.kiekerbridge.expbigdata19;
-
-import com.google.common.math.StatsAccumulator;
-import java.time.Duration;
-import java.util.List;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.Deserializer;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-import titan.ccp.models.records.AggregatedActivePowerRecord;
-import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
-
-public class LoadCounter {
-
-  public static void main(final String[] args) throws InterruptedException {
-
-    final String kafkaBootstrapServers =
-        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
-    final String kafkaInputTopic =
-        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
-    final String kafkaOutputTopic =
-        Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output");
-
-    final Properties props = new Properties();
-    props.setProperty("bootstrap.servers", kafkaBootstrapServers);
-    props.setProperty("group.id", "load-counter");
-    props.setProperty("enable.auto.commit", "true");
-    props.setProperty("auto.commit.interval.ms", "1000");
-    props.setProperty("max.poll.records", "1000000");
-    props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB
-    props.setProperty("key.deserializer",
-        "org.apache.kafka.common.serialization.StringDeserializer");
-    props.setProperty("value.deserializer",
-        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-
-    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-    final Deserializer<AggregatedActivePowerRecord> deserializer =
-        IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory());
-
-    final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
-    consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic));
-
-    executor.scheduleAtFixedRate(
-        () -> {
-          final long time = System.currentTimeMillis();
-          final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
-
-          long inputCount = 0;
-          for (final ConsumerRecord<String, byte[]> inputRecord : records
-              .records(kafkaInputTopic)) {
-            inputCount++;
-          }
-
-          long outputCount = 0;
-          final StatsAccumulator statsAccumulator = new StatsAccumulator();
-          for (final ConsumerRecord<String, byte[]> outputRecord : records
-              .records(kafkaOutputTopic)) {
-            outputCount++;
-            final AggregatedActivePowerRecord record =
-                deserializer.deserialize(kafkaOutputTopic, outputRecord.value());
-            final long latency = time - record.getTimestamp();
-            statsAccumulator.add(latency);
-          }
-
-          final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0;
-
-          final long elapsedTime = System.currentTimeMillis() - time;
-          System.out
-              .println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount);
-          System.out
-              .println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount);
-        },
-        0,
-        1,
-        TimeUnit.SECONDS);
-  }
-
-}
diff --git a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java b/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java
deleted file mode 100644
index 97a7c84f8..000000000
--- a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package titan.ccp.kiekerbridge.expbigdata19;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import titan.ccp.configuration.events.Event;
-import titan.ccp.kiekerbridge.KafkaRecordSender;
-import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
-import titan.ccp.model.sensorregistry.MutableSensorRegistry;
-import titan.ccp.models.records.ActivePowerRecord;
-
-public class LoadGenerator {
-
-  public static void main(final String[] args) throws InterruptedException, IOException {
-
-    final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
-    final int numNestedGroups =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
-    final int numSensor =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
-    final int periodMs =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final int value =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
-    final boolean sendRegistry =
-        Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
-    final int threads =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
-    final String kafkaBootstrapServers =
-        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
-    final String kafkaInputTopic =
-        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
-    final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
-    final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
-    final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
-
-    final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
-    if (hierarchy.equals("deep")) {
-      MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
-      for (int lvl = 1; lvl < numNestedGroups; lvl++) {
-        lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
-      }
-      for (int s = 0; s < numSensor; s++) {
-        lastSensor.addChildMachineSensor("sensor_" + s);
-      }
-    } else if (hierarchy.equals("full")) {
-      addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
-    } else {
-      throw new IllegalStateException();
-    }
-
-    final List<String> sensors =
-        sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
-            .collect(Collectors.toList());
-
-    if (sendRegistry) {
-      final ConfigPublisher configPublisher =
-          new ConfigPublisher(kafkaBootstrapServers, "configuration");
-      configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
-      configPublisher.close();
-      System.out.println("Configuration sent.");
-
-      System.out.println("Now wait 30 seconds");
-      Thread.sleep(30_000);
-      System.out.println("And woke up again :)");
-    }
-
-
-    final Properties kafkaProperties = new Properties();
-    // kafkaProperties.put("acks", this.acknowledges);
-    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
-    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
-    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
-        kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(),
-        kafkaProperties);
-
-    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
-    final Random random = new Random();
-
-    for (final String sensor : sensors) {
-      final int initialDelay = random.nextInt(periodMs);
-      executor.scheduleAtFixedRate(
-          () -> {
-            kafkaRecordSender.write(new ActivePowerRecord(
-                sensor,
-                System.currentTimeMillis(),
-                value));
-          },
-          initialDelay,
-          periodMs,
-          TimeUnit.MILLISECONDS);
-    }
-
-    System.out.println("Wait for termination...");
-    executor.awaitTermination(30, TimeUnit.DAYS);
-    System.out.println("Will terminate now");
-
-  }
-
-  private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
-      final int lvl, final int maxLvl, int nextId) {
-    for (int c = 0; c < numChildren; c++) {
-      if (lvl == maxLvl) {
-        parent.addChildMachineSensor("s_" + nextId);
-        nextId++;
-      } else {
-        final MutableAggregatedSensor newParent =
-            parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
-        nextId++;
-        nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
-      }
-    }
-    return nextId;
-  }
-
-}
diff --git a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java b/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java
deleted file mode 100644
index 5bfb6ad48..000000000
--- a/uc4-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java
+++ /dev/null
@@ -1,165 +0,0 @@
-package titan.ccp.kiekerbridge.expbigdata19;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import titan.ccp.configuration.events.Event;
-import titan.ccp.kiekerbridge.KafkaRecordSender;
-import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
-import titan.ccp.model.sensorregistry.MutableSensorRegistry;
-import titan.ccp.model.sensorregistry.SensorRegistry;
-import titan.ccp.models.records.ActivePowerRecord;
-
-public class LoadGeneratorExtrem {
-
-  public static void main(final String[] args) throws InterruptedException, IOException {
-
-    final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
-    final int numNestedGroups =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
-    final int numSensor =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
-    final int value =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
-    final boolean sendRegistry =
-        Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
-    final boolean doNothing =
-        Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false"));
-    final int threads =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
-    final int producers =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1"));
-    final String kafkaBootstrapServers =
-        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
-    final String kafkaInputTopic =
-        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
-    final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
-    final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
-    final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
-
-    final SensorRegistry sensorRegistry =
-        buildSensorRegistry(hierarchy, numNestedGroups, numSensor);
-
-    if (sendRegistry) {
-      final ConfigPublisher configPublisher =
-          new ConfigPublisher(kafkaBootstrapServers, "configuration");
-      configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
-      configPublisher.close();
-      System.out.println("Configuration sent.");
-
-      System.out.println("Now wait 30 seconds");
-      Thread.sleep(30_000);
-      System.out.println("And woke up again :)");
-    }
-
-    final Properties kafkaProperties = new Properties();
-    // kafkaProperties.put("acks", this.acknowledges);
-    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
-    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
-    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-    final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream
-        .<KafkaRecordSender<ActivePowerRecord>>generate(
-            () -> new KafkaRecordSender<>(
-                kafkaBootstrapServers,
-                kafkaInputTopic,
-                r -> r.getIdentifier(),
-                r -> r.getTimestamp(),
-                kafkaProperties))
-        .limit(producers)
-        .collect(Collectors.toList());
-
-    final List<String> sensors =
-        sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
-            .collect(Collectors.toList());
-
-    for (int i = 0; i < threads; i++) {
-      final int threadId = i;
-      new Thread(() -> {
-        while (true) {
-          for (final String sensor : sensors) {
-            if (!doNothing) {
-              kafkaRecordSenders.get(threadId % producers).write(new ActivePowerRecord(
-                  sensor,
-                  System.currentTimeMillis(),
-                  value));
-            }
-          }
-        }
-      }).start();
-    }
-
-    while (true) {
-      printCpuUsagePerThread();
-    }
-
-    // System.out.println("Wait for termination...");
-    // Thread.sleep(30 * 24 * 60 * 60 * 1000L);
-    // System.out.println("Will terminate now");
-  }
-
-  private static void printCpuUsagePerThread() throws InterruptedException {
-    final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
-    final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet());
-
-    final long start = System.nanoTime();
-    final long[] startCpuTimes = new long[threads.size()];
-    for (int i = 0; i < threads.size(); i++) {
-      final Thread thread = threads.get(i);
-      startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId());
-    }
-
-    Thread.sleep(5000);
-
-    for (int i = 0; i < threads.size(); i++) {
-      final Thread thread = threads.get(i);
-      final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i];
-      final long dur = System.nanoTime() - start;
-      final double util = (double) cpuTime / dur;
-      System.out.println(
-          "Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util));
-    }
-  }
-
-  private static SensorRegistry buildSensorRegistry(final String hierarchy,
-      final int numNestedGroups, final int numSensor) {
-    final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
-    if (hierarchy.equals("deep")) {
-      MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
-      for (int lvl = 1; lvl < numNestedGroups; lvl++) {
-        lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
-      }
-      for (int s = 0; s < numSensor; s++) {
-        lastSensor.addChildMachineSensor("sensor_" + s);
-      }
-    } else if (hierarchy.equals("full")) {
-      addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
-    } else {
-      throw new IllegalStateException();
-    }
-    return sensorRegistry;
-  }
-
-  private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
-      final int lvl, final int maxLvl, int nextId) {
-    for (int c = 0; c < numChildren; c++) {
-      if (lvl == maxLvl) {
-        parent.addChildMachineSensor("s_" + nextId);
-        nextId++;
-      } else {
-        final MutableAggregatedSensor newParent =
-            parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
-        nextId++;
-        nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
-      }
-    }
-    return nextId;
-  }
-
-}
diff --git a/uc4-application/src/main/java/uc4/application/ConfigurationKeys.java b/uc4-application/src/main/java/uc4/application/ConfigurationKeys.java
new file mode 100644
index 000000000..b7468ed09
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/application/ConfigurationKeys.java
@@ -0,0 +1,24 @@
+package uc4.application;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
+
+  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+
+  public static final String NUM_THREADS = "num.threads";
+
+  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
+
+  public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
+
+  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+
+  private ConfigurationKeys() {}
+
+}
diff --git a/uc4-application/src/main/java/uc4/application/HistoryService.java b/uc4-application/src/main/java/uc4/application/HistoryService.java
new file mode 100644
index 000000000..8e6020b58
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/application/HistoryService.java
@@ -0,0 +1,55 @@
+package uc4.application;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.kafka.streams.KafkaStreams;
+import titan.ccp.common.configuration.Configurations;
+import uc4.streamprocessing.KafkaStreamsBuilder;
+
+/**
+ * A microservice that manages the history and, therefore, stores and aggregates incoming
+ * measurements.
+ */
+public class HistoryService {
+
+  private final Configuration config = Configurations.create();
+
+  private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
+  private final int windowDurationMinutes = Integer
+      .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));
+
+  /**
+   * Start the service.
+   */
+  public void run() {
+    this.createKafkaStreamsApplication();
+  }
+
+  /**
+   * Build and start the underlying Kafka Streams application of the service.
+   */
+  private void createKafkaStreamsApplication() {
+    final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
+        .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
+        .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
+        .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
+        .windowDuration(Duration.ofMinutes(this.windowDurationMinutes))
+        .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
+        .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
+        .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
+        .build();
+    this.stopEvent.thenRun(kafkaStreams::close);
+    kafkaStreams.start();
+  }
+
+  public static void main(final String[] args) {
+    new HistoryService().run();
+  }
+
+}
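
The stopEvent future above is wired to close the Kafka Streams instance, but nothing in this patch ever completes it. A minimal, self-contained sketch of the mechanism, assuming the future were completed from a JVM shutdown hook (the class and names below are illustrative, not part of the patch):

    import java.util.concurrent.CompletableFuture;

    public final class StopEventSketch {
      public static void main(final String[] args) {
        final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
        // Corresponds to stopEvent.thenRun(kafkaStreams::close) in HistoryService:
        stopEvent.thenRun(() -> System.out.println("kafkaStreams.close() would run here"));
        // Completing the future from a shutdown hook triggers the registered close action:
        Runtime.getRuntime().addShutdownHook(new Thread(() -> stopEvent.complete(null)));
      }
    }
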
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKey.java b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKey.java
new file mode 100644
index 000000000..2b9cc8596
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKey.java
@@ -0,0 +1,31 @@
+package uc4.streamprocessing;
+
+import java.time.DayOfWeek;
+
+/**
+ * Composed key of a {@link DayOfWeek} and a sensor id.
+ */
+public class DayOfWeekKey {
+
+  private final DayOfWeek dayOfWeek;
+  private final String sensorId;
+
+  public DayOfWeekKey(final DayOfWeek dayOfWeek, final String sensorId) {
+    this.dayOfWeek = dayOfWeek;
+    this.sensorId = sensorId;
+  }
+
+  public DayOfWeek getDayOfWeek() {
+    return this.dayOfWeek;
+  }
+
+  public String getSensorId() {
+    return this.sensorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.sensorId + ";" + this.dayOfWeek.toString();
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeyFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeyFactory.java
new file mode 100644
index 000000000..2d3f44589
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeyFactory.java
@@ -0,0 +1,22 @@
+package uc4.streamprocessing;
+
+import java.time.DayOfWeek;
+import java.time.LocalDateTime;
+
+/**
+ * {@link StatsKeyFactory} for {@link DayOfWeekKey}.
+ */
+public class DayOfWeekKeyFactory implements StatsKeyFactory<DayOfWeekKey> {
+
+  @Override
+  public DayOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) {
+    final DayOfWeek dayOfWeek = dateTime.getDayOfWeek();
+    return new DayOfWeekKey(dayOfWeek, sensorId);
+  }
+
+  @Override
+  public String getSensorId(final DayOfWeekKey key) {
+    return key.getSensorId();
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeySerde.java b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeySerde.java
new file mode 100644
index 000000000..282a9f579
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekKeySerde.java
@@ -0,0 +1,33 @@
+package uc4.streamprocessing;
+
+import java.time.DayOfWeek;
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.simpleserdes.BufferSerde;
+import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
+import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
+import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
+
+/**
+ * {@link BufferSerde} for a {@link DayOfWeekKey}. Use the {@link #create()} method to create a new
+ * Kafka {@link Serde}.
+ */
+public class DayOfWeekKeySerde implements BufferSerde<DayOfWeekKey> {
+
+  @Override
+  public void serialize(final WriteBuffer buffer, final DayOfWeekKey data) {
+    buffer.putInt(data.getDayOfWeek().getValue());
+    buffer.putString(data.getSensorId());
+  }
+
+  @Override
+  public DayOfWeekKey deserialize(final ReadBuffer buffer) {
+    final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt());
+    final String sensorId = buffer.getString();
+    return new DayOfWeekKey(dayOfWeek, sensorId);
+  }
+
+  public static Serde<DayOfWeekKey> create() {
+    return SimpleSerdes.create(new DayOfWeekKeySerde());
+  }
+
+}
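
A round trip through this serde, as a sketch; the topic name is arbitrary here, assuming the buffer-based serdes from titan-ccp-common ignore it. The HourOfDay and HourOfWeek serdes below follow the same pattern.

    import java.time.DayOfWeek;
    import org.apache.kafka.common.serialization.Serde;

    public final class DayOfWeekKeySerdeSketch {
      public static void main(final String[] args) {
        final Serde<DayOfWeekKey> serde = DayOfWeekKeySerde.create();
        final DayOfWeekKey key = new DayOfWeekKey(DayOfWeek.MONDAY, "sensor-42");
        final byte[] bytes = serde.serializer().serialize("some-topic", key);
        // Prints "sensor-42;MONDAY", per DayOfWeekKey.toString():
        System.out.println(serde.deserializer().deserialize("some-topic", bytes));
      }
    }
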
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekRecordFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekRecordFactory.java
new file mode 100644
index 000000000..da984eace
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/DayOfWeekRecordFactory.java
@@ -0,0 +1,28 @@
+package uc4.streamprocessing;
+
+import com.google.common.math.Stats;
+import org.apache.kafka.streams.kstream.Windowed;
+import titan.ccp.model.records.DayOfWeekActivePowerRecord;
+
+/**
+ * {@link StatsRecordFactory} to create a {@link DayOfWeekActivePowerRecord}.
+ */
+public class DayOfWeekRecordFactory
+    implements StatsRecordFactory<DayOfWeekKey, DayOfWeekActivePowerRecord> {
+
+  @Override
+  public DayOfWeekActivePowerRecord create(final Windowed<DayOfWeekKey> windowed,
+      final Stats stats) {
+    return new DayOfWeekActivePowerRecord(
+        windowed.key().getSensorId(),
+        windowed.key().getDayOfWeek().getValue(),
+        windowed.window().start(),
+        windowed.window().end(),
+        stats.count(),
+        stats.mean(),
+        stats.populationVariance(),
+        stats.min(),
+        stats.max());
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKey.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKey.java
new file mode 100644
index 000000000..2bf346064
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKey.java
@@ -0,0 +1,29 @@
+package uc4.streamprocessing;
+
+/**
+ * Composed key of an hour of the day and a sensor id.
+ */
+public class HourOfDayKey {
+
+  private final int hourOfDay;
+  private final String sensorId;
+
+  public HourOfDayKey(final int hourOfDay, final String sensorId) {
+    this.hourOfDay = hourOfDay;
+    this.sensorId = sensorId;
+  }
+
+  public int getHourOfDay() {
+    return this.hourOfDay;
+  }
+
+  public String getSensorId() {
+    return this.sensorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.sensorId + ";" + this.hourOfDay;
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeyFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeyFactory.java
new file mode 100644
index 000000000..641a314e7
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeyFactory.java
@@ -0,0 +1,21 @@
+package uc4.streamprocessing;
+
+import java.time.LocalDateTime;
+
+/**
+ * {@link StatsKeyFactory} for {@link HourOfDayKey}.
+ */
+public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> {
+
+  @Override
+  public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
+    final int hourOfDay = dateTime.getHour();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  @Override
+  public String getSensorId(final HourOfDayKey key) {
+    return key.getSensorId();
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeySerde.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeySerde.java
new file mode 100644
index 000000000..93965b3e8
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayKeySerde.java
@@ -0,0 +1,32 @@
+package uc4.streamprocessing;
+
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.simpleserdes.BufferSerde;
+import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
+import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
+import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
+
+/**
+ * {@link BufferSerde} for an {@link HourOfDayKey}. Use the {@link #create()} method to create a new
+ * Kafka {@link Serde}.
+ */
+public class HourOfDayKeySerde implements BufferSerde<HourOfDayKey> {
+
+  @Override
+  public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
+    buffer.putInt(data.getHourOfDay());
+    buffer.putString(data.getSensorId());
+  }
+
+  @Override
+  public HourOfDayKey deserialize(final ReadBuffer buffer) {
+    final int hourOfDay = buffer.getInt();
+    final String sensorId = buffer.getString();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  public static Serde<HourOfDayKey> create() {
+    return SimpleSerdes.create(new HourOfDayKeySerde());
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayRecordFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayRecordFactory.java
new file mode 100644
index 000000000..15710bfa7
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfDayRecordFactory.java
@@ -0,0 +1,28 @@
+package uc4.streamprocessing;
+
+import com.google.common.math.Stats;
+import org.apache.kafka.streams.kstream.Windowed;
+import titan.ccp.model.records.HourOfDayActivePowerRecord;
+
+/**
+ * {@link StatsRecordFactory} to create an {@link HourOfDayActivePowerRecord}.
+ */
+public class HourOfDayRecordFactory
+    implements StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> {
+
+  @Override
+  public HourOfDayActivePowerRecord create(final Windowed<HourOfDayKey> windowed,
+      final Stats stats) {
+    return new HourOfDayActivePowerRecord(
+        windowed.key().getSensorId(),
+        windowed.key().getHourOfDay(),
+        windowed.window().start(),
+        windowed.window().end(),
+        stats.count(),
+        stats.mean(),
+        stats.populationVariance(),
+        stats.min(),
+        stats.max());
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKey.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKey.java
new file mode 100644
index 000000000..b007514a8
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKey.java
@@ -0,0 +1,40 @@
+package uc4.streamprocessing;
+
+import java.time.DayOfWeek;
+
+/**
+ * Composed key of a {@link DayOfWeek}, an hour of the day and a sensor id.
+ */
+public class HourOfWeekKey {
+
+  private final DayOfWeek dayOfWeek;
+  private final int hourOfDay;
+  private final String sensorId;
+
+  /**
+   * Create a new {@link HourOfWeekKey} using its components.
+   */
+  public HourOfWeekKey(final DayOfWeek dayOfWeek, final int hourOfDay, final String sensorId) {
+    this.dayOfWeek = dayOfWeek;
+    this.hourOfDay = hourOfDay;
+    this.sensorId = sensorId;
+  }
+
+  public DayOfWeek getDayOfWeek() {
+    return this.dayOfWeek;
+  }
+
+  public int getHourOfDay() {
+    return this.hourOfDay;
+  }
+
+  public String getSensorId() {
+    return this.sensorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.sensorId + ";" + this.dayOfWeek.toString() + ";" + this.hourOfDay;
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeyFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeyFactory.java
new file mode 100644
index 000000000..59c044139
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeyFactory.java
@@ -0,0 +1,23 @@
+package uc4.streamprocessing;
+
+import java.time.DayOfWeek;
+import java.time.LocalDateTime;
+
+/**
+ * {@link StatsKeyFactory} for {@link HourOfWeekKey}.
+ */
+public class HourOfWeekKeyFactory implements StatsKeyFactory<HourOfWeekKey> {
+
+  @Override
+  public HourOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) {
+    final DayOfWeek dayOfWeek = dateTime.getDayOfWeek();
+    final int hourOfDay = dateTime.getHour();
+    return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId);
+  }
+
+  @Override
+  public String getSensorId(final HourOfWeekKey key) {
+    return key.getSensorId();
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeySerde.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeySerde.java
new file mode 100644
index 000000000..2dbe18b93
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekKeySerde.java
@@ -0,0 +1,35 @@
+package uc4.streamprocessing;
+
+import java.time.DayOfWeek;
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.simpleserdes.BufferSerde;
+import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
+import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
+import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
+
+/**
+ * {@link BufferSerde} for an {@link HourOfWeekKey}. Use the {@link #create()} method to create a new
+ * Kafka {@link Serde}.
+ */
+public class HourOfWeekKeySerde implements BufferSerde<HourOfWeekKey> {
+
+  @Override
+  public void serialize(final WriteBuffer buffer, final HourOfWeekKey data) {
+    buffer.putInt(data.getDayOfWeek().getValue());
+    buffer.putInt(data.getHourOfDay());
+    buffer.putString(data.getSensorId());
+  }
+
+  @Override
+  public HourOfWeekKey deserialize(final ReadBuffer buffer) {
+    final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt());
+    final int hourOfDay = buffer.getInt();
+    final String sensorId = buffer.getString();
+    return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId);
+  }
+
+  public static Serde<HourOfWeekKey> create() {
+    return SimpleSerdes.create(new HourOfWeekKeySerde());
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekRecordFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekRecordFactory.java
new file mode 100644
index 000000000..7f3b66344
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/HourOfWeekRecordFactory.java
@@ -0,0 +1,29 @@
+package uc4.streamprocessing;
+
+import com.google.common.math.Stats;
+import org.apache.kafka.streams.kstream.Windowed;
+import titan.ccp.model.records.HourOfWeekActivePowerRecord;
+
+/**
+ * {@link StatsRecordFactory} to create an {@link HourOfWeekActivePowerRecord}.
+ */
+public class HourOfWeekRecordFactory
+    implements StatsRecordFactory<HourOfWeekKey, HourOfWeekActivePowerRecord> {
+
+  @Override
+  public HourOfWeekActivePowerRecord create(final Windowed<HourOfWeekKey> windowed,
+      final Stats stats) {
+    return new HourOfWeekActivePowerRecord(
+        windowed.key().getSensorId(),
+        windowed.key().getDayOfWeek().getValue(),
+        windowed.key().getHourOfDay(),
+        windowed.window().start(),
+        windowed.window().end(),
+        stats.count(),
+        stats.mean(),
+        stats.populationVariance(),
+        stats.min(),
+        stats.max());
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java
new file mode 100644
index 000000000..c69f8bca2
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java
@@ -0,0 +1,104 @@
+package uc4.streamprocessing;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import titan.ccp.common.kafka.streams.PropertiesBuilder;
+
+/**
+ * Builder for the Kafka Streams configuration.
+ */
+public class KafkaStreamsBuilder {
+
+  private static final String APPLICATION_NAME = "titan-ccp-history";
+  private static final String APPLICATION_VERSION = "0.0.1";
+
+  // private static final Logger LOGGER =
+  // LoggerFactory.getLogger(KafkaStreamsBuilder.class);
+
+  private String bootstrapServers; // NOPMD
+  private String inputTopic; // NOPMD
+  private String outputTopic; // NOPMD
+  private Duration windowDuration; // NOPMD
+  private int numThreads = -1; // NOPMD
+  private int commitIntervalMs = -1; // NOPMD
+  private int cacheMaxBytesBuff = -1; // NOPMD
+
+  public KafkaStreamsBuilder inputTopic(final String inputTopic) {
+    this.inputTopic = inputTopic;
+    return this;
+  }
+
+  public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
+    this.bootstrapServers = bootstrapServers;
+    return this;
+  }
+
+  public KafkaStreamsBuilder outputTopic(final String outputTopic) {
+    this.outputTopic = outputTopic;
+    return this;
+  }
+
+  public KafkaStreamsBuilder windowDuration(final Duration windowDuration) {
+    this.windowDuration = windowDuration;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for the number of threads (num.stream.threads). Use -1 to
+   * fall back to the Kafka Streams default.
+   */
+  public KafkaStreamsBuilder numThreads(final int numThreads) {
+    if (numThreads < -1 || numThreads == 0) {
+      throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
+    }
+    this.numThreads = numThreads;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for the frequency with which to save the position (offsets in
+   * source topics) of tasks (commit.interval.ms). Set this to zero when all records must be
+   * processed, for example, when records arrive in bulk. Use -1 to fall back to the default.
+   */
+  public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
+    if (commitIntervalMs < -1) {
+      throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
+    }
+    this.commitIntervalMs = commitIntervalMs;
+    return this;
+  }
+
+  /**
+   * Sets the Kafka Streams property for the maximum number of memory bytes to be used for record
+   * caches across all threads (cache.max.bytes.buffering). Set this to zero when all records must
+   * be processed, for example, when records arrive in bulk. Use -1 to fall back to the default.
+   */
+  public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
+    if (cacheMaxBytesBuffering < -1) {
+      throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
+    }
+    this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
+    return this;
+  }
+
+  /**
+   * Builds the {@link KafkaStreams} instance.
+   */
+  public KafkaStreams build() {
+    Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
+    // TODO log parameters
+    final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
+        this.windowDuration, null);
+    final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
+        .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
+        .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
+        .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
+        .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
+        .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG").build();
+    return new KafkaStreams(topologyBuilder.build(), properties);
+  }
+
+}
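
Typical use of the builder, as a sketch with illustrative values. Leaving numThreads, commitIntervalMs, and cacheMaxBytesBuffering at their -1 defaults means the p -> p > 0 and p -> p >= 0 guards skip those properties, so Kafka Streams falls back to its own defaults. Note also that build() currently passes null as the schema registry URL to TopologyBuilder; the Avro serdes go unused in the topology of this patch.

    import java.time.Duration;
    import org.apache.kafka.streams.KafkaStreams;

    public final class BuilderSketch {
      public static void main(final String[] args) {
        final KafkaStreams streams = new KafkaStreamsBuilder()
            .bootstrapServers("localhost:9092") // illustrative values
            .inputTopic("input")
            .outputTopic("output")
            .windowDuration(Duration.ofMinutes(60))
            .build(); // numThreads etc. stay at -1, i.e. the Kafka Streams defaults
        streams.start();
      }
    }
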
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/RecordDatabaseAdapter.java b/uc4-application/src/main/java/uc4/streamprocessing/RecordDatabaseAdapter.java
new file mode 100644
index 000000000..d2230abd2
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/RecordDatabaseAdapter.java
@@ -0,0 +1,85 @@
+package uc4.streamprocessing;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.avro.specific.SpecificRecord;
+
+/**
+ * Holds the property names for a statistics record (which is an Avro record).
+ *
+ * @param <T> Record type this adapter is for.
+ */
+public class RecordDatabaseAdapter<T extends SpecificRecord> {
+
+  private static final String DEFAULT_IDENTIFIER_FIELD = "identifier";
+  private static final String DEFAULT_PERIOD_START_FIELD = "periodStart";
+  private static final String DEFAULT_PERIOD_END_FIELD = "periodEnd";
+
+  private final Class<? extends T> clazz;
+  private final String identifierField;
+  private final Collection<String> timeUnitFields;
+  private final String periodStartField;
+  private final String periodEndField;
+
+  /**
+   * Create a new {@link RecordDatabaseAdapter} for the given record type by setting its time unit
+   * property (e.g., day of week or hour of day) and default fields for the other properties.
+   */
+  public RecordDatabaseAdapter(final Class<? extends T> clazz, final String timeUnitField) {
+    this(clazz,
+        DEFAULT_IDENTIFIER_FIELD,
+        List.of(timeUnitField),
+        DEFAULT_PERIOD_START_FIELD,
+        DEFAULT_PERIOD_END_FIELD);
+  }
+
+  /**
+   * Create a new {@link RecordDatabaseAdapter} for the given record type by setting its time unit
+   * properties (e.g., day of week and hour of day) and default fields for the other properties.
+   */
+  public RecordDatabaseAdapter(final Class<? extends T> clazz,
+      final Collection<String> timeUnitFields) {
+    this(clazz,
+        DEFAULT_IDENTIFIER_FIELD,
+        timeUnitFields,
+        DEFAULT_PERIOD_START_FIELD,
+        DEFAULT_PERIOD_END_FIELD);
+  }
+
+  /**
+   * Create a new {@link RecordDatabaseAdapter} for the given record type by setting all its
+   * required properties.
+   */
+  public RecordDatabaseAdapter(final Class<? extends T> clazz,
+      final String identifierField,
+      final Collection<String> timeUnitField,
+      final String periodStartField,
+      final String periodEndField) {
+    this.clazz = clazz;
+    this.identifierField = identifierField;
+    this.timeUnitFields = timeUnitFields;
+    this.periodStartField = periodStartField;
+    this.periodEndField = periodEndField;
+  }
+
+  public Class<? extends T> getClazz() {
+    return this.clazz;
+  }
+
+  public String getIdentifierField() {
+    return this.identifierField;
+  }
+
+  public Collection<String> getTimeUnitFields() {
+    return this.timeUnitFields;
+  }
+
+  public String getPeriodStartField() {
+    return this.periodStartField;
+  }
+
+  public String getPeriodEndField() {
+    return this.periodEndField;
+  }
+
+}
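
Nothing else in this patch references RecordDatabaseAdapter yet. Its intended use, as a sketch; the Avro field names "hourOfDay" and "dayOfWeek" are assumptions about record schemas that are not part of this patch:

    import java.util.List;
    import titan.ccp.model.records.HourOfDayActivePowerRecord;
    import titan.ccp.model.records.HourOfWeekActivePowerRecord;

    public final class AdapterSketch {
      public static void main(final String[] args) {
        // Single time-unit field (field name assumed):
        final RecordDatabaseAdapter<HourOfDayActivePowerRecord> hourAdapter =
            new RecordDatabaseAdapter<>(HourOfDayActivePowerRecord.class, "hourOfDay");
        // Multiple time-unit fields (field names assumed):
        final RecordDatabaseAdapter<HourOfWeekActivePowerRecord> weekAdapter =
            new RecordDatabaseAdapter<>(HourOfWeekActivePowerRecord.class,
                List.of("dayOfWeek", "hourOfDay"));
        System.out.println(hourAdapter.getIdentifierField()); // identifier
        System.out.println(weekAdapter.getTimeUnitFields());  // [dayOfWeek, hourOfDay]
      }
    }
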
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java b/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java
new file mode 100644
index 000000000..dd7079734
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java
@@ -0,0 +1,45 @@
+package uc4.streamprocessing;
+
+import com.google.common.math.Stats;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.GenericSerde;
+import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+import titan.ccp.model.records.WindowedActivePowerRecord;
+
+final class Serdes {
+
+  private final SchemaRegistryAvroSerdeFactory avroSerdeFactory;
+
+  public Serdes(final String schemaRegistryUrl) {
+    this.avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl);
+  }
+
+  public Serde<String> string() {
+    return org.apache.kafka.common.serialization.Serdes.String();
+  }
+
+  public Serde<WindowedActivePowerRecord> windowedActivePowerValues() {
+    return this.avroSerdeFactory.forKeys();
+  }
+
+  public Serde<ActivePowerRecord> activePowerRecordValues() {
+    return this.avroSerdeFactory.forValues();
+  }
+
+  public Serde<AggregatedActivePowerRecord> aggregatedActivePowerRecordValues() {
+    return this.avroSerdeFactory.forValues();
+  }
+
+  public <T extends SpecificRecord> Serde<T> avroValues() {
+    return this.avroSerdeFactory.forValues();
+  }
+
+  public Serde<Stats> stats() {
+    return GenericSerde.from(Stats::toByteArray, Stats::fromByteArray);
+  }
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/StatsKeyFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/StatsKeyFactory.java
new file mode 100644
index 000000000..ab614e32a
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/StatsKeyFactory.java
@@ -0,0 +1,17 @@
+package uc4.streamprocessing;
+
+import java.time.LocalDateTime;
+
+/**
+ * Factory interface for creating a stats key from a sensor id and a {@link LocalDateTime} object,
+ * and for extracting the sensor id back out of such a key.
+ *
+ * @param <T> Type of the key
+ */
+public interface StatsKeyFactory<T> {
+
+  T createKey(String sensorId, LocalDateTime dateTime);
+
+  String getSensorId(T key);
+
+}
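
The factories convert record timestamps into the composed keys and recover the sensor id from them. A sketch using the HourOfDayKeyFactory from above; the timestamp is illustrative, and the zone matches the one hardcoded in TopologyBuilder:

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public final class KeyFactorySketch {
      public static void main(final String[] args) {
        final StatsKeyFactory<HourOfDayKey> factory = new HourOfDayKeyFactory();
        final LocalDateTime dateTime = LocalDateTime.ofInstant(
            Instant.ofEpochMilli(1585231121000L), ZoneId.of("Europe/Paris"));
        final HourOfDayKey key = factory.createKey("sensor-42", dateTime);
        System.out.println(key);                      // sensor-42;14 (14:58 local time)
        System.out.println(factory.getSensorId(key)); // sensor-42
      }
    }
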
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/StatsRecordFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/StatsRecordFactory.java
new file mode 100644
index 000000000..bd63a26fd
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/StatsRecordFactory.java
@@ -0,0 +1,22 @@
+package uc4.streamprocessing;
+
+import com.google.common.math.Stats;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+
+/**
+ * Factory interface for creating a stats Avro record from a {@link Windowed} and a {@link Stats}.
+ * The {@link Windowed} contains information about the start and end of the {@link Window} as well
+ * as the sensor id and the aggregated time unit. The {@link Stats} object contains the actual
+ * aggregation results.
+ *
+ * @param <K> Key type of the {@link Windowed}
+ * @param <R> Avro record type
+ */
+@FunctionalInterface
+public interface StatsRecordFactory<K, R extends SpecificRecord> {
+
+  R create(Windowed<K> windowed, Stats stats);
+
+}
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java
new file mode 100644
index 000000000..181514bb6
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java
@@ -0,0 +1,99 @@
+package uc4.streamprocessing;
+
+import com.google.common.math.Stats;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.HourOfDayActivePowerRecord;
+import titan.ccp.models.records.ActivePowerRecordFactory;
+import uc4.streamprocessing.util.StatsFactory;
+
+/**
+ * Builds the Kafka Streams {@link Topology} for the History microservice.
+ */
+public class TopologyBuilder {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
+
+  private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final Serdes serdes;
+
+  private final StreamsBuilder builder = new StreamsBuilder();
+
+  /**
+   * Create a new {@link TopologyBuilder} using the given topics.
+   */
+  public TopologyBuilder(final String inputTopic, final String outputTopic,
+      final Duration duration, final String schemaRegistryUrl) {
+    this.inputTopic = inputTopic;
+    this.outputTopic = outputTopic;
+    this.serdes = new Serdes(schemaRegistryUrl);
+  }
+
+  /**
+   * Build the {@link Topology} for the History microservice.
+   */
+  public Topology build() {
+
+    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
+    final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
+    final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory =
+        new HourOfDayRecordFactory();
+    final TimeWindows timeWindows =
+        TimeWindows.of(Duration.ofDays(30)).advanceBy(Duration.ofDays(1));
+    final String statsTopic = "output";
+
+    this.builder
+        .stream(this.inputTopic,
+            Consumed.with(this.serdes.string(),
+                IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
+        .mapValues(kieker -> new ActivePowerRecord(
+            kieker.getIdentifier(),
+            kieker.getTimestamp(),
+            kieker.getValueInW()))
+        .selectKey((key, value) -> {
+          final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
+          final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
+          return keyFactory.createKey(value.getIdentifier(), dateTime);
+        })
+        .groupByKey(Grouped.with(keySerde, this.serdes.activePowerRecordValues()))
+        .windowedBy(timeWindows)
+        .aggregate(
+            () -> Stats.of(),
+            (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
+            Materialized.with(keySerde, this.serdes.stats()))
+        .toStream()
+        .map((key, stats) -> KeyValue.pair(
+            keyFactory.getSensorId(key.key()),
+            stats.toString()))
+        // TODO
+        // statsRecordFactory.create(key, value)))
+        // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging
+        .to(
+            statsTopic,
+            Produced.with(
+                this.serdes.string(),
+                this.serdes.string()));
+    // this.serdes.avroValues()));
+
+    return this.builder.build();
+  }
+}
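
The topology is visibly work in progress: the constructor accepts a duration parameter but does not use it (the hopping window is hardcoded to a 30-day size with a 1-day advance), the sink topic is hardcoded to "output" instead of this.outputTopic, and the aggregate is emitted as a plain string rather than through the HourOfDayRecordFactory (see the TODOs). With the hardcoded hopping windows, every sufficiently late record falls into size/advance = 30 overlapping windows, which the following sketch confirms:

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public final class WindowSketch {
      public static void main(final String[] args) {
        // Same hopping windows the topology hardcodes: 30-day size, 1-day advance.
        final TimeWindows windows =
            TimeWindows.of(Duration.ofDays(30)).advanceBy(Duration.ofDays(1));
        // A current timestamp is contained in 30 overlapping hopping windows:
        System.out.println(windows.windowsFor(System.currentTimeMillis()).size()); // 30
      }
    }
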
diff --git a/uc4-application/src/main/java/uc4/streamprocessing/util/StatsFactory.java b/uc4-application/src/main/java/uc4/streamprocessing/util/StatsFactory.java
new file mode 100644
index 000000000..60035ea7f
--- /dev/null
+++ b/uc4-application/src/main/java/uc4/streamprocessing/util/StatsFactory.java
@@ -0,0 +1,23 @@
+package uc4.streamprocessing.util;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+
+/**
+ * Factory methods for working with {@link Stats}.
+ */
+public final class StatsFactory {
+
+  private StatsFactory() {}
+
+  /**
+   * Add a value to a {@link Stats} object.
+   */
+  public static Stats accumulate(final Stats stats, final double value) {
+    final StatsAccumulator statsAccumulator = new StatsAccumulator();
+    statsAccumulator.addAll(stats);
+    statsAccumulator.add(value);
+    return statsAccumulator.snapshot();
+  }
+
+}
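
Because Guava's Stats is immutable, accumulate rebuilds an accumulator per value and snapshots it; this suits the windowed aggregation, though it allocates on every record. A quick worked example:

    import com.google.common.math.Stats;

    public final class StatsSketch {
      public static void main(final String[] args) {
        Stats stats = Stats.of(); // empty stats, as in the topology's initializer
        stats = StatsFactory.accumulate(stats, 10.0);
        stats = StatsFactory.accumulate(stats, 20.0);
        System.out.println(stats.count()); // 2
        System.out.println(stats.mean());  // 15.0
      }
    }
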
diff --git a/uc4-application/src/main/resources/META-INF/application.properties b/uc4-application/src/main/resources/META-INF/application.properties
new file mode 100644
index 000000000..d2002fd1c
--- /dev/null
+++ b/uc4-application/src/main/resources/META-INF/application.properties
@@ -0,0 +1,6 @@
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+num.threads=1
+commit.interval.ms=10
+cache.max.bytes.buffering=-1
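
These defaults back the ConfigurationKeys lookups in HistoryService. Note that kafka.window.duration.minutes, although defined in ConfigurationKeys, is absent here; HistoryService currently reads it from the KAFKA_WINDOW_DURATION_MINUTES environment variable instead. The exact lookup chain of titan-ccp-common's Configurations.create() is not shown in this patch; as a rough stand-in, plain Apache Commons Configuration reads the same file like this (assuming the resource is resolvable on the classpath):

    import org.apache.commons.configuration2.Configuration;
    import org.apache.commons.configuration2.builder.fluent.Configurations;
    import org.apache.commons.configuration2.ex.ConfigurationException;

    public final class ConfigSketch {
      public static void main(final String[] args) throws ConfigurationException {
        final Configuration config =
            new Configurations().properties("META-INF/application.properties");
        System.out.println(config.getString("kafka.bootstrap.servers")); // localhost:9092
        System.out.println(config.getInt("commit.interval.ms"));         // 10
      }
    }
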
-- 
GitLab