diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index d56cd224a15f6c43e3893dcf5df3b8322c78dae8..ad1cb4f4c182a8f23adef80b9f75f9ef4ea8eb1f 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -14,7 +14,7 @@ include 'uc2-application-flink' // TODO Rename to uc2-flink
 
 include 'uc3-workload-generator' // TODO Rename to uc3-load-generator
 include 'uc3-application' // TODO Rename to uc1-kstreams
-//include 'uc3-application-flink' // TODO Rename to uc3-flink
+include 'uc3-application-flink' // TODO Rename to uc3-flink
 
 include 'uc4-workload-generator' // TODO Rename to uc4-load-generator
 include 'uc4-application' // TODO Rename to uc4-kstreams
diff --git a/theodolite-benchmarks/uc3-application-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-application-flink/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..4e04e2891754324a6e1bf55348b6a38f592bb301
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,127 @@
+cleanup.add_default_serial_version_id=true
+cleanup.add_generated_serial_version_id=false
+cleanup.add_missing_annotations=true
+cleanup.add_missing_deprecated_annotations=true
+cleanup.add_missing_methods=false
+cleanup.add_missing_nls_tags=false
+cleanup.add_missing_override_annotations=true
+cleanup.add_missing_override_annotations_interface_methods=true
+cleanup.add_serial_version_id=false
+cleanup.always_use_blocks=true
+cleanup.always_use_parentheses_in_expressions=false
+cleanup.always_use_this_for_non_static_field_access=true
+cleanup.always_use_this_for_non_static_method_access=true
+cleanup.convert_functional_interfaces=false
+cleanup.convert_to_enhanced_for_loop=true
+cleanup.correct_indentation=true
+cleanup.format_source_code=true
+cleanup.format_source_code_changes_only=false
+cleanup.insert_inferred_type_arguments=false
+cleanup.make_local_variable_final=true
+cleanup.make_parameters_final=true
+cleanup.make_private_fields_final=true
+cleanup.make_type_abstract_if_missing_method=false
+cleanup.make_variable_declarations_final=true
+cleanup.never_use_blocks=false
+cleanup.never_use_parentheses_in_expressions=true
+cleanup.organize_imports=true
+cleanup.qualify_static_field_accesses_with_declaring_class=false
+cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+cleanup.qualify_static_member_accesses_with_declaring_class=true
+cleanup.qualify_static_method_accesses_with_declaring_class=false
+cleanup.remove_private_constructors=true
+cleanup.remove_redundant_modifiers=false
+cleanup.remove_redundant_semicolons=false
+cleanup.remove_redundant_type_arguments=true
+cleanup.remove_trailing_whitespaces=true
+cleanup.remove_trailing_whitespaces_all=true
+cleanup.remove_trailing_whitespaces_ignore_empty=false
+cleanup.remove_unnecessary_casts=true
+cleanup.remove_unnecessary_nls_tags=true
+cleanup.remove_unused_imports=true
+cleanup.remove_unused_local_variables=false
+cleanup.remove_unused_private_fields=true
+cleanup.remove_unused_private_members=false
+cleanup.remove_unused_private_methods=true
+cleanup.remove_unused_private_types=true
+cleanup.sort_members=false
+cleanup.sort_members_all=false
+cleanup.use_anonymous_class_creation=false
+cleanup.use_blocks=true
+cleanup.use_blocks_only_for_return_and_throw=false
+cleanup.use_lambda=true
+cleanup.use_parentheses_in_expressions=true
+cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+cleanup.use_this_for_non_static_method_access=true
+cleanup.use_this_for_non_static_method_access_only_if_necessary=false
+cleanup_profile=_CAU-SE-Style
+cleanup_settings_version=2
+eclipse.preferences.version=1
+editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
+formatter_profile=_CAU-SE-Style
+formatter_settings_version=15
+org.eclipse.jdt.ui.ignorelowercasenames=true
+org.eclipse.jdt.ui.importorder=;
+org.eclipse.jdt.ui.ondemandthreshold=99
+org.eclipse.jdt.ui.staticondemandthreshold=99
+sp_cleanup.add_default_serial_version_id=true
+sp_cleanup.add_generated_serial_version_id=false
+sp_cleanup.add_missing_annotations=true
+sp_cleanup.add_missing_deprecated_annotations=true
+sp_cleanup.add_missing_methods=false
+sp_cleanup.add_missing_nls_tags=false
+sp_cleanup.add_missing_override_annotations=true
+sp_cleanup.add_missing_override_annotations_interface_methods=true
+sp_cleanup.add_serial_version_id=false
+sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_parentheses_in_expressions=false
+sp_cleanup.always_use_this_for_non_static_field_access=true
+sp_cleanup.always_use_this_for_non_static_method_access=true
+sp_cleanup.convert_functional_interfaces=false
+sp_cleanup.convert_to_enhanced_for_loop=true
+sp_cleanup.correct_indentation=true
+sp_cleanup.format_source_code=true
+sp_cleanup.format_source_code_changes_only=false
+sp_cleanup.insert_inferred_type_arguments=false
+sp_cleanup.make_local_variable_final=true
+sp_cleanup.make_parameters_final=true
+sp_cleanup.make_private_fields_final=true
+sp_cleanup.make_type_abstract_if_missing_method=false
+sp_cleanup.make_variable_declarations_final=true
+sp_cleanup.never_use_blocks=false
+sp_cleanup.never_use_parentheses_in_expressions=true
+sp_cleanup.on_save_use_additional_actions=true
+sp_cleanup.organize_imports=true
+sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
+sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
+sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
+sp_cleanup.remove_private_constructors=true
+sp_cleanup.remove_redundant_modifiers=false
+sp_cleanup.remove_redundant_semicolons=true
+sp_cleanup.remove_redundant_type_arguments=true
+sp_cleanup.remove_trailing_whitespaces=true
+sp_cleanup.remove_trailing_whitespaces_all=true
+sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
+sp_cleanup.remove_unnecessary_casts=true
+sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unused_imports=true
+sp_cleanup.remove_unused_local_variables=false
+sp_cleanup.remove_unused_private_fields=true
+sp_cleanup.remove_unused_private_members=false
+sp_cleanup.remove_unused_private_methods=true
+sp_cleanup.remove_unused_private_types=true
+sp_cleanup.sort_members=false
+sp_cleanup.sort_members_all=false
+sp_cleanup.use_anonymous_class_creation=false
+sp_cleanup.use_blocks=true
+sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_lambda=true
+sp_cleanup.use_parentheses_in_expressions=true
+sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
diff --git a/theodolite-benchmarks/uc3-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/uc3-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
@@ -0,0 +1,4 @@
+configFilePath=../config/checkstyle.xml
+customModulesJarPaths=
+eclipse.preferences.version=1
+enabled=true
diff --git a/theodolite-benchmarks/uc3-application-flink/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/uc3-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
@@ -0,0 +1,4 @@
+customRulesJars=
+eclipse.preferences.version=1
+enabled=true
+ruleSetFilePath=../config/pmd.xml
diff --git a/theodolite-benchmarks/uc3-application-flink/Dockerfile b/theodolite-benchmarks/uc3-application-flink/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..541033dfbf1db97d71963cfc5ec99f8efa300933
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/Dockerfile
@@ -0,0 +1,3 @@
+FROM nicobiernat/flink:1.11-scala_2.12-java_11
+
+ADD build/libs/uc3-application-all.jar /opt/flink/usrlib/artifacts/uc3-application-all.jar
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc3-application-flink/build.gradle b/theodolite-benchmarks/uc3-application-flink/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..d50fa8efecd9b17e387d00c71934b8cc144240a1
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/build.gradle
@@ -0,0 +1,13 @@
+allprojects {
+	repositories {
+    	maven {
+    		url 'https://packages.confluent.io/maven/'
+    	}
+	}
+}
+
+dependencies {
+    compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
+}
+
+mainClassName = "theodolite.uc3.application.HistoryServiceFlinkJob"
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
new file mode 100644
index 0000000000000000000000000000000000000000..1604ec73e0313f6c022985ffff969597baefd737
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/ConfigurationKeys.java
@@ -0,0 +1,36 @@
+package theodolite.uc3.application;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+
+  public static final String APPLICATION_NAME = "application.name";
+
+  public static final String APPLICATION_VERSION = "application.version";
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+
+  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
+
+  public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
+
+  public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
+
+  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
+
+  public static final String TIME_ZONE = "time.zone";
+
+  public static final String FLINK_STATE_BACKEND = "flink.state.backend";
+
+  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
+
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
+
+  public static final String CHECKPOINTING = "checkpointing";
+
+  private ConfigurationKeys() {}
+
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
new file mode 100644
index 0000000000000000000000000000000000000000..c3f4fd21d10bc73421f16182e8f20fcc8988c47b
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -0,0 +1,180 @@
+package theodolite.uc3.application;
+
+import com.google.common.math.Stats;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde;
+import theodolite.commons.flink.serialization.StatsSerializer;
+import theodolite.uc3.application.util.HourOfDayKey;
+import theodolite.uc3.application.util.HourOfDayKeyFactory;
+import theodolite.uc3.application.util.HourOfDayKeySerde;
+import theodolite.uc3.application.util.StatsKeyFactory;
+import titan.ccp.common.configuration.Configurations;
+import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.models.records.ActivePowerRecordFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Properties;
+
+
+/**
+ * The History Microservice Flink Job.
+ */
+public class HistoryServiceFlinkJob {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
+
+  private final Configuration config = Configurations.create();
+
+  private void run() {
+    // Configurations
+    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
+    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
+    final String applicationId = applicationName + "-" + applicationVersion;
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+    final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE);
+    final ZoneId timeZone = ZoneId.of(timeZoneString);
+    final Time aggregationDuration = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+    final Time aggregationAdvance = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+    final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
+    final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
+    final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final boolean checkpointing= this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+
+    final Properties kafkaProps = new Properties();
+    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
+    kafkaProps.setProperty("group.id", applicationId);
+
+    // Sources and Sinks with Serializer and Deserializer
+
+    final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde =
+        new FlinkMonitoringRecordSerde<>(
+            inputTopic,
+            ActivePowerRecord.class,
+            ActivePowerRecordFactory.class);
+
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
+        inputTopic, sourceSerde, kafkaProps);
+
+    kafkaSource.setStartFromGroupOffsets();
+    if (checkpointing)
+      kafkaSource.setCommitOffsetsOnCheckpoints(true);
+    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
+
+    final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
+        new FlinkKafkaKeyValueSerde<>(outputTopic,
+            Serdes::String,
+            Serdes::String,
+            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})
+        );
+
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
+        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
+    kafkaSink.setWriteTimestampToKafka(true);
+
+    // Execution environment configuration
+
+    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+    if (checkpointing)
+      env.enableCheckpointing(commitIntervalMs);
+
+    // State Backend
+    if (stateBackend.equals("filesystem")) {
+      env.setStateBackend(new FsStateBackend(stateBackendPath));
+    } else if (stateBackend.equals("rocksdb")) {
+      try {
+        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    } else {
+      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
+    }
+
+    // Kryo serializer registration
+    env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
+    env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class,
+        new FlinkMonitoringRecordSerde<>(
+            inputTopic,
+            ActivePowerRecord.class,
+            ActivePowerRecordFactory.class));
+    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+
+    env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
+        LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+
+    // Streaming topology
+
+    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
+
+    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
+        .name("[Kafka Consumer] Topic: " + inputTopic);
+
+    stream
+        .rebalance()
+        .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
+          final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
+          final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone);
+          return keyFactory.createKey(record.getIdentifier(), dateTime);
+        })
+        .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
+        .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
+        .map(new MapFunction<Tuple2<HourOfDayKey, Stats>, Tuple2<String, String>>() {
+          @Override
+          public Tuple2<String, String> map(Tuple2<HourOfDayKey, Stats> tuple) {
+            final String newKey = keyFactory.getSensorId(tuple.f0);
+            final String newValue = tuple.f1.toString();
+            final int hourOfDay = tuple.f0.getHourOfDay();
+            LOGGER.info(newKey + "|" + hourOfDay + ": " + newValue);
+            return new Tuple2<>(newKey, newValue);
+          }
+        }).name("map")
+        .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+
+    // Execution plan
+
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("Execution Plan: " + env.getExecutionPlan());
+    }
+
+    // Execute Job
+
+    try {
+      env.execute(applicationId);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static void main(final String[] args) {
+    new HistoryServiceFlinkJob().run();
+  }
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..d5edf2dce68b787e1d6d7df72251211691c4fabf
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
@@ -0,0 +1,21 @@
+package theodolite.uc3.application;
+
+import com.google.common.math.Stats;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import theodolite.uc3.application.util.HourOfDayKey;
+
+public class HourOfDayProcessWindowFunction extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
+
+  @Override
+  public void process(final HourOfDayKey hourOfDayKey,
+                      final Context context,
+                      final Iterable<Stats> elements,
+                      final Collector<Tuple2<HourOfDayKey, Stats>> out) {
+    final Stats stats = elements.iterator().next();
+    out.collect(new Tuple2<>(hourOfDayKey, stats));
+  }
+
+}
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..65af4f4511e53fa44ae1ab43c2aaac48e5e251fa
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/StatsAggregateFunction.java
@@ -0,0 +1,39 @@
+package theodolite.uc3.application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import theodolite.uc3.application.util.StatsFactory;
+import titan.ccp.models.records.ActivePowerRecord;
+
+/**
+ * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {
+
+  private static final long serialVersionUID = -8873572990921515499L;
+
+  @Override
+  public Stats createAccumulator() {
+    return Stats.of();
+  }
+
+  @Override
+  public Stats add(final ActivePowerRecord value, final Stats accumulator) {
+    return StatsFactory.accumulate(accumulator, value.getValueInW());
+  }
+
+  @Override
+  public Stats getResult(final Stats accumulator) {
+    return accumulator;
+  }
+
+  @Override
+  public Stats merge(final Stats a, final Stats b) {
+    final StatsAccumulator statsAccumulator = new StatsAccumulator();
+    statsAccumulator.addAll(a);
+    statsAccumulator.addAll(b);
+    return statsAccumulator.snapshot();
+  }
+}
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKey.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKey.java
new file mode 100644
index 0000000000000000000000000000000000000000..ae9559b9601be857e7472b85fa3d35eb06311867
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKey.java
@@ -0,0 +1,67 @@
+package theodolite.uc3.application.util;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+/**
+ * Composed key of an hour of the day and a sensor id.
+ */
+public class HourOfDayKey {
+
+  private final int hourOfDay;
+  private final String sensorId;
+
+  public HourOfDayKey(final int hourOfDay, final String sensorId) {
+    this.hourOfDay = hourOfDay;
+    this.sensorId = sensorId;
+  }
+
+  public int getHourOfDay() {
+    return this.hourOfDay;
+  }
+
+  public String getSensorId() {
+    return this.sensorId;
+  }
+
+  @Override
+  public String toString() {
+    return this.sensorId + ";" + this.hourOfDay;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(hourOfDay, sensorId);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) return true;
+    if (!(obj instanceof HourOfDayKey)) return false;
+    final HourOfDayKey k = (HourOfDayKey) obj;
+    return hourOfDay == k.hourOfDay && sensorId.equals(k.sensorId);
+  }
+
+  public byte[] toByteArray() {
+    final int numBytes = (2 * Integer.SIZE + this.sensorId.length() * Character.SIZE) / Byte.SIZE;
+    final ByteBuffer buffer = ByteBuffer.allocate(numBytes).order(ByteOrder.LITTLE_ENDIAN);
+    buffer.putInt(this.hourOfDay);
+    buffer.putInt(this.sensorId.length());
+    for (final char c : this.sensorId.toCharArray()) {
+      buffer.putChar(c);
+    }
+    return buffer.array();
+  }
+
+  public static HourOfDayKey fromByteArray(final byte[] bytes) {
+    final ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+    final int hourOfDay = buffer.getInt();
+    final int strLen = buffer.getInt();
+    final char[] sensorId = new char[strLen];
+    for (int i = 0; i < strLen; i++) {
+      sensorId[i] = buffer.getChar();
+    }
+    return new HourOfDayKey(hourOfDay, new String(sensorId));
+  }
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..ffc3129bd070e8df9711111f671660efecc16650
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java
@@ -0,0 +1,22 @@
+package theodolite.uc3.application.util;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * {@link StatsKeyFactory} for {@link HourOfDayKey}.
+ */
+public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable {
+
+  @Override
+  public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
+    final int hourOfDay = dateTime.getHour();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  @Override
+  public String getSensorId(final HourOfDayKey key) {
+    return key.getSensorId();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java
new file mode 100644
index 0000000000000000000000000000000000000000..f214190227b58a61246d6c319f7a5898cdd21319
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java
@@ -0,0 +1,50 @@
+package theodolite.uc3.application.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.simpleserdes.BufferSerde;
+import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
+import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
+import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
+
+import java.io.Serializable;
+
+/**
+ * {@link BufferSerde} for a {@link HourOfDayKey}. Use the {@link #create()} method to create a new
+ * Kafka {@link Serde}.
+ */
+public class HourOfDayKeySerde extends Serializer<HourOfDayKey> implements BufferSerde<HourOfDayKey>, Serializable {
+
+  @Override
+  public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
+    buffer.putInt(data.getHourOfDay());
+    buffer.putString(data.getSensorId());
+  }
+
+  @Override
+  public HourOfDayKey deserialize(final ReadBuffer buffer) {
+    final int hourOfDay = buffer.getInt();
+    final String sensorId = buffer.getString();
+    return new HourOfDayKey(hourOfDay, sensorId);
+  }
+
+  public static Serde<HourOfDayKey> create() {
+    return SimpleSerdes.create(new HourOfDayKeySerde());
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, HourOfDayKey object) {
+    byte[] data = object.toByteArray();
+    output.writeInt(data.length);
+    output.writeBytes(data);
+  }
+
+  @Override
+  public HourOfDayKey read(Kryo kryo, Input input, Class<HourOfDayKey> type) {
+    final int numBytes = input.readInt();
+    return HourOfDayKey.fromByteArray(input.readBytes(numBytes));
+  }
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayRecordFactory.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayRecordFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..d8a42b74e5ca1cc55f9f21de62a5d8f877223e62
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayRecordFactory.java
@@ -0,0 +1,28 @@
+package theodolite.uc3.application.util;
+
+import com.google.common.math.Stats;
+import org.apache.kafka.streams.kstream.Windowed;
+import titan.ccp.model.records.HourOfDayActivePowerRecord;
+
+/**
+ * {@link StatsRecordFactory} to create an {@link HourOfDayActivePowerRecord}.
+ */
+public class HourOfDayRecordFactory
+    implements StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> {
+
+  @Override
+  public HourOfDayActivePowerRecord create(final Windowed<HourOfDayKey> windowed,
+      final Stats stats) {
+    return new HourOfDayActivePowerRecord(
+        windowed.key().getSensorId(),
+        windowed.key().getHourOfDay(),
+        windowed.window().start(),
+        windowed.window().end(),
+        stats.count(),
+        stats.mean(),
+        stats.populationVariance(),
+        stats.min(),
+        stats.max());
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsFactory.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..b7880be4eb48035959251cc56273d16407bcb888
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsFactory.java
@@ -0,0 +1,23 @@
+package theodolite.uc3.application.util;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+
+/**
+ * Factory methods for working with {@link Stats}.
+ */
+public final class StatsFactory {
+
+  private StatsFactory() {}
+
+  /**
+   * Add a value to a {@link Stats} object.
+   */
+  public static Stats accumulate(final Stats stats, final double value) {
+    final StatsAccumulator statsAccumulator = new StatsAccumulator();
+    statsAccumulator.addAll(stats);
+    statsAccumulator.add(value);
+    return statsAccumulator.snapshot();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsKeyFactory.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsKeyFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..fdebccaa2d116253c41492cab3443057adef7b36
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsKeyFactory.java
@@ -0,0 +1,17 @@
+package theodolite.uc3.application.util;
+
+import java.time.LocalDateTime;
+
+/**
+ * Factory interface for creating a stats key from a sensor id and a {@link LocalDateTime} object
+ * and vice versa.
+ *
+ * @param <T> Type of the key
+ */
+public interface StatsKeyFactory<T> {
+
+  T createKey(String sensorId, LocalDateTime dateTime);
+
+  String getSensorId(T key);
+
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsRecordFactory.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsRecordFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..61333c99966b1ffea608d225f17d8460eac9ada1
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/StatsRecordFactory.java
@@ -0,0 +1,22 @@
+package theodolite.uc3.application.util;
+
+import com.google.common.math.Stats;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+
+/**
+ * Factory interface for creating a stats Avro record from a {@link Windowed} and a {@link Stats}.
+ * The {@link Windowed} contains about information about the start end end of the {@link Window} as
+ * well as the sensor id and the aggregated time unit. The {@link Stats} objects contains the actual
+ * aggregation results.
+ *
+ * @param <K> Key type of the {@link Windowed}
+ * @param <R> Avro record type
+ */
+@FunctionalInterface
+public interface StatsRecordFactory<K, R extends SpecificRecord> {
+
+  R create(Windowed<K> windowed, Stats stats);
+
+}
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties
new file mode 100644
index 0000000000000000000000000000000000000000..4687602c2e2f52ba9dcb43d3b86850916e317261
--- /dev/null
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/resources/META-INF/application.properties
@@ -0,0 +1,12 @@
+application.name=theodolite-uc3-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+aggregation.duration.days=30
+aggregation.advance.days=1
+num.threads=1
+commit.interval.ms=100
+cache.max.bytes.buffering=-1
+time.zone=Europe/Paris
\ No newline at end of file