diff --git a/theodolite-benchmarks/application-kafkastreams-commons/build.gradle b/theodolite-benchmarks/application-kafkastreams-commons/build.gradle
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..c1ce7502eddd48c7fb50f754012334e01823a3c6 100644
--- a/theodolite-benchmarks/application-kafkastreams-commons/build.gradle
+++ b/theodolite-benchmarks/application-kafkastreams-commons/build.gradle
@@ -0,0 +1,10 @@
+dependencies {
+  // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
+  // implementation 'org.slf4j:slf4j-simple:1.7.25'
+  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
+  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
+  implementation 'org.apache.kafka:kafka-streams:2.6.0'
+
+  // Use JUnit test framework
+  testImplementation 'junit:junit:4.12'
+  }
\ No newline at end of file
diff --git a/theodolite-benchmarks/build.gradle b/theodolite-benchmarks/build.gradle
index ea8fb80bb2c2bac6121dbaaf72f742aa0e9c62bb..9dfe6638d0a954901a5c0f5126f6198961fc32ac 100644
--- a/theodolite-benchmarks/build.gradle
+++ b/theodolite-benchmarks/build.gradle
@@ -7,14 +7,17 @@ buildscript {
   }
   dependencies {
     classpath "gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.6.0"
+    classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
   }
 }
 
 // Variables used to distinct different subprojects
 def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')}
 def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')}
+def useCaseApplicationsFlink = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application-flink')}
 def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')}
 def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')}
