diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index dc2c7c59bb89ab8873894c92e35791cfbd925565..d56cd224a15f6c43e3893dcf5df3b8322c78dae8 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -10,7 +10,7 @@ include 'uc1-application-flink' // TODO Rename to uc1-flink
 
 include 'uc2-workload-generator' // TODO Rename to uc2-load-generator
 include 'uc2-application' // TODO Rename to uc1-kstreams
-//include 'uc2-application-flink' // TODO Rename to uc2-flink
+include 'uc2-application-flink' // TODO Rename to uc2-flink
 
 include 'uc3-workload-generator' // TODO Rename to uc3-load-generator
 include 'uc3-application' // TODO Rename to uc1-kstreams
diff --git a/theodolite-benchmarks/uc2-application-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-application-flink/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..4e04e2891754324a6e1bf55348b6a38f592bb301
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,127 @@
+cleanup.add_default_serial_version_id=true
+cleanup.add_generated_serial_version_id=false
+cleanup.add_missing_annotations=true
+cleanup.add_missing_deprecated_annotations=true
+cleanup.add_missing_methods=false
+cleanup.add_missing_nls_tags=false
+cleanup.add_missing_override_annotations=true
+cleanup.add_missing_override_annotations_interface_methods=true
+cleanup.add_serial_version_id=false
+cleanup.always_use_blocks=true
+cleanup.always_use_parentheses_in_expressions=false
+cleanup.always_use_this_for_non_static_field_access=true
+cleanup.always_use_this_for_non_static_method_access=true
+cleanup.convert_functional_interfaces=false
+cleanup.convert_to_enhanced_for_loop=true
+cleanup.correct_indentation=true
+cleanup.format_source_code=true
+cleanup.format_source_code_changes_only=false
+cleanup.insert_inferred_type_arguments=false
+cleanup.make_local_variable_final=true
+cleanup.make_parameters_final=true
+cleanup.make_private_fields_final=true
+cleanup.make_type_abstract_if_missing_method=false
+cleanup.make_variable_declarations_final=true
+cleanup.never_use_blocks=false
+cleanup.never_use_parentheses_in_expressions=true
+cleanup.organize_imports=true
+cleanup.qualify_static_field_accesses_with_declaring_class=false
+cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+cleanup.qualify_static_member_accesses_with_declaring_class=true
+cleanup.qualify_static_method_accesses_with_declaring_class=false
+cleanup.remove_private_constructors=true
+cleanup.remove_redundant_modifiers=false
+cleanup.remove_redundant_semicolons=false
+cleanup.remove_redundant_type_arguments=true
+cleanup.remove_trailing_whitespaces=true
+cleanup.remove_trailing_whitespaces_all=true
+cleanup.remove_trailing_whitespaces_ignore_empty=false
+cleanup.remove_unnecessary_casts=true
+cleanup.remove_unnecessary_nls_tags=true
+cleanup.remove_unused_imports=true
+cleanup.remove_unused_local_variables=false
+cleanup.remove_unused_private_fields=true
+cleanup.remove_unused_private_members=false
+cleanup.remove_unused_private_methods=true
+cleanup.remove_unused_private_types=true
+cleanup.sort_members=false
+cleanup.sort_members_all=false
+cleanup.use_anonymous_class_creation=false
+cleanup.use_blocks=true
+cleanup.use_blocks_only_for_return_and_throw=false
+cleanup.use_lambda=true
+cleanup.use_parentheses_in_expressions=true
+cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+cleanup.use_this_for_non_static_method_access=true
+cleanup.use_this_for_non_static_method_access_only_if_necessary=false
+cleanup_profile=_CAU-SE-Style
+cleanup_settings_version=2
+eclipse.preferences.version=1
+editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
+formatter_profile=_CAU-SE-Style
+formatter_settings_version=15
+org.eclipse.jdt.ui.ignorelowercasenames=true
+org.eclipse.jdt.ui.importorder=;
+org.eclipse.jdt.ui.ondemandthreshold=99
+org.eclipse.jdt.ui.staticondemandthreshold=99
+sp_cleanup.add_default_serial_version_id=true
+sp_cleanup.add_generated_serial_version_id=false
+sp_cleanup.add_missing_annotations=true
+sp_cleanup.add_missing_deprecated_annotations=true
+sp_cleanup.add_missing_methods=false
+sp_cleanup.add_missing_nls_tags=false
+sp_cleanup.add_missing_override_annotations=true
+sp_cleanup.add_missing_override_annotations_interface_methods=true
+sp_cleanup.add_serial_version_id=false
+sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_parentheses_in_expressions=false
+sp_cleanup.always_use_this_for_non_static_field_access=true
+sp_cleanup.always_use_this_for_non_static_method_access=true
+sp_cleanup.convert_functional_interfaces=false
+sp_cleanup.convert_to_enhanced_for_loop=true
+sp_cleanup.correct_indentation=true
+sp_cleanup.format_source_code=true
+sp_cleanup.format_source_code_changes_only=false
+sp_cleanup.insert_inferred_type_arguments=false
+sp_cleanup.make_local_variable_final=true
+sp_cleanup.make_parameters_final=true
+sp_cleanup.make_private_fields_final=true
+sp_cleanup.make_type_abstract_if_missing_method=false
+sp_cleanup.make_variable_declarations_final=true
+sp_cleanup.never_use_blocks=false
+sp_cleanup.never_use_parentheses_in_expressions=true
+sp_cleanup.on_save_use_additional_actions=true
+sp_cleanup.organize_imports=true
+sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
+sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
+sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
+sp_cleanup.remove_private_constructors=true
+sp_cleanup.remove_redundant_modifiers=false
+sp_cleanup.remove_redundant_semicolons=true
+sp_cleanup.remove_redundant_type_arguments=true
+sp_cleanup.remove_trailing_whitespaces=true
+sp_cleanup.remove_trailing_whitespaces_all=true
+sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
+sp_cleanup.remove_unnecessary_casts=true
+sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unused_imports=true
+sp_cleanup.remove_unused_local_variables=false
+sp_cleanup.remove_unused_private_fields=true
+sp_cleanup.remove_unused_private_members=false
+sp_cleanup.remove_unused_private_methods=true
+sp_cleanup.remove_unused_private_types=true
+sp_cleanup.sort_members=false
+sp_cleanup.sort_members_all=false
+sp_cleanup.use_anonymous_class_creation=false
+sp_cleanup.use_blocks=true
+sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_lambda=true
+sp_cleanup.use_parentheses_in_expressions=true
+sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
diff --git a/theodolite-benchmarks/uc2-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/uc2-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
@@ -0,0 +1,4 @@
+configFilePath=../config/checkstyle.xml
+customModulesJarPaths=
+eclipse.preferences.version=1
+enabled=true
diff --git a/theodolite-benchmarks/uc2-application-flink/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/uc2-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
@@ -0,0 +1,4 @@
+customRulesJars=
+eclipse.preferences.version=1
+enabled=true
+ruleSetFilePath=../config/pmd.xml
diff --git a/theodolite-benchmarks/uc2-application-flink/Dockerfile b/theodolite-benchmarks/uc2-application-flink/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..8c2852fb2e62f9d15cdd3fadb1252eef3d2732b0
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/Dockerfile
@@ -0,0 +1,3 @@
+FROM nicobiernat/flink:1.11-scala_2.12-java_11
+
+ADD build/libs/uc2-application-all.jar /opt/flink/usrlib/artifacts/uc2-application-all.jar
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc2-application-flink/build.gradle b/theodolite-benchmarks/uc2-application-flink/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..b5e847553db8f3847d5fe858c76b31520f728aff
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/build.gradle
@@ -0,0 +1,13 @@
+allprojects {
+	repositories {
+    	maven {
+    		url 'https://packages.confluent.io/maven/'
+    	}
+	}
+}
+
+dependencies {
+    compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
+}
+
+mainClassName = "theodolite.uc2.application.HistoryServiceFlinkJob"
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
new file mode 100644
index 0000000000000000000000000000000000000000..b87f9b8408dc55f8e293bf252117989e7d871687
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/ConfigurationKeys.java
@@ -0,0 +1,33 @@
+package theodolite.uc2.application;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+
+  public static final String APPLICATION_NAME = "application.name";
+
+  public static final String APPLICATION_VERSION = "application.version";
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
+
+  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+
+  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
+
+  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
+
+  public static final String FLINK_STATE_BACKEND = "flink.state.backend";
+
+  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
+
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
+
+  public static final String CHECKPOINTING = "checkpointing";
+
+  private ConfigurationKeys() {
+  }
+
+}
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
new file mode 100644
index 0000000000000000000000000000000000000000..b26b8d60ecb56aeff03c71ce200972e6243abc4d
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -0,0 +1,150 @@
+package theodolite.uc2.application;
+
+import com.google.common.math.Stats;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde;
+import theodolite.commons.flink.serialization.StatsSerializer;
+import titan.ccp.common.configuration.Configurations;
+import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.models.records.ActivePowerRecordFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+
+/**
+ * The History Microservice Flink Job.
+ */
+public class HistoryServiceFlinkJob {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
+
+  private final Configuration config = Configurations.create();
+
+  private void run() {
+    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
+    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
+    final String applicationId = applicationName + "-" + applicationVersion;
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+    final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
+    final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
+    final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
+    final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+    final boolean checkpointing= this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+
+    final Properties kafkaProps = new Properties();
+    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
+    kafkaProps.setProperty("group.id", applicationId);
+
+    final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde =
+        new FlinkMonitoringRecordSerde<>(
+            inputTopic,
+            ActivePowerRecord.class,
+            ActivePowerRecordFactory.class);
+
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
+        inputTopic, sourceSerde, kafkaProps);
+
+    kafkaSource.setStartFromGroupOffsets();
+    if (checkpointing)
+      kafkaSource.setCommitOffsetsOnCheckpoints(true);
+    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
+
+    final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
+        new FlinkKafkaKeyValueSerde<>(outputTopic,
+            Serdes::String,
+            Serdes::String,
+            TypeInformation.of(new TypeHint<Tuple2<String, String>>(){})
+        );
+    kafkaProps.setProperty("transaction.timeout.ms", ""+5*60*1000);
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
+        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
+    kafkaSink.setWriteTimestampToKafka(true);
+
+    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+    if (checkpointing)
+      env.enableCheckpointing(commitIntervalMs);
+
+    // State Backend
+    if (stateBackend.equals("filesystem")) {
+      env.setStateBackend(new FsStateBackend(stateBackendPath));
+    } else if (stateBackend.equals("rocksdb")) {
+      try {
+        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    } else {
+      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
+    }
+
+    env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class,
+        new FlinkMonitoringRecordSerde<>(
+            inputTopic,
+            ActivePowerRecord.class,
+            ActivePowerRecordFactory.class));
+    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+
+    env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
+        LOGGER.info("Class " + c.getName() + " registered with serializer "
+            + s.getSerializer().getClass().getName()));
+
+    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
+        .name("[Kafka Consumer] Topic: " + inputTopic);
+
+    stream
+        .rebalance()
+        .keyBy((KeySelector<ActivePowerRecord, String>) ActivePowerRecord::getIdentifier)
+        .window(TumblingEventTimeWindows.of(Time.minutes(windowDuration)))
+        .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
+        .map(new MapFunction<Tuple2<String, Stats>, Tuple2<String, String>>() {
+          @Override
+          public Tuple2<String, String> map(Tuple2<String, Stats> t) {
+            final String key = t.f0;
+            final String value = t.f1.toString();
+            LOGGER.info(key + ": " + value);
+            return new Tuple2<>(key, value);
+          }
+        }).name("map")
+        .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("Execution Plan: " + env.getExecutionPlan());
+    }
+
+    try {
+      env.execute(applicationId);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static void main(final String[] args) {
+    new HistoryServiceFlinkJob().run();
+  }
+}
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..b5bc504677efc2fa83e44662efd0a92df239bdbd
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsAggregateFunction.java
@@ -0,0 +1,39 @@
+package theodolite.uc2.application;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import theodolite.uc2.application.util.StatsFactory;
+import titan.ccp.models.records.ActivePowerRecord;
+
+/**
+ * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {
+
+  private static final long serialVersionUID = -8873572990921515499L;
+
+  @Override
+  public Stats createAccumulator() {
+    return Stats.of();
+  }
+
+  @Override
+  public Stats add(final ActivePowerRecord value, final Stats accumulator) {
+    return StatsFactory.accumulate(accumulator, value.getValueInW());
+  }
+
+  @Override
+  public Stats getResult(final Stats accumulator) {
+    return accumulator;
+  }
+
+  @Override
+  public Stats merge(final Stats a, final Stats b) {
+    final StatsAccumulator statsAccumulator = new StatsAccumulator();
+    statsAccumulator.addAll(a);
+    statsAccumulator.addAll(b);
+    return statsAccumulator.snapshot();
+  }
+}
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
new file mode 100644
index 0000000000000000000000000000000000000000..fb49f46c4c87f623354900f7cbe206a7a048a784
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
@@ -0,0 +1,18 @@
+package theodolite.uc2.application;
+
+import com.google.common.math.Stats;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+public class StatsProcessWindowFunction extends ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow> {
+
+  private static final long serialVersionUID = 4363099880614593379L;
+
+  @Override
+  public void process(String key, Context context, Iterable<Stats> elements, Collector<Tuple2<String, Stats>> out) {
+    final Stats stats = elements.iterator().next();
+    out.collect(new Tuple2<>(key, stats));
+  }
+}
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/util/StatsFactory.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/util/StatsFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..9697108eb8dacabf925f06067199a41eb0658dbe
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/util/StatsFactory.java
@@ -0,0 +1,23 @@
+package theodolite.uc2.application.util;
+
+import com.google.common.math.Stats;
+import com.google.common.math.StatsAccumulator;
+
+/**
+ * Factory methods for working with {@link Stats}.
+ */
+public final class StatsFactory {
+
+  private StatsFactory() {}
+
+  /**
+   * Add a value to a {@link Stats} object.
+   */
+  public static Stats accumulate(final Stats stats, final double value) {
+    final StatsAccumulator statsAccumulator = new StatsAccumulator();
+    statsAccumulator.addAll(stats);
+    statsAccumulator.add(value);
+    return statsAccumulator.snapshot();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties
new file mode 100644
index 0000000000000000000000000000000000000000..171eec7cfc7741956a27d0e1de112de01ab1f1ed
--- /dev/null
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/resources/META-INF/application.properties
@@ -0,0 +1,10 @@
+application.name=theodolite-uc2-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+num.threads=1
+commit.interval.ms=100
+cache.max.bytes.buffering=-1
+kafka.window.duration.minutes=1
\ No newline at end of file