+def flinkCommon = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')}
 
 // Plugins
 allprojects {
@@ -32,6 +35,11 @@ configure(useCaseProjects){
     apply plugin: 'application'
 }
 
+configure(useCaseApplicationsFlink){
+    apply plugin: 'com.github.johnrengelman.shadow'
+    applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
+}
+
 // Java version for all subprojects
 subprojects {
   java {
@@ -52,13 +60,16 @@ allprojects {
 	    maven {
 	    	url "https://oss.sonatype.org/content/repositories/snapshots/"
 	    }
-      maven {
-        url 'https://packages.confluent.io/maven/'
+	    maven {
+	        url 'https://packages.confluent.io/maven/'
+	    }
+    	maven {
+        	url 'https://repository.apache.org/content/repositories/snapshots/'
+    	} // TODO required?
     }
-	}
 }
 
-// Dependencies for all use case applications
+// Dependencies for all Kafka Streams benchmarks (use case applications)
 configure(useCaseApplications) {
   dependencies {
       // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
@@ -75,7 +86,60 @@ configure(useCaseApplications) {
   }
 }
 
-// Dependencies for all use case generators
+// Dependencies for all Flink benchmarks (use case applications)
+configure(useCaseApplicationsFlink) {
+  ext {
+    flinkVersion = '1.11.0'
+    scalaBinaryVersion = '2.12'
+  }
+
+  dependencies {
+       // These dependencies is exported to consumers, that is to say found on their compile classpath.
+      compile('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') {
+          changing = true
+      }
+      api 'net.kieker-monitoring:kieker:1.14'//-SNAPSHOT'
+      api 'net.sourceforge.teetime:teetime:3.0'
+      // TODO Upgrade to 0.1.0
+
+      // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
+      implementation 'org.apache.kafka:kafka-clients:2.2.0'
+      implementation 'com.google.guava:guava:24.1-jre'
+      implementation 'org.jctools:jctools-core:2.1.1'
+      implementation 'org.slf4j:slf4j-simple:1.6.1'
+      compile project(':flink-commons')
+
+      compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0"
+      compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
+      compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}"
+      compile group: 'org.apache.flink', name: "flink-table-api-java-bridge_${scalaBinaryVersion}", version: "${flinkVersion}"
+      compile group: 'org.apache.flink', name: "flink-table-planner-blink_${scalaBinaryVersion}", version: "${flinkVersion}"
+      compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
+      compile group: 'org.industrial-devops', name: 'titan-ccp-common', version: '0.0.3-SNAPSHOT'
+      compile group: 'org.apache.flink', name: "flink-runtime-web_${scalaBinaryVersion}", version: "${flinkVersion}" // TODO: remove after development
+      compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}"
+      compile group: 'org.apache.flink', name: 'flink-metrics-prometheus_2.12', version: '1.11.1'
+
+      // Use JUnit test framework
+      testImplementation 'junit:junit:4.12'
+  }
+  
+  run.classpath = sourceSets.main.runtimeClasspath
+
+  jar {
+      manifest {
+          attributes 'Built-By': System.getProperty('user.name'),
+                     'Build-Jdk': System.getProperty('java.version')
+      }
+  }
+
+  shadowJar {
+      configurations = [project.configurations.compile]
+      zip64 true
+  }
+}
+
+// Dependencies for all load generators
 configure(useCaseGenerators) {
   dependencies {
       // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
@@ -92,7 +156,7 @@ configure(useCaseGenerators) {
 }
 
 // Dependencies for all commons
-configure(commonProjects) {
+/*configure(commonProjects) {
   dependencies {
       // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
       implementation 'org.slf4j:slf4j-simple:1.7.25'
@@ -103,7 +167,7 @@ configure(commonProjects) {
       // Use JUnit test framework
       testImplementation 'junit:junit:4.12'
   }
-}
+}*/
 
 // Per default XML reports for SpotBugs are generated
 // Include this to generate HTML reports
diff --git a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..98b5ca8064a352aacfe2aebd13fbd0a87735fc3e
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,127 @@
+cleanup.add_default_serial_version_id=true
+cleanup.add_generated_serial_version_id=false
+cleanup.add_missing_annotations=true
+cleanup.add_missing_deprecated_annotations=true
+cleanup.add_missing_methods=false
+cleanup.add_missing_nls_tags=false
+cleanup.add_missing_override_annotations=true
+cleanup.add_missing_override_annotations_interface_methods=true
+cleanup.add_serial_version_id=false
+cleanup.always_use_blocks=true
+cleanup.always_use_parentheses_in_expressions=false
+cleanup.always_use_this_for_non_static_field_access=true
+cleanup.always_use_this_for_non_static_method_access=true
+cleanup.convert_functional_interfaces=false
+cleanup.convert_to_enhanced_for_loop=true
+cleanup.correct_indentation=true
+cleanup.format_source_code=true
+cleanup.format_source_code_changes_only=false
+cleanup.insert_inferred_type_arguments=false
+cleanup.make_local_variable_final=true
+cleanup.make_parameters_final=true
+cleanup.make_private_fields_final=true
+cleanup.make_type_abstract_if_missing_method=false
+cleanup.make_variable_declarations_final=true
+cleanup.never_use_blocks=false
+cleanup.never_use_parentheses_in_expressions=true
+cleanup.organize_imports=true
+cleanup.qualify_static_field_accesses_with_declaring_class=false
+cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+cleanup.qualify_static_member_accesses_with_declaring_class=true
+cleanup.qualify_static_method_accesses_with_declaring_class=false
+cleanup.remove_private_constructors=true
+cleanup.remove_redundant_modifiers=false
+cleanup.remove_redundant_semicolons=false
+cleanup.remove_redundant_type_arguments=true
+cleanup.remove_trailing_whitespaces=true
+cleanup.remove_trailing_whitespaces_all=true
+cleanup.remove_trailing_whitespaces_ignore_empty=false
+cleanup.remove_unnecessary_casts=true
+cleanup.remove_unnecessary_nls_tags=true
+cleanup.remove_unused_imports=true
+cleanup.remove_unused_local_variables=false
+cleanup.remove_unused_private_fields=true
+cleanup.remove_unused_private_members=false
+cleanup.remove_unused_private_methods=true
+cleanup.remove_unused_private_types=true
+cleanup.sort_members=false
+cleanup.sort_members_all=false
+cleanup.use_anonymous_class_creation=false
+cleanup.use_blocks=true
+cleanup.use_blocks_only_for_return_and_throw=false
+cleanup.use_lambda=true
+cleanup.use_parentheses_in_expressions=true
+cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+cleanup.use_this_for_non_static_method_access=true
+cleanup.use_this_for_non_static_method_access_only_if_necessary=false
+cleanup_profile=_CAU-SE-Style
+cleanup_settings_version=2
+eclipse.preferences.version=1
+editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
+formatter_profile=_CAU-SE-Style
+formatter_settings_version=15
+org.eclipse.jdt.ui.ignorelowercasenames=true
+org.eclipse.jdt.ui.importorder=;
+org.eclipse.jdt.ui.ondemandthreshold=99
+org.eclipse.jdt.ui.staticondemandthreshold=99
+sp_cleanup.add_default_serial_version_id=true
+sp_cleanup.add_generated_serial_version_id=false
+sp_cleanup.add_missing_annotations=true
+sp_cleanup.add_missing_deprecated_annotations=true
+sp_cleanup.add_missing_methods=false
+sp_cleanup.add_missing_nls_tags=false
+sp_cleanup.add_missing_override_annotations=true
+sp_cleanup.add_missing_override_annotations_interface_methods=true
+sp_cleanup.add_serial_version_id=false
+sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_parentheses_in_expressions=false
+sp_cleanup.always_use_this_for_non_static_field_access=true
+sp_cleanup.always_use_this_for_non_static_method_access=true
+sp_cleanup.convert_functional_interfaces=false
+sp_cleanup.convert_to_enhanced_for_loop=true
+sp_cleanup.correct_indentation=true
+sp_cleanup.format_source_code=true
+sp_cleanup.format_source_code_changes_only=false
+sp_cleanup.insert_inferred_type_arguments=false
+sp_cleanup.make_local_variable_final=true
+sp_cleanup.make_parameters_final=true
+sp_cleanup.make_private_fields_final=true
+sp_cleanup.make_type_abstract_if_missing_method=false
+sp_cleanup.make_variable_declarations_final=true
+sp_cleanup.never_use_blocks=false
+sp_cleanup.never_use_parentheses_in_expressions=true
+sp_cleanup.on_save_use_additional_actions=true
+sp_cleanup.organize_imports=true
+sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
+sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
+sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
+sp_cleanup.remove_private_constructors=true
+sp_cleanup.remove_redundant_modifiers=false
+sp_cleanup.remove_redundant_semicolons=false
+sp_cleanup.remove_redundant_type_arguments=true
+sp_cleanup.remove_trailing_whitespaces=true
+sp_cleanup.remove_trailing_whitespaces_all=true
+sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
+sp_cleanup.remove_unnecessary_casts=true
+sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unused_imports=true
+sp_cleanup.remove_unused_local_variables=false
+sp_cleanup.remove_unused_private_fields=true
+sp_cleanup.remove_unused_private_members=false
+sp_cleanup.remove_unused_private_methods=true
+sp_cleanup.remove_unused_private_types=true
+sp_cleanup.sort_members=false
+sp_cleanup.sort_members_all=false
+sp_cleanup.use_anonymous_class_creation=false
+sp_cleanup.use_blocks=true
+sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_lambda=true
+sp_cleanup.use_parentheses_in_expressions=true
+sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
diff --git a/theodolite-benchmarks/flink-commons/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/flink-commons/.settings/qa.eclipse.plugin.checkstyle.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/.settings/qa.eclipse.plugin.checkstyle.prefs
@@ -0,0 +1,4 @@
+configFilePath=../config/checkstyle.xml
+customModulesJarPaths=
+eclipse.preferences.version=1
+enabled=true
diff --git a/theodolite-benchmarks/flink-commons/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/flink-commons/.settings/qa.eclipse.plugin.pmd.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/.settings/qa.eclipse.plugin.pmd.prefs
@@ -0,0 +1,4 @@
+customRulesJars=
+eclipse.preferences.version=1
+enabled=true
+ruleSetFilePath=../config/pmd.xml
diff --git a/theodolite-benchmarks/flink-commons/build.gradle b/theodolite-benchmarks/flink-commons/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..79956c43997e82a88d243de4a55d2a1dcf4aed77
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/build.gradle
@@ -0,0 +1,19 @@
+ext {
+    flinkVersion = '1.11.0'
+    scalaBinaryVersion = '2.12'
+}
+
+dependencies {
+    api 'org.apache.kafka:kafka-clients:2.2.0'
+    // implementation 'org.slf4j:slf4j-simple:1.6.1'
+    implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') {
+        changing = true
+        // exclude group: 'net.kieker-monitoring', module: 'kieker'
+    }
+    implementation 'com.google.guava:guava:30.1-jre'
+    compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
+    compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
+    
+    // Use JUnit test framework
+    testImplementation 'junit:junit:4.12'
+  }
\ No newline at end of file
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java
new file mode 100644
index 0000000000000000000000000000000000000000..a09cbd210f242ea63f6281172f4a21e2d22357fe
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java
@@ -0,0 +1,71 @@
+package theodolite.commons.flink.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serde;
+
+import javax.annotation.Nullable;
+
+public class FlinkKafkaKeyValueSerde<K, V>
+    implements KafkaDeserializationSchema<Tuple2<K, V>>,
+               KafkaSerializationSchema<Tuple2<K, V>> {
+
+  private static final long serialVersionUID = 2469569396501933443L;
+
+  private transient Serde<K> keySerde;
+  private transient Serde<V> valueSerde;
+
+  private SerializableSupplier<Serde<K>> keySerdeSupplier;
+  private SerializableSupplier<Serde<V>> valueSerdeSupplier;
+
+  private String topic;
+
+  private TypeInformation<Tuple2<K,V>> typeInfo;
+
+  public FlinkKafkaKeyValueSerde(final String topic,
+                                 final SerializableSupplier<Serde<K>> keySerdeSupplier,
+                                 final SerializableSupplier<Serde<V>> valueSerdeSupplier,
+                                 final TypeInformation<Tuple2<K, V>> typeInfo) {
+    this.topic = topic;
+    this.typeInfo = typeInfo;
+    this.keySerdeSupplier = keySerdeSupplier;
+    this.valueSerdeSupplier = valueSerdeSupplier;
+  }
+
+  @Override
+  public boolean isEndOfStream(final Tuple2<K, V> nextElement) {
+    return false;
+  }
+
+  @Override
+  public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
+    ensureInitialized();
+    final K key = this.keySerde.deserializer().deserialize(this.topic, record.key());
+    final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value());
+    return new Tuple2<>(key, value);
+  }
+
+  @Override
+  public TypeInformation<Tuple2<K, V>> getProducedType() {
+    return this.typeInfo;
+  }
+
+  @Override
+  public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, @Nullable Long timestamp) {
+    ensureInitialized();
+    final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0);
+    final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1);
+    return new ProducerRecord<>(this.topic, key, value);
+  }
+
+  private void ensureInitialized() {
+    if (this.keySerde == null || this.valueSerde == null) {
+      this.keySerde = this.keySerdeSupplier.get();
+      this.valueSerde = this.valueSerdeSupplier.get();;
+    }
+  }
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java
new file mode 100644
index 0000000000000000000000000000000000000000..8bca461e4d6b75aa8340bbba3796c53ceacbb46d
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkMonitoringRecordSerde.java
@@ -0,0 +1,126 @@
+package theodolite.commons.flink.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import kieker.common.record.IMonitoringRecord;
+import kieker.common.record.factory.IRecordFactory;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
+import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.models.records.AggregatedActivePowerRecord;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * This class wraps the serializer and deserializer implementations for {@link IMonitoringRecord}
+ * from {@link IMonitoringRecordSerde}
+ * into the Flink {@link DeserializationSchema} and {@link SerializationSchema}
+ * and Kryo {@link Serializer} interfaces.
+ * It is used for serialization to and from Kafka as well as internal serialization
+ * between Flink instances.
+ * This class is also itself serializable by Flink.
+ * @param <R> The specific record type that extends {@link IMonitoringRecord}
+ * @param <F> The specific record factory type that extends {@link IRecordFactory<R>}
+ */
+public class FlinkMonitoringRecordSerde<R extends IMonitoringRecord, F extends IRecordFactory<R>>
+    extends Serializer<R>
+    implements KafkaDeserializationSchema<R>,
+               KafkaSerializationSchema<R> {
+
+  private static final long serialVersionUID = -5687951056995646212L;
+
+  private final String topic;
+  private transient Serde<String> keySerde;
+  private transient Serde<R> serde;
+
+  private final Class<R> recordClass;
+  private final Class<F> recordFactoryClass;
+
+  /**
+   * Creates a new FlinkMonitoringRecordSerde.
+   * @param topic
+   *  The Kafka topic to/from which to serialize/deserialize.
+   * @param recordClass
+   *  The class of the serialized/deserialized record.
+   * @param recordFactoryClass
+   *  The class of the factory for the serialized/deserialized record.
+   */
+  public FlinkMonitoringRecordSerde(final String topic,
+                                    final Class<R> recordClass,
+                                    final Class<F> recordFactoryClass) {
+    this.topic = topic;
+    this.recordClass = recordClass;
+    this.recordFactoryClass = recordFactoryClass;
+  }
+
+  @Override
+  public boolean isEndOfStream(final R nextElement) {
+    return false;
+  }
+
+  @Override
+  public TypeInformation<R> getProducedType() {
+    return TypeExtractor.getForClass(recordClass);
+  }
+
+  @Override
+  public R deserialize(ConsumerRecord<byte[], byte[]> record) {
+    ensureInitialized();
+    return this.serde.deserializer().deserialize(this.topic, record.value());
+  }
+
+  @Override
+  public ProducerRecord<byte[], byte[]> serialize(R element, @Nullable Long timestamp) {
+    ensureInitialized();
+    String identifier = null;
+    if (element instanceof ActivePowerRecord) {
+      identifier = ((ActivePowerRecord) element).getIdentifier();
+    }
+    if (element instanceof AggregatedActivePowerRecord) {
+      identifier = ((AggregatedActivePowerRecord) element).getIdentifier();
+    }
+    final byte[] key = this.keySerde.serializer().serialize(this.topic, identifier);
+    final byte[] value = this.serde.serializer().serialize(this.topic, element);
+    return new ProducerRecord<>(this.topic, key, value);
+  }
+
+  private void ensureInitialized() {
+    if (this.keySerde == null || this.serde == null) {
+      try {
+        this.keySerde = Serdes.String();
+        this.serde = IMonitoringRecordSerde.serde(
+            recordFactoryClass.getDeclaredConstructor().newInstance());
+      } catch (NoSuchMethodException | InstantiationException | IllegalAccessException
+          | InvocationTargetException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  public void write(final Kryo kryo, final Output output, final R record) {
+    ensureInitialized();
+    final byte[] data = this.serde.serializer().serialize(this.topic, record);
+    output.writeInt(data.length);
+    output.writeBytes(data);
+  }
+
+  @Override
+  public R read(final Kryo kryo, final Input input, final Class<R> type) {
+    ensureInitialized();
+    final int numBytes = input.readInt();
+    return this.serde.deserializer().deserialize(this.topic, input.readBytes(numBytes));
+  }
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java
new file mode 100644
index 0000000000000000000000000000000000000000..1f535c74697507f06c97d97b1b86c1086ec1491d
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java
@@ -0,0 +1,8 @@
+package theodolite.commons.flink.serialization;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+  // here be dragons
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..53d25dced2aa3b1736ace1c38c6a75bf0f34e24a
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
@@ -0,0 +1,30 @@
+package theodolite.commons.flink.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.math.Stats;
+
+import java.io.Serializable;
+
+/**
+ * Custom Kryo Serializer for efficient transmission between Flink instances.
+ */
+public class StatsSerializer extends Serializer<Stats> implements Serializable {
+
+  private static final long serialVersionUID = -1276866176534267373L;
+
+  @Override
+  public void write(final Kryo kryo, final Output output, final Stats object) {
+    final byte[] data = object.toByteArray();
+    output.writeInt(data.length);
+    output.writeBytes(data);
+  }
+
+  @Override
+  public Stats read(final Kryo kryo, final Input input, final Class<Stats> type) {
+    final int numBytes = input.readInt();
+    return Stats.fromByteArray(input.readBytes(numBytes));
+  }
+}
diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle
index 5c524a57cedbfdaff4aa8e3e39ed3a07711948bc..dc2c7c59bb89ab8873894c92e35791cfbd925565 100644
--- a/theodolite-benchmarks/settings.gradle
+++ b/theodolite-benchmarks/settings.gradle
@@ -1,16 +1,21 @@
 rootProject.name = 'theodolite-benchmarks'
 
-include 'workload-generator-commons'
-include 'application-kafkastreams-commons'
+include 'workload-generator-commons' // TODO Rename to load-generator-commons
+include 'application-kafkastreams-commons' // TODO Rename to kstreams-commons
+include 'flink-commons'
 
-include 'uc1-workload-generator'
-include 'uc1-application'
+include 'uc1-workload-generator' // TODO Rename to uc1-load-generator
+include 'uc1-application' // TODO Rename to uc1-kstreams
+include 'uc1-application-flink' // TODO Rename to uc1-flink
 
-include 'uc2-workload-generator'
-include 'uc2-application'
+include 'uc2-workload-generator' // TODO Rename to uc2-load-generator
+include 'uc2-application' // TODO Rename to uc1-kstreams
+//include 'uc2-application-flink' // TODO Rename to uc2-flink
 
-include 'uc3-workload-generator'
-include 'uc3-application'
+include 'uc3-workload-generator' // TODO Rename to uc3-load-generator
+include 'uc3-application' // TODO Rename to uc1-kstreams
+//include 'uc3-application-flink' // TODO Rename to uc3-flink
 
-include 'uc4-workload-generator'
-include 'uc4-application'
+include 'uc4-workload-generator' // TODO Rename to uc4-load-generator
+include 'uc4-application' // TODO Rename to uc4-kstreams
+//include 'uc4-application-flink' // TODO Rename to uc4-flink
diff --git a/theodolite-benchmarks/uc1-application-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-application-flink/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..fa98ca63d77bdee891150bd6713f70197a75cefc
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,127 @@
+cleanup.add_default_serial_version_id=true
+cleanup.add_generated_serial_version_id=false
+cleanup.add_missing_annotations=true
+cleanup.add_missing_deprecated_annotations=true
+cleanup.add_missing_methods=false
+cleanup.add_missing_nls_tags=false
+cleanup.add_missing_override_annotations=true
+cleanup.add_missing_override_annotations_interface_methods=true
+cleanup.add_serial_version_id=false
+cleanup.always_use_blocks=true
+cleanup.always_use_parentheses_in_expressions=false
+cleanup.always_use_this_for_non_static_field_access=true
+cleanup.always_use_this_for_non_static_method_access=true
+cleanup.convert_functional_interfaces=false
+cleanup.convert_to_enhanced_for_loop=true
+cleanup.correct_indentation=true
+cleanup.format_source_code=true
+cleanup.format_source_code_changes_only=false
+cleanup.insert_inferred_type_arguments=false
+cleanup.make_local_variable_final=true
+cleanup.make_parameters_final=true
+cleanup.make_private_fields_final=true
+cleanup.make_type_abstract_if_missing_method=false
+cleanup.make_variable_declarations_final=true
+cleanup.never_use_blocks=false
+cleanup.never_use_parentheses_in_expressions=true
+cleanup.organize_imports=true
+cleanup.qualify_static_field_accesses_with_declaring_class=false
+cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+cleanup.qualify_static_member_accesses_with_declaring_class=true
+cleanup.qualify_static_method_accesses_with_declaring_class=false
+cleanup.remove_private_constructors=true
+cleanup.remove_redundant_modifiers=false
+cleanup.remove_redundant_semicolons=true
+cleanup.remove_redundant_type_arguments=true
+cleanup.remove_trailing_whitespaces=true
+cleanup.remove_trailing_whitespaces_all=true
+cleanup.remove_trailing_whitespaces_ignore_empty=false
+cleanup.remove_unnecessary_casts=true
+cleanup.remove_unnecessary_nls_tags=true
+cleanup.remove_unused_imports=true
+cleanup.remove_unused_local_variables=false
+cleanup.remove_unused_private_fields=true
+cleanup.remove_unused_private_members=false
+cleanup.remove_unused_private_methods=true
+cleanup.remove_unused_private_types=true
+cleanup.sort_members=false
+cleanup.sort_members_all=false
+cleanup.use_anonymous_class_creation=false
+cleanup.use_blocks=true
+cleanup.use_blocks_only_for_return_and_throw=false
+cleanup.use_lambda=true
+cleanup.use_parentheses_in_expressions=true
+cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+cleanup.use_this_for_non_static_method_access=true
+cleanup.use_this_for_non_static_method_access_only_if_necessary=false
+cleanup_profile=_CAU-SE-Style
+cleanup_settings_version=2
+eclipse.preferences.version=1
+editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
+formatter_profile=_CAU-SE-Style
+formatter_settings_version=15
+org.eclipse.jdt.ui.ignorelowercasenames=true
+org.eclipse.jdt.ui.importorder=;
+org.eclipse.jdt.ui.ondemandthreshold=99
+org.eclipse.jdt.ui.staticondemandthreshold=99
+sp_cleanup.add_default_serial_version_id=true
+sp_cleanup.add_generated_serial_version_id=false
+sp_cleanup.add_missing_annotations=true
+sp_cleanup.add_missing_deprecated_annotations=true
+sp_cleanup.add_missing_methods=false
+sp_cleanup.add_missing_nls_tags=false
+sp_cleanup.add_missing_override_annotations=true
+sp_cleanup.add_missing_override_annotations_interface_methods=true
+sp_cleanup.add_serial_version_id=false
+sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_parentheses_in_expressions=false
+sp_cleanup.always_use_this_for_non_static_field_access=true
+sp_cleanup.always_use_this_for_non_static_method_access=true
+sp_cleanup.convert_functional_interfaces=false
+sp_cleanup.convert_to_enhanced_for_loop=true
+sp_cleanup.correct_indentation=true
+sp_cleanup.format_source_code=true
+sp_cleanup.format_source_code_changes_only=false
+sp_cleanup.insert_inferred_type_arguments=false
+sp_cleanup.make_local_variable_final=true
+sp_cleanup.make_parameters_final=true
+sp_cleanup.make_private_fields_final=true
+sp_cleanup.make_type_abstract_if_missing_method=false
+sp_cleanup.make_variable_declarations_final=true
+sp_cleanup.never_use_blocks=false
+sp_cleanup.never_use_parentheses_in_expressions=true
+sp_cleanup.on_save_use_additional_actions=true
+sp_cleanup.organize_imports=true
+sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
+sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
+sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
+sp_cleanup.remove_private_constructors=true
+sp_cleanup.remove_redundant_modifiers=false
+sp_cleanup.remove_redundant_semicolons=true
+sp_cleanup.remove_redundant_type_arguments=true
+sp_cleanup.remove_trailing_whitespaces=true
+sp_cleanup.remove_trailing_whitespaces_all=true
+sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
+sp_cleanup.remove_unnecessary_casts=true
+sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unused_imports=true
+sp_cleanup.remove_unused_local_variables=false
+sp_cleanup.remove_unused_private_fields=true
+sp_cleanup.remove_unused_private_members=false
+sp_cleanup.remove_unused_private_methods=true
+sp_cleanup.remove_unused_private_types=true
+sp_cleanup.sort_members=false
+sp_cleanup.sort_members_all=false
+sp_cleanup.use_anonymous_class_creation=false
+sp_cleanup.use_blocks=true
+sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_lambda=true
+sp_cleanup.use_parentheses_in_expressions=true
+sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
diff --git a/theodolite-benchmarks/uc1-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs b/theodolite-benchmarks/uc1-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/.settings/qa.eclipse.plugin.checkstyle.prefs
@@ -0,0 +1,4 @@
+configFilePath=../config/checkstyle.xml
+customModulesJarPaths=
+eclipse.preferences.version=1
+enabled=true
diff --git a/theodolite-benchmarks/uc1-application-flink/.settings/qa.eclipse.plugin.pmd.prefs b/theodolite-benchmarks/uc1-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/.settings/qa.eclipse.plugin.pmd.prefs
@@ -0,0 +1,4 @@
+customRulesJars=
+eclipse.preferences.version=1
+enabled=true
+ruleSetFilePath=../config/pmd.xml
diff --git a/theodolite-benchmarks/uc1-application-flink/Dockerfile b/theodolite-benchmarks/uc1-application-flink/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..cde0dc7dd9c864d4ad301dadb8d664dad1440aac
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/Dockerfile
@@ -0,0 +1,3 @@
+FROM nicobiernat/flink:1.11-scala_2.12-java_11
+
+ADD build/libs/uc1-application-all.jar /opt/flink/usrlib/artifacts/uc1-application-all.jar
diff --git a/theodolite-benchmarks/uc1-application-flink/build.gradle b/theodolite-benchmarks/uc1-application-flink/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..8b8552dbc0c116a0987dbdfe874ca3111c8f11b9
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/build.gradle
@@ -0,0 +1 @@
+mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob"
diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
new file mode 100644
index 0000000000000000000000000000000000000000..3d1de40852a1c3907ac041fbbd1c17a870622405
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/ConfigurationKeys.java
@@ -0,0 +1,22 @@
+package theodolite.uc1.application;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+
+  public static final String APPLICATION_NAME = "application.name";
+
+  public static final String APPLICATION_VERSION = "application.version";
+
+  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
+
+  public static final String CHECKPOINTING = "checkpointing";
+
+  private ConfigurationKeys() {}
+
+}
diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
new file mode 100644
index 0000000000000000000000000000000000000000..931436543bc66e0a295f31dbec6261d209a16f46
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -0,0 +1,69 @@
+package theodolite.uc1.application;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde;
+import titan.ccp.common.configuration.Configurations;
+import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.models.records.ActivePowerRecordFactory;
+
+import java.util.Properties;
+
+/**
+ * The History Microservice Flink Job.
+ */
+public class HistoryServiceFlinkJob {
+
+  private final Configuration config = Configurations.create();
+
+  private void run() {
+    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
+    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
+    final String applicationId = applicationName + "-" + applicationVersion;
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+
+    final Properties kafkaProps = new Properties();
+    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
+    kafkaProps.setProperty("group.id", applicationId);
+
+    final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> serde =
+        new FlinkMonitoringRecordSerde<>(inputTopic,
+                                         ActivePowerRecord.class,
+                                         ActivePowerRecordFactory.class);
+
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
+        new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
+    kafkaConsumer.setStartFromGroupOffsets();
+    if (checkpointing)
+      kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+
+    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    if (checkpointing)
+      env.enableCheckpointing(commitIntervalMs);
+
+    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer);
+
+    stream
+        .rebalance()
+        .map(v -> "ActivePowerRecord { "
+            + "identifier: " + v.getIdentifier() + ", "
+            + "timestamp: " + v.getTimestamp() + ", "
+            + "valueInW: " + v.getValueInW() + " }")
+        .print();
+
+    try {
+      env.execute(applicationId);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static void main(final String[] args) {
+    new HistoryServiceFlinkJob().run();
+  }
+}
diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties
new file mode 100644
index 0000000000000000000000000000000000000000..eee4ea94a16b030a7dedd3daf37757d277fc4003
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/src/main/resources/META-INF/application.properties
@@ -0,0 +1,10 @@
+application.name=theodolite-uc1-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+
+num.threads=1
+commit.interval.ms=1000
+cache.max.bytes.buffering=-1
diff --git a/theodolite-benchmarks/workload-generator-commons/build.gradle b/theodolite-benchmarks/workload-generator-commons/build.gradle
index 98d820b480ba0b357b74f82ebce5a647ee392461..c42fff0412c332bc8292e175a352c03ada71f659 100644
--- a/theodolite-benchmarks/workload-generator-commons/build.gradle
+++ b/theodolite-benchmarks/workload-generator-commons/build.gradle
@@ -2,4 +2,11 @@ dependencies {
   implementation 'com.google.guava:guava:30.1-jre'
   implementation 'com.hazelcast:hazelcast:4.1.1'
   implementation 'com.hazelcast:hazelcast-kubernetes:2.2.1'
+  implementation 'org.slf4j:slf4j-simple:1.7.25'
+  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
+  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
+  implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required?
+  
+  // Use JUnit test framework
+  testImplementation 'junit:junit:4.12'
 }
\ No newline at end of file