diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index aaef44bad89714bc269af83ba50f00e3d63217a9..7cf1347be8adcbc862e960063fa48735a3531ca7 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -38,6 +38,7 @@ build-benchmarks:
   artifacts:
     paths:
       - "theodolite-benchmarks/build/libs/*.jar"
+      - "theodolite-benchmarks/*/build/libs/*.jar"
       - "theodolite-benchmarks/*/build/distributions/*.tar"
     expire_in: 1 day
 
@@ -124,30 +125,54 @@ spotbugs-benchmarks:
       when: manual
       allow_failure: true
 
-deploy-uc1-kstreams-app:
+deploy-uc1-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc1-kstreams-app"
     JAVA_PROJECT_NAME: "uc1-application"
 
-deploy-uc2-kstreams-app:
+deploy-uc2-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc2-kstreams-app"
     JAVA_PROJECT_NAME: "uc2-application"
 
-deploy-uc3-kstreams-app:
+deploy-uc3-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc3-kstreams-app"
     JAVA_PROJECT_NAME: "uc3-application"
 
-deploy-uc4-kstreams-app:
+deploy-uc4-kstreams:
   extends: .deploy-benchmarks
   variables:
     IMAGE_NAME: "theodolite-uc4-kstreams-app"
     JAVA_PROJECT_NAME: "uc4-application"
 
+deploy-uc1-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc1-flink"
+    JAVA_PROJECT_NAME: "uc1-application-flink"
+
+deploy-uc2-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc2-flink"
+    JAVA_PROJECT_NAME: "uc2-application-flink"
+
+deploy-uc3-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc3-flink"
+    JAVA_PROJECT_NAME: "uc3-application-flink"
+
+deploy-uc4-flink:
+  extends: .deploy-benchmarks
+  variables:
+    IMAGE_NAME: "theodolite-uc4-flink"
+    JAVA_PROJECT_NAME: "uc4-application-flink"
+
 deploy-uc1-load-generator:
   extends: .deploy-benchmarks
   variables:
diff --git a/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml
new file mode 100755
index 0000000000000000000000000000000000000000..aa35ac2d1dee01cdf25d2eb2ac77bd056865479a
--- /dev/null
+++ b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml
@@ -0,0 +1,69 @@
+version: '2'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper
+    expose:
+      - "9092"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+  kafka:
+    image: wurstmeister/kafka
+    expose:
+      - "9092"
+    #ports:
+    #  - 19092:19092
+    environment:
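+      # Clients inside the Compose network reach the broker as kafka:9092; enable the
+      # commented-out ports mapping above to reach it from the host as localhost:19092.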
+      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
+      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.3.1
+    depends_on:
+      - zookeeper
+      - kafka
+    expose:
+      - "8081"
+    #ports:
+    #  - 8081:8081
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest
+    depends_on:
+      - schema-registry
+      - kafka
+    environment:
+      BOOTSTRAP_SERVER: load-generator:5701
+      PORT: 5701
+      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+      SCHEMA_REGISTRY_URL: http://schema-registry:8081
+      NUM_SENSORS: 10
+  benchmark-jobmanager:
+    image: ghcr.io/cau-se/theodolite-uc1-flink:latest
+    ports:
+      - "8080:8081"
+    command: standalone-job --job-classname theodolite.uc1.application.HistoryServiceFlinkJob
+    environment:
+      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
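+      # The Flink base image appends the contents of FLINK_PROPERTIES to its flink-conf.yaml.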
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+        parallelism.default: 1
+    depends_on:
+      - schema-registry
+      - kafka
+  benchmark-taskmanager:
+    image: ghcr.io/cau-se/theodolite-uc1-flink:latest
+    command: taskmanager
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+    depends_on:
+      - schema-registry
+      - kafka    
diff --git a/theodolite-benchmarks/docker-test/uc1-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml
similarity index 89%
rename from theodolite-benchmarks/docker-test/uc1-docker-compose/docker-compose.yml
rename to theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml
index cdc9df40257362934a93fcbe2de24b6035d40bca..403becacff5a386eddfaa8e59fe7873d2adb006c 100755
--- a/theodolite-benchmarks/docker-test/uc1-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml
@@ -31,16 +31,16 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc1-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg: 
-    image: theodolite/theodolite-uc1-workload-generator:latest
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
diff --git a/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml
new file mode 100755
index 0000000000000000000000000000000000000000..a8bf56d52c1be7fea3f172d86f6deac27fcc24f7
--- /dev/null
+++ b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml
@@ -0,0 +1,70 @@
+version: '2'
+services:
+  zookeeper:
+    #image: wurstmeister/zookeeper
+    image: confluentinc/cp-zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+  kafka:
+    image: wurstmeister/kafka
+    expose:
+      - "9092"
+    #ports:
+    #  - 19092:19092
+    environment:
+      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
+      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.3.1
+    depends_on:
+      - zookeeper
+      - kafka
+    #ports:
+    #  - "8081:8081"
+    expose:
+      - "8081"
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest
+    depends_on:
+      - schema-registry
+      - kafka
+    environment:
+      BOOTSTRAP_SERVER: load-generator:5701
+      PORT: 5701
+      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+      SCHEMA_REGISTRY_URL: http://schema-registry:8081
+      NUM_SENSORS: 10  
+  benchmark-jobmanager:
+    image: ghcr.io/cau-se/theodolite-uc2-flink:latest
+    ports:
+      - "8080:8081"
+    command: standalone-job --job-classname theodolite.uc2.application.HistoryServiceFlinkJob
+    environment:
+      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+        parallelism.default: 1
+    depends_on:
+      - schema-registry
+      - kafka
+  benchmark-taskmanager:
+    image: ghcr.io/cau-se/theodolite-uc2-flink:latest
+    command: taskmanager
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+    depends_on:
+      - schema-registry
+      - kafka
diff --git a/theodolite-benchmarks/docker-test/uc2-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml
similarity index 90%
rename from theodolite-benchmarks/docker-test/uc2-docker-compose/docker-compose.yml
rename to theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml
index 613553fcfa53122205b6e58d85fb7225eae90d7c..20d2c62dac13af29ec50439670308f2911f0d57a 100755
--- a/theodolite-benchmarks/docker-test/uc2-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml
@@ -32,8 +32,8 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc2-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc2-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
@@ -41,8 +41,8 @@ services:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
       KAFKA_WINDOW_DURATION_MINUTES: 60
-  uc-wg: 
-    image: theodolite/theodolite-uc2-workload-generator:latest
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
diff --git a/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml
new file mode 100755
index 0000000000000000000000000000000000000000..9999caf046e844d066200ecfbf15d3351c167d31
--- /dev/null
+++ b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml
@@ -0,0 +1,70 @@
+version: '2'
+services:
+  zookeeper:
+    #image: wurstmeister/zookeeper
+    image: confluentinc/cp-zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+  kafka:
+    image: wurstmeister/kafka
+    expose:
+      - "9092"
+    #ports:
+    #  - 19092:19092
+    environment:
+      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
+      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.3.1
+    depends_on:
+      - zookeeper
+      - kafka
+    #ports:
+    #  - "8081:8081"
+    expose:
+      - "8081"
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest
+    depends_on:
+      - schema-registry
+      - kafka
+    environment:
+      BOOTSTRAP_SERVER: load-generator:5701
+      PORT: 5701
+      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+      SCHEMA_REGISTRY_URL: http://schema-registry:8081
+      NUM_SENSORS: 10
+  benchmark-jobmanager:
+    image: ghcr.io/cau-se/theodolite-uc3-flink:latest
+    ports:
+      - "8080:8081"
+    command: standalone-job --job-classname theodolite.uc3.application.HistoryServiceFlinkJob
+    environment:
+      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+        parallelism.default: 1
+    depends_on:
+      - schema-registry
+      - kafka
+  benchmark-taskmanager:
+    image: ghcr.io/cau-se/theodolite-uc3-flink:latest
+    command: taskmanager
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+    depends_on:
+      - schema-registry
+      - kafka
diff --git a/theodolite-benchmarks/docker-test/uc3-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml
similarity index 90%
rename from theodolite-benchmarks/docker-test/uc3-docker-compose/docker-compose.yml
rename to theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml
index d321318b4024b678cf8f37007e90dc62a2042ece..ef16b858536b0d133dc49d002d16cf6c04193297 100755
--- a/theodolite-benchmarks/docker-test/uc3-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml
@@ -32,16 +32,16 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc3-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg: 
-    image: theodolite/theodolite-uc3-workload-generator:latest
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
diff --git a/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml
new file mode 100755
index 0000000000000000000000000000000000000000..80720063991100bae2c8c148f14cd6f1a32bb0ff
--- /dev/null
+++ b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml
@@ -0,0 +1,70 @@
+version: '2'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper
+    expose:
+      - "2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+  kafka:
+    image: wurstmeister/kafka
+    expose:
+      - "9092"
+    #ports:
+    #  - 19092:19092
+    environment:
+      KAFKA_LISTENERS: PLAINTEXT://:9092,CONNECTIONS_FROM_HOST://:19092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
+      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.3.1
+    depends_on:
+      - zookeeper
+      - kafka
+    expose:
+      - "8081"
+    #ports:
+    #  - 8081:8081
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
+    depends_on:
+      - schema-registry
+      - kafka
+    environment:
+      BOOTSTRAP_SERVER: load-generator:5701
+      PORT: 5701
+      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+      SCHEMA_REGISTRY_URL: http://schema-registry:8081
+      NUM_SENSORS: 4
+      NUM_NESTED_GROUPS: 4
+  benchmark-jobmanager:
+    image: ghcr.io/cau-se/theodolite-uc4-flink:latest
+    ports:
+      - "8080:8081"
+    command: standalone-job --job-classname theodolite.uc4.application.AggregationServiceFlinkJob
+    environment:
+      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+        parallelism.default: 1
+    depends_on:
+      - schema-registry
+      - kafka
+  benchmark-taskmanager:
+    image: ghcr.io/cau-se/theodolite-uc4-flink:latest
+    command: taskmanager
+    environment:
+      - |
+        FLINK_PROPERTIES=
+        jobmanager.rpc.address: benchmark-jobmanager
+    depends_on:
+      - schema-registry
+      - kafka
diff --git a/theodolite-benchmarks/docker-test/uc4-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml
similarity index 90%
rename from theodolite-benchmarks/docker-test/uc4-docker-compose/docker-compose.yml
rename to theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml
index d478d74e55a1b5423a390c624848b20f5faf2969..5e4cb94469f2f6cc8c48694a7ea6c885f066622d 100755
--- a/theodolite-benchmarks/docker-test/uc4-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml
@@ -31,16 +31,16 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-  uc-app:
-    image: theodolite/theodolite-uc4-kstreams-app:latest
+  benchmark:
+    image: ghcr.io/cau-se/theodolite-uc4-kstreams-app:latest
     depends_on:
       - schema-registry
       - kafka
     environment:
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
-  uc-wg: 
-    image: theodolite/theodolite-uc4-workload-generator:latest
+  load-generator: 
+    image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
     depends_on:
       - schema-registry
       - kafka
diff --git a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..66b402b58f39b79066638ce679c27c0378d5be54 100644
--- a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
 cleanup.qualify_static_method_accesses_with_declaring_class=false
 cleanup.remove_private_constructors=true
 cleanup.remove_redundant_modifiers=false
-cleanup.remove_redundant_semicolons=false
+cleanup.remove_redundant_semicolons=true
 cleanup.remove_redundant_type_arguments=true
 cleanup.remove_trailing_whitespaces=true
 cleanup.remove_trailing_whitespaces_all=true
@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
 org.eclipse.jdt.ui.staticondemandthreshold=99
+org.eclipse.jdt.ui.text.custom_code_templates=
 sp_cleanup.add_default_serial_version_id=true
 sp_cleanup.add_generated_serial_version_id=false
 sp_cleanup.add_missing_annotations=true
diff --git a/theodolite-benchmarks/flink-commons/build.gradle b/theodolite-benchmarks/flink-commons/build.gradle
index cff49b845a95b2d6f49fd2ef16b51124d5507d29..2ced4ff25af3270a76412e2c8dcc61e2713e16e1 100644
--- a/theodolite-benchmarks/flink-commons/build.gradle
+++ b/theodolite-benchmarks/flink-commons/build.gradle
@@ -20,8 +20,13 @@ dependencies {
     implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
     implementation 'com.google.guava:guava:30.1-jre'
     compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
+    compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}"
+    compile group: 'org.apache.flink', name: "flink-runtime_${scalaBinaryVersion}", version: "${flinkVersion}"
     compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
-
+    compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}"
+    implementation "org.apache.flink:flink-avro:${flinkVersion}"
+    implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}"
+    
     // Use JUnit test framework
     testImplementation 'junit:junit:4.12'
   }
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/ConfigurationKeys.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/ConfigurationKeys.java
new file mode 100644
index 0000000000000000000000000000000000000000..2847ede440ecd65bdf35fc8e825d0f7b723a3f8f
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/ConfigurationKeys.java
@@ -0,0 +1,19 @@
+package theodolite.commons.flink;
+
+/**
+ * Keys to access configuration parameters.
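+ *
+ * <p>Read, for example, by {@link StateBackends} to select and configure a Flink state backend.</p>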
+ */
+public final class ConfigurationKeys {
+
+  public static final String FLINK_STATE_BACKEND = "flink.state.backend";
+
+  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
+
+  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
+      "flink.state.backend.memory.size";
+
+  public static final String FLINK_CHECKPOINTING = "checkpointing";
+
+  private ConfigurationKeys() {}
+
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/KafkaConnectorFactory.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/KafkaConnectorFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..55d73b0fb9274b0ae67468d50b7978799d7e6257
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/KafkaConnectorFactory.java
@@ -0,0 +1,154 @@
+package theodolite.commons.flink;
+
+import java.time.Duration;
+import java.util.Properties;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.util.SerializableSupplier;
+
+/**
+ * A class for creating {@link FlinkKafkaConsumer} and {@link FlinkKafkaProducer}.
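+ *
+ * <p>Typical usage, as a sketch (topic and server names are illustrative):</p>
+ *
+ * <pre>{@code
+ * final KafkaConnectorFactory factory = new KafkaConnectorFactory(
+ *     "my-app", "kafka:9092", true, "http://schema-registry:8081");
+ * final FlinkKafkaConsumer<ActivePowerRecord> consumer =
+ *     factory.createConsumer("input", ActivePowerRecord.class);
+ * }</pre>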
+ */
+public class KafkaConnectorFactory {
+
+  private static final Duration PRODUCER_TRANSACTION_TIMEOUT = Duration.ofMinutes(5);
+
+  private final Properties kafkaProps = new Properties();
+  private final boolean checkpointingEnabled;
+  private final String schemaRegistryUrl;
+
+  /**
+   * Create a new {@link KafkaConnectorFactory} from the provided parameters.
+   */
+  public KafkaConnectorFactory(
+      final String appName,
+      final String bootstrapServers,
+      final boolean checkpointingEnabled,
+      final String schemaRegistryUrl) {
+    this.checkpointingEnabled = checkpointingEnabled;
+    this.schemaRegistryUrl = schemaRegistryUrl;
+    this.kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    this.kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, appName);
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
+   * {@link DeserializationSchema}.
+   */
+  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
+      final DeserializationSchema<T> deserializationSchema) {
+    return this.createBaseConsumer(
+        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes data using a
+   * {@link KafkaDeserializationSchema}.
+   */
+  public <T> FlinkKafkaConsumer<T> createConsumer(final String topic,
+      final KafkaDeserializationSchema<T> deserializationSchema) {
+    return this.createBaseConsumer(
+        new FlinkKafkaConsumer<>(topic, deserializationSchema, this.cloneProperties()));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes {@link Tuple2}s using two Kafka
+   * {@link Serde}s.
+   */
+  public <K, V> FlinkKafkaConsumer<Tuple2<K, V>> createConsumer(
+      final String topic,
+      final SerializableSupplier<Serde<K>> kafkaKeySerde,
+      final SerializableSupplier<Serde<V>> kafkaValueSerde,
+      final TypeInformation<Tuple2<K, V>> typeInformation) {
+    return this.<Tuple2<K, V>>createConsumer(
+        topic,
+        new FlinkKafkaKeyValueSerde<>(
+            topic,
+            kafkaKeySerde,
+            kafkaValueSerde,
+            typeInformation));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaConsumer} that consumes from a topic associated with Confluent
+   * Schema Registry.
+   */
+  public <T extends SpecificRecord> FlinkKafkaConsumer<T> createConsumer(final String topic,
+      final Class<T> typeClass) {
+    // Maybe move to subclass for Confluent-Schema-Registry-specific things
+    final DeserializationSchema<T> deserializationSchema =
+        ConfluentRegistryAvroDeserializationSchema.forSpecific(typeClass, this.schemaRegistryUrl);
+    return this.createConsumer(topic, deserializationSchema);
+  }
+
+  private <T> FlinkKafkaConsumer<T> createBaseConsumer(final FlinkKafkaConsumer<T> baseConsumer) {
+    baseConsumer.setStartFromGroupOffsets();
+    if (this.checkpointingEnabled) {
+      baseConsumer.setCommitOffsetsOnCheckpoints(true); // TODO Validate if this is sensible
+    }
+    baseConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
+    return baseConsumer;
+  }
+
+
+  /**
+   * Create a new {@link FlinkKafkaProducer} that produces data using a
+   * {@link KafkaSerializationSchema}.
+   */
+  public <T> FlinkKafkaProducer<T> createProducer(final String topic,
+      final KafkaSerializationSchema<T> serializationSchema) {
+    final Properties producerProps = this.buildProducerProperties();
+    return this.createBaseProducer(new FlinkKafkaProducer<>(
+        topic, serializationSchema, producerProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
+  }
+
+  /**
+   * Create a new {@link FlinkKafkaProducer} that produces {@link Tuple2}s using two Kafka
+   * {@link Serde}s.
+   */
+  public <K, V> FlinkKafkaProducer<Tuple2<K, V>> createProducer(
+      final String topic,
+      final SerializableSupplier<Serde<K>> kafkaKeySerde,
+      final SerializableSupplier<Serde<V>> kafkaValueSerde,
+      final TypeInformation<Tuple2<K, V>> typeInformation) {
+    return this.createProducer(
+        topic,
+        new FlinkKafkaKeyValueSerde<>(
+            topic,
+            kafkaKeySerde,
+            kafkaValueSerde,
+            typeInformation));
+  }
+
+  private <T> FlinkKafkaProducer<T> createBaseProducer(final FlinkKafkaProducer<T> baseProducer) {
+    baseProducer.setWriteTimestampToKafka(true);
+    return baseProducer;
+  }
+
+  private Properties buildProducerProperties() {
+    final Properties producerProps = this.cloneProperties();
+    producerProps.setProperty(
+        ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+        String.valueOf(PRODUCER_TRANSACTION_TIMEOUT.toMillis())); // TODO necessary?
+    return producerProps;
+  }
+
+  private Properties cloneProperties() {
+    final Properties props = new Properties();
+    props.putAll(this.kafkaProps);
+    return props;
+  }
+
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java
new file mode 100644
index 0000000000000000000000000000000000000000..a94927e4bf49e1dbe6d109eb8f19f7d292f3d879
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/StateBackends.java
@@ -0,0 +1,68 @@
+package theodolite.commons.flink;
+
+import java.io.IOException;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides factory methods for creating Flink {@link StateBackend}s.
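+ *
+ * <p>Example: {@code env.setStateBackend(StateBackends.fromConfiguration(config))}.</p>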
+ */
+public final class StateBackends {
+
+  public static final String STATE_BACKEND_TYPE_MEMORY = "memory";
+  public static final String STATE_BACKEND_TYPE_FILESYSTEM = "filesystem";
+  public static final String STATE_BACKEND_TYPE_ROCKSDB = "rocksdb";
+  // public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_ROCKSDB;
+  public static final String STATE_BACKEND_TYPE_DEFAULT = STATE_BACKEND_TYPE_MEMORY;
+  public static final String DEFAULT_STATE_BACKEND_PATH = "file:///opt/flink/statebackend";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StateBackends.class);
+
+  private StateBackends() {}
+
+  /**
+   * Create a Flink {@link StateBackend} from a {@link Configuration} and the
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND},
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_MEMORY_SIZE} and
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND_PATH} configuration keys. Possible options for the
+   * {@code ConfigurationKeys#FLINK_STATE_BACKEND} configuration are
+   * {@code #STATE_BACKEND_TYPE_ROCKSDB}, {@code #STATE_BACKEND_TYPE_FILESYSTEM} and
+   * {@code #STATE_BACKEND_TYPE_MEMORY}, where
+   * {@code #STATE_BACKEND_TYPE_MEMORY} is the default.
+   */
+  public static StateBackend fromConfiguration(final Configuration configuration) {
+    final String stateBackendType =
+        configuration.getString(ConfigurationKeys.FLINK_STATE_BACKEND, STATE_BACKEND_TYPE_DEFAULT);
+    switch (stateBackendType) {
+      case STATE_BACKEND_TYPE_MEMORY:
+        final int memoryStateBackendSize = configuration.getInt(
+            ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
+            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+        return new MemoryStateBackend(memoryStateBackendSize);
+      case STATE_BACKEND_TYPE_FILESYSTEM:
+        final String stateBackendPath = configuration.getString(
+            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
+            DEFAULT_STATE_BACKEND_PATH);
+        return new FsStateBackend(stateBackendPath);
+      case STATE_BACKEND_TYPE_ROCKSDB:
+        final String stateBackendPath2 = configuration.getString(
+            ConfigurationKeys.FLINK_STATE_BACKEND_PATH,
+            DEFAULT_STATE_BACKEND_PATH);
+        try {
+          return new RocksDBStateBackend(stateBackendPath2, true);
+        } catch (final IOException e) {
+          LOGGER.error("Cannot create RocksDB state backend.", e);
+          throw new IllegalStateException(e);
+        }
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported state backend '" + stateBackendType + "' configured.");
+    }
+  }
+
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/TupleType.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/TupleType.java
new file mode 100644
index 0000000000000000000000000000000000000000..360331e4d1e4fdc47a24ac8ae995b7590301f7fd
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/TupleType.java
@@ -0,0 +1,22 @@
+package theodolite.commons.flink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Helper methods for creating {@link TypeInformation} for {@link Tuple}s. In contrast to
+ * {@code Types#TUPLE(TypeInformation...)}, these methods provide compile-time type safety.
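+ *
+ * <p>Example: {@code TupleType.of(Types.STRING, Types.LONG)} yields a
+ * {@code TypeInformation<Tuple2<String, Long>>}.</p>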
+ */
+public final class TupleType {
+
+  private TupleType() {}
+
+  public static <T1, T2> TypeInformation<Tuple2<T1, T2>> of(// NOPMD
+      final TypeInformation<T1> t0,
+      final TypeInformation<T2> t1) {
+    return Types.TUPLE(t0, t1);
+  }
+
+}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java
index a09cbd210f242ea63f6281172f4a21e2d22357fe..22f615a6af4caf575af57dbe9b7f989889c4095f 100644
--- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/FlinkKafkaKeyValueSerde.java
@@ -1,5 +1,6 @@
 package theodolite.commons.flink.serialization;
 
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
@@ -7,29 +8,35 @@ import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serde;
+import theodolite.commons.flink.util.SerializableSupplier;
 
-import javax.annotation.Nullable;
-
+/**
+ * A {@link KafkaSerializationSchema} and {@link KafkaDeserializationSchema} for an arbitrary
+ * key-value pair in Kafka, mapped to/from a Flink {@link Tuple2}.
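+ *
+ * <p>The underlying {@link Serde}s are obtained lazily from {@link SerializableSupplier}s, so
+ * instances of this schema can be serialized and shipped to Flink workers.</p>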
+ *
+ * @param <K> Type of the key.
+ * @param <V> Type of the value.
+ */
 public class FlinkKafkaKeyValueSerde<K, V>
-    implements KafkaDeserializationSchema<Tuple2<K, V>>,
-               KafkaSerializationSchema<Tuple2<K, V>> {
+    implements KafkaDeserializationSchema<Tuple2<K, V>>, KafkaSerializationSchema<Tuple2<K, V>> {
+
+  private static final long serialVersionUID = 2469569396501933443L; // NOPMD
 
-  private static final long serialVersionUID = 2469569396501933443L;
+  private final SerializableSupplier<Serde<K>> keySerdeSupplier;
+  private final SerializableSupplier<Serde<V>> valueSerdeSupplier;
+  private final String topic;
+  private final TypeInformation<Tuple2<K, V>> typeInfo;
 
   private transient Serde<K> keySerde;
   private transient Serde<V> valueSerde;
 
-  private SerializableSupplier<Serde<K>> keySerdeSupplier;
-  private SerializableSupplier<Serde<V>> valueSerdeSupplier;
-
-  private String topic;
-
-  private TypeInformation<Tuple2<K,V>> typeInfo;
-
+  /**
+   * Create a new {@link FlinkKafkaKeyValueSerde}.
+   */
   public FlinkKafkaKeyValueSerde(final String topic,
-                                 final SerializableSupplier<Serde<K>> keySerdeSupplier,
-                                 final SerializableSupplier<Serde<V>> valueSerdeSupplier,
-                                 final TypeInformation<Tuple2<K, V>> typeInfo) {
+      final SerializableSupplier<Serde<K>> keySerdeSupplier,
+      final SerializableSupplier<Serde<V>> valueSerdeSupplier,
+      final TypeInformation<Tuple2<K, V>> typeInfo) {
     this.topic = topic;
     this.typeInfo = typeInfo;
     this.keySerdeSupplier = keySerdeSupplier;
@@ -43,7 +50,7 @@ public class FlinkKafkaKeyValueSerde<K, V>
 
   @Override
   public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
-    ensureInitialized();
+    this.ensureInitialized();
     final K key = this.keySerde.deserializer().deserialize(this.topic, record.key());
     final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value());
     return new Tuple2<>(key, value);
@@ -55,8 +62,9 @@ public class FlinkKafkaKeyValueSerde<K, V>
   }
 
   @Override
-  public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, @Nullable Long timestamp) {
-    ensureInitialized();
+  public ProducerRecord<byte[], byte[]> serialize(final Tuple2<K, V> element,
+      @Nullable final Long timestamp) {
+    this.ensureInitialized();
     final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0);
     final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1);
     return new ProducerRecord<>(this.topic, key, value);
@@ -65,7 +73,8 @@ public class FlinkKafkaKeyValueSerde<K, V>
   private void ensureInitialized() {
     if (this.keySerde == null || this.valueSerde == null) {
       this.keySerde = this.keySerdeSupplier.get();
-      this.valueSerde = this.valueSerdeSupplier.get();;
+      this.valueSerde = this.valueSerdeSupplier.get();
     }
   }
+
 }
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java
deleted file mode 100644
index 1f535c74697507f06c97d97b1b86c1086ec1491d..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/SerializableSupplier.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package theodolite.commons.flink.serialization;
-
-import java.io.Serializable;
-import java.util.function.Supplier;
-
-public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
-  // here be dragons
-}
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
index 53d25dced2aa3b1736ace1c38c6a75bf0f34e24a..f1f9870fda73ccec0fc25c5c70665759ab07d893 100644
--- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
@@ -9,11 +9,11 @@ import com.google.common.math.Stats;
 import java.io.Serializable;
 
 /**
- * Custom Kryo Serializer for efficient transmission between Flink instances.
+ * Custom Kryo {@link Serializer} for efficient transmission between Flink instances.
  */
 public class StatsSerializer extends Serializer<Stats> implements Serializable {
 
-  private static final long serialVersionUID = -1276866176534267373L;
+  private static final long serialVersionUID = -1276866176534267373L; //NOPMD
 
   @Override
   public void write(final Kryo kryo, final Output output, final Stats object) {
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/util/SerializableSupplier.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/util/SerializableSupplier.java
new file mode 100644
index 0000000000000000000000000000000000000000..bcc51a9ef7b8bb0f36398ea401f1d2c898472081
--- /dev/null
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/util/SerializableSupplier.java
@@ -0,0 +1,13 @@
+package theodolite.commons.flink.util;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Interface for {@link Supplier}s which are serializable.
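+ *
+ * <p>Method references such as {@code Serdes::String} satisfy this interface, as lambdas
+ * targeting a serializable functional interface are themselves serializable.</p>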
+ *
+ * @param <T> the type of results supplied by this supplier
+ */
+public interface SerializableSupplier<T> extends Supplier<T>, Serializable { // NOPMD
+  // Nothing to do here
+}
diff --git a/theodolite-benchmarks/uc1-application-flink/Dockerfile b/theodolite-benchmarks/uc1-application-flink/Dockerfile
index cde0dc7dd9c864d4ad301dadb8d664dad1440aac..b66d5bc052693fff17d79bc715322a076589d6a3 100644
--- a/theodolite-benchmarks/uc1-application-flink/Dockerfile
+++ b/theodolite-benchmarks/uc1-application-flink/Dockerfile
@@ -1,3 +1,3 @@
-FROM nicobiernat/flink:1.11-scala_2.12-java_11
+FROM flink:1.12-scala_2.12-java11
 
-ADD build/libs/uc1-application-all.jar /opt/flink/usrlib/artifacts/uc1-application-all.jar
+ADD build/libs/uc1-application-flink-all.jar /opt/flink/usrlib/artifacts/uc1-application-flink-all.jar
diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..831db7fe63be6529e6b7ba299dca92b138ff7d13
--- /dev/null
+++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/GsonMapper.java
@@ -0,0 +1,22 @@
+package theodolite.uc1.application;
+
+import com.google.gson.Gson;
+import org.apache.flink.api.common.functions.MapFunction;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON
+ * strings.
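+ *
+ * <p>Applied in the pipeline as {@code stream.map(new GsonMapper())}.</p>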
+ */
+public class GsonMapper implements MapFunction<ActivePowerRecord, String> {
+
+  private static final long serialVersionUID = -5263671231838353747L; // NOPMD
+
+  private static final Gson GSON = new Gson();
+
+  @Override
+  public String map(final ActivePowerRecord value) throws Exception {
+    return GSON.toJson(value);
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
index 4778acde357653d07a33f43b4ff249b0d20233ad..6655b52ec3020f46bb8a37c7124ee870fa663573 100644
--- a/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc1-application-flink/src/main/java/theodolite/uc1/application/HistoryServiceFlinkJob.java
@@ -1,69 +1,75 @@
 package theodolite.uc1.application;
 
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.flink.KafkaConnectorFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * The History microservice implemented as a Flink job.
  */
-public class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+  }
+
+  private void buildPipeline() {
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
-    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    final DeserializationSchema<ActivePowerRecord> serde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
-        new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
-    kafkaConsumer.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
-    }
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
 
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer);
+    final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
 
     stream
         .rebalance()
-        .map(v -> "ActivePowerRecord { "
-            + "identifier: " + v.getIdentifier() + ", "
-            + "timestamp: " + v.getTimestamp() + ", "
-            + "valueInW: " + v.getValueInW() + " }")
-        .print();
+        .map(new GsonMapper())
+        .flatMap((record, c) -> LOGGER.info("Record: {}", record));
+  }
 
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
    } catch (final Exception e) { // NOPMD Exception thrown by Flink
      LOGGER.error("An error occurred while running this job.", e);
     }
diff --git a/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
index 75c833aa722654395b1adc6f739395eea5256820..427a838f45f6807ede00dcb68ebf8c5580f28ce6 100644
--- a/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
+++ b/theodolite-benchmarks/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java
@@ -17,11 +17,11 @@ import titan.ccp.model.records.ActivePowerRecord;
 public class TopologyBuilder {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
+  private static final Gson GSON = new Gson();
 
   private final String inputTopic;
   private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
 
-  private final Gson gson = new Gson();
   private final StreamsBuilder builder = new StreamsBuilder();
 
 
@@ -42,8 +42,8 @@ public class TopologyBuilder {
         .stream(this.inputTopic, Consumed.with(
             Serdes.String(),
             this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
-        .mapValues(v -> this.gson.toJson(v))
-        .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
+        .mapValues(v -> GSON.toJson(v))
+        .foreach((k, record) -> LOGGER.info("Record: {}", record));
 
     return this.builder.build(properties);
   }
diff --git a/theodolite-benchmarks/uc1-application/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-application/src/main/resources/META-INF/application.properties
index b46e6246e248cc524c5b6249348c76ded6ec468b..e3371cc87e20e85e6e8c327955537e6e49dab86e 100644
--- a/theodolite-benchmarks/uc1-application/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc1-application/src/main/resources/META-INF/application.properties
@@ -4,5 +4,5 @@ application.version=0.0.1
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 
-schema.registry.url=http://localhost:8091
+schema.registry.url=http://localhost:8081
 
diff --git a/theodolite-benchmarks/uc2-application-flink/Dockerfile b/theodolite-benchmarks/uc2-application-flink/Dockerfile
index 8c2852fb2e62f9d15cdd3fadb1252eef3d2732b0..fe7e7b75d77488a55a01e8d2a890ddd69cab76e3 100644
--- a/theodolite-benchmarks/uc2-application-flink/Dockerfile
+++ b/theodolite-benchmarks/uc2-application-flink/Dockerfile
@@ -1,3 +1,3 @@
-FROM nicobiernat/flink:1.11-scala_2.12-java_11
+FROM flink:1.12-scala_2.12-java11
 
-ADD build/libs/uc2-application-all.jar /opt/flink/usrlib/artifacts/uc2-application-all.jar
\ No newline at end of file
+ADD build/libs/uc2-application-flink-all.jar /opt/flink/usrlib/artifacts/uc2-application-flink-all.jar
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
index 714f567c763dc8a5348aae258652371ce3da475f..b8452847df800226ad481f9309323a2a9a532939 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/HistoryServiceFlinkJob.java
@@ -1,32 +1,21 @@
 package theodolite.uc2.application;
 
 import com.google.common.math.Stats;
-import java.io.IOException;
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.KafkaConnectorFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
@@ -35,108 +24,97 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * The History microservice implemented as a Flink job.
  */
-public class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
-    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
-    final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    this.applicationId = applicationName + "-" + applicationVersion;
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-    final DeserializationSchema<ActivePowerRecord> sourceSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
+    this.configureEnv();
 
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
-        inputTopic, sourceSerde, kafkaProps);
-
-    kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaSource.setCommitOffsetsOnCheckpoints(true);
-    }
-    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
-
-    final KafkaSerializationSchema<Tuple2<String, String>> sinkSerde =
-        new FlinkKafkaKeyValueSerde<>(outputTopic,
-            Serdes::String,
-            Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
-    kafkaProps.setProperty("transaction.timeout.ms", "" + 5 * 60 * 1000);
-    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
-        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaSink.setWriteTimestampToKafka(true);
+    this.buildPipeline();
+  }
 
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
     if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
+      this.env.enableCheckpointing(commitIntervalMs);
     }
 
     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
 
-    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+    this.configureSerializers();
+  }
 
-    env.getConfig().getRegisteredTypesWithKryoSerializers()
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
         .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
             + s.getSerializer().getClass().getName()));
 
-    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
-        .name("[Kafka Consumer] Topic: " + inputTopic);
+  }
+
+  private void buildPipeline() {
+    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+    final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+
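+    // The connector factory bundles the Kafka properties (bootstrap servers, group id,
+    // schema registry URL) that were previously assembled by hand for each source and sink.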
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
+
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink =
+        kafkaConnector.createProducer(outputTopic,
+            Serdes::String,
+            Serdes::String,
+            Types.TUPLE(Types.STRING, Types.STRING));
 
-    stream
+    this.env
+        .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
         .rebalance()
-        .keyBy((KeySelector<ActivePowerRecord, String>) ActivePowerRecord::getIdentifier)
+        .keyBy(ActivePowerRecord::getIdentifier)
         .window(TumblingEventTimeWindows.of(Time.minutes(windowDuration)))
         .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
-        .map(new MapFunction<Tuple2<String, Stats>, Tuple2<String, String>>() {
-          @Override
-          public Tuple2<String, String> map(final Tuple2<String, Stats> t) {
-            final String key = t.f0;
-            final String value = t.f1.toString();
-            LOGGER.info("{}: {}", key, value);
-            return new Tuple2<>(key, value);
-          }
-        }).name("map")
+        .map(t -> {
+          final String key = t.f0;
+          final String value = t.f1.toString();
+          LOGGER.info("{}: {}", key, value);
+          return new Tuple2<>(key, value);
+        }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
 
-    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
+    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
 
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
diff --git a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
index a5c370eeda3c0371a4cf479437774050abef544b..d422c37b667d9d3309f0dd858758db29051807b9 100644
--- a/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
+++ b/theodolite-benchmarks/uc2-application-flink/src/main/java/theodolite/uc2/application/StatsProcessWindowFunction.java
@@ -6,6 +6,10 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * A {@link ProcessWindowFunction} that forwards a computed {@link Stats} object along with its
+ * associated key.
+ */
 public class StatsProcessWindowFunction
     extends ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow> {
 
diff --git a/theodolite-benchmarks/uc2-application/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-application/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..4d01df75552c562406705858b6368ecf59d6e82f 100644
--- a/theodolite-benchmarks/uc2-application/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc2-application/.settings/org.eclipse.jdt.ui.prefs
@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
 org.eclipse.jdt.ui.staticondemandthreshold=99
+org.eclipse.jdt.ui.text.custom_code_templates=
 sp_cleanup.add_default_serial_version_id=true
 sp_cleanup.add_generated_serial_version_id=false
 sp_cleanup.add_missing_annotations=true
diff --git a/theodolite-benchmarks/uc2-application/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-application/src/main/resources/META-INF/application.properties
index 15293b1387b96688401bbc48bc2d1615c7b63aba..1b59528db59653d8dc0c2a04d242a0cd39fe07da 100644
--- a/theodolite-benchmarks/uc2-application/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc2-application/src/main/resources/META-INF/application.properties
@@ -6,4 +6,4 @@ kafka.input.topic=input
 kafka.output.topic=output
 kafka.window.duration.minutes=1
 
-schema.registry.url=http://localhost:8091
+schema.registry.url=http://localhost:8081
diff --git a/theodolite-benchmarks/uc2-workload-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-workload-generator/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..4d01df75552c562406705858b6368ecf59d6e82f 100644
--- a/theodolite-benchmarks/uc2-workload-generator/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc2-workload-generator/.settings/org.eclipse.jdt.ui.prefs
@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
 org.eclipse.jdt.ui.staticondemandthreshold=99
+org.eclipse.jdt.ui.text.custom_code_templates=
 sp_cleanup.add_default_serial_version_id=true
 sp_cleanup.add_generated_serial_version_id=false
 sp_cleanup.add_missing_annotations=true
diff --git a/theodolite-benchmarks/uc3-application-flink/Dockerfile b/theodolite-benchmarks/uc3-application-flink/Dockerfile
index 541033dfbf1db97d71963cfc5ec99f8efa300933..d582cd63fbc9cc5c5e540170bc7bc0aa2adc0ab1 100644
--- a/theodolite-benchmarks/uc3-application-flink/Dockerfile
+++ b/theodolite-benchmarks/uc3-application-flink/Dockerfile
@@ -1,3 +1,3 @@
-FROM nicobiernat/flink:1.11-scala_2.12-java_11
+FROM flink:1.12-scala_2.12-java11
 
-ADD build/libs/uc3-application-all.jar /opt/flink/usrlib/artifacts/uc3-application-all.jar
\ No newline at end of file
+ADD build/libs/uc3-application-flink-all.jar /opt/flink/usrlib/artifacts/uc3-application-flink-all.jar
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
index 18e54ab6e5857a76ccf70384e17241debe7e3b2b..0f26d37652924a16be1840fd759b3cd5b023f338 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
@@ -1,25 +1,15 @@
 package theodolite.uc3.application;
 
 import com.google.common.math.Stats;
-import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.Properties;
 import org.apache.commons.configuration2.Configuration;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -28,7 +18,8 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.KafkaConnectorFactory;
+import theodolite.commons.flink.StateBackends;
 import theodolite.commons.flink.serialization.StatsSerializer;
 import theodolite.uc3.application.util.HourOfDayKey;
 import theodolite.uc3.application.util.HourOfDayKeyFactory;
@@ -37,110 +28,90 @@ import theodolite.uc3.application.util.StatsKeyFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 import titan.ccp.model.records.ActivePowerRecord;
 
-
 /**
  * The History microservice implemented as a Flink job.
  */
-public class HistoryServiceFlinkJob {
+public final class HistoryServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
-    // Configurations
+  /**
+   * Create a new instance of the {@link HistoryServiceFlinkJob}.
+   */
+  public HistoryServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
-    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
-    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
-    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
-    final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE);
-    final ZoneId timeZone = ZoneId.of(timeZoneString);
-    final Time aggregationDuration =
-        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
-    final Time aggregationAdvance =
-        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
-    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    this.applicationId = applicationName + "-" + applicationVersion;
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-    // Sources and Sinks with Serializer and Deserializer
+    this.configureEnv();
 
-    final DeserializationSchema<ActivePowerRecord> sourceSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
-
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
-        inputTopic, sourceSerde, kafkaProps);
-
-    kafkaSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaSource.setCommitOffsetsOnCheckpoints(true);
-    }
-    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
-
-    final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
-        new FlinkKafkaKeyValueSerde<>(outputTopic,
-            Serdes::String,
-            Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
-
-    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
-        outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaSink.setWriteTimestampToKafka(true);
-
-    // Execution environment configuration
+    this.buildPipeline();
+  }
 
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
+    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
     if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
+      this.env.enableCheckpointing(commitIntervalMs);
     }
 
     // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        LOGGER.error("Cannot create RocksDB state backend.", e);
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
 
-    // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
-    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
-    for (final var entry : env.getConfig().getRegisteredTypesWithKryoSerializers().entrySet()) {
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class,
+        new HourOfDayKeySerde());
+    this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
+    for (final var entry : this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .entrySet()) {
       LOGGER.info("Class {} registered with serializer {}.",
           entry.getKey().getName(),
           entry.getValue().getSerializer().getClass().getName());
     }
+  }
 
-    // Streaming topology
+  private void buildPipeline() {
+    // Configurations
+    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
+    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+    final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE));
+    final Time aggregationDuration =
+        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+    final Time aggregationAdvance =
+        Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
-    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
-    final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
-        .name("[Kafka Consumer] Topic: " + inputTopic);
+    // Sources and Sinks
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink =
+        kafkaConnector.createProducer(outputTopic,
+            Serdes::String,
+            Serdes::String,
+            Types.TUPLE(Types.STRING, Types.STRING));
 
-    stream
+    // Streaming topology
+    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
+    this.env
+        .addSource(kafkaSource)
+        .name("[Kafka Consumer] Topic: " + inputTopic)
         .rebalance()
         .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
           final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
@@ -149,25 +120,28 @@ public class HistoryServiceFlinkJob {
         })
         .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
         .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
-        .map(new MapFunction<Tuple2<HourOfDayKey, Stats>, Tuple2<String, String>>() {
-          @Override
-          public Tuple2<String, String> map(final Tuple2<HourOfDayKey, Stats> tuple) {
-            final String newKey = keyFactory.getSensorId(tuple.f0);
-            final String newValue = tuple.f1.toString();
-            final int hourOfDay = tuple.f0.getHourOfDay();
-            LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
-            return new Tuple2<>(newKey, newValue);
-          }
-        }).name("map")
+        .map(tuple -> {
+          final String newKey = keyFactory.getSensorId(tuple.f0);
+          final String newValue = tuple.f1.toString();
+          final int hourOfDay = tuple.f0.getHourOfDay();
+          LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
+          return new Tuple2<>(newKey, newValue);
+        })
+        .name("map")
+        .returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
 
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     // Execution plan
-    LOGGER.info("Execution Plan: {}", env.getExecutionPlan());
+    LOGGER.info("Execution Plan: {}", this.env.getExecutionPlan());
 
     // Execute Job
-
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
index 389b0e4a22966995731988f5010ed3ef7e8b209d..349c63413d0da792ad34e8ec8d94e7ff5dc06a42 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
@@ -7,6 +7,10 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 import theodolite.uc3.application.util.HourOfDayKey;
 
+/**
+ * A {@link ProcessWindowFunction} that forwards a computed {@link Stats} object along with its
+ * associated key.
+ */
 public class HourOfDayProcessWindowFunction
     extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
 
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKey.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKey.java
index c75bc54e023f90ea5d3487c9b7ecca8cefac27b3..5def88b404f23a59955ca2de42b91c22b7b1b53d 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKey.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKey.java
@@ -47,6 +47,10 @@ public class HourOfDayKey {
     return this.hourOfDay == k.hourOfDay && this.sensorId.equals(k.sensorId);
   }
 
+  /**
+   * Convert this {@link HourOfDayKey} into a byte array. This method is the inverse of
+   * {@link #fromByteArray(byte[])}.
+   */
   public byte[] toByteArray() {
     final int numBytes = (2 * Integer.SIZE + this.sensorId.length() * Character.SIZE) / Byte.SIZE;
     final ByteBuffer buffer = ByteBuffer.allocate(numBytes).order(ByteOrder.LITTLE_ENDIAN);
@@ -58,6 +62,10 @@ public class HourOfDayKey {
     return buffer.array();
   }
 
+  /**
+   * Construct a new {@link HourOfDayKey} from a byte array. This method is the inverse of
+   * {@link #toByteArray()}.
+   */
   public static HourOfDayKey fromByteArray(final byte[] bytes) {
     final ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
     final int hourOfDay = buffer.getInt();
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java
index ffc3129bd070e8df9711111f671660efecc16650..bd67b2508bc91a87635c52e95b963ed908ed92bf 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java
@@ -8,6 +8,8 @@ import java.time.LocalDateTime;
  */
 public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable {
 
+  private static final long serialVersionUID = 4357668496473645043L; // NOPMD
+
   @Override
   public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
     final int hourOfDay = dateTime.getHour();
diff --git a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java
index b5ab5b986cf11782e465e712334649102303e781..6e3ae9f754d2b1d4ab10349040f0c9e51134c4f7 100644
--- a/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java
+++ b/theodolite-benchmarks/uc3-application-flink/src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java
@@ -18,6 +18,8 @@ import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
 public class HourOfDayKeySerde extends Serializer<HourOfDayKey>
     implements BufferSerde<HourOfDayKey>, Serializable {
 
+  private static final long serialVersionUID = 1262778284661945041L; // NOPMD
+
   @Override
   public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
     buffer.putInt(data.getHourOfDay());
diff --git a/theodolite-benchmarks/uc3-application/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-application/src/main/resources/META-INF/application.properties
index 1273441a61763325c812541e1af8c243f81a31a5..0ce745fb61f87016aee5cc242c03069924ceb58e 100644
--- a/theodolite-benchmarks/uc3-application/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc3-application/src/main/resources/META-INF/application.properties
@@ -7,4 +7,4 @@ kafka.output.topic=output
 aggregation.duration.days=30
 aggregation.advance.days=1
 
-schema.registry.url=http://localhost:8091
+schema.registry.url=http://localhost:8081
diff --git a/theodolite-benchmarks/uc4-application-flink/Dockerfile b/theodolite-benchmarks/uc4-application-flink/Dockerfile
index 59ec78d72275bfb6ed04c73eec78e81192f703dc..49521e7d1e96a7e7a295a1557e205c0e7d4316db 100644
--- a/theodolite-benchmarks/uc4-application-flink/Dockerfile
+++ b/theodolite-benchmarks/uc4-application-flink/Dockerfile
@@ -1,3 +1,3 @@
-FROM nicobiernat/flink:1.11-scala_2.12-java_11
+FROM flink:1.12-scala_2.12-java11
 
-ADD build/libs/uc4-application-all.jar /opt/flink/usrlib/artifacts/uc4-application-all.jar
\ No newline at end of file
+ADD build/libs/uc4-application-flink-all.jar /opt/flink/usrlib/artifacts/uc4-application-flink-all.jar
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
index f720b1ab557a11f8e5a20a36bc2b660588454e0d..0db5a3d524f74fbf22304e8f9b44fa55eead321a 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/AggregationServiceFlinkJob.java
@@ -1,21 +1,14 @@
-package theodolite.uc4.application;
+package theodolite.uc4.application; // NOPMD Imports required
 
-import java.io.IOException;
 import java.time.Duration;
-import java.util.Properties;
 import java.util.Set;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,7 +19,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
+import theodolite.commons.flink.KafkaConnectorFactory;
+import theodolite.commons.flink.StateBackends;
+import theodolite.commons.flink.TupleType;
 import theodolite.uc4.application.util.ImmutableSensorRegistrySerializer;
 import theodolite.uc4.application.util.ImmutableSetSerializer;
 import theodolite.uc4.application.util.SensorParentKey;
@@ -43,178 +38,136 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
 /**
  * The Aggregation microservice implemented as a Flink job.
  */
-public class AggregationServiceFlinkJob {
+public final class AggregationServiceFlinkJob {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
 
   private final Configuration config = ServiceConfigurations.createWithDefaults();
+  private final StreamExecutionEnvironment env;
+  private final String applicationId;
 
-  private void run() {
-    // Configurations
+  /**
+   * Create a new {@link AggregationServiceFlinkJob}.
+   */
+  public AggregationServiceFlinkJob() {
     final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
     final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
-    final String applicationId = applicationName + "-" + applicationVersion;
+    this.applicationId = applicationName + "-" + applicationVersion;
+
+    // Execution environment configuration
+    // For local debugging, the Flink Web UI can be enabled by replacing the assignment
+    // below with:
+    // final org.apache.flink.configuration.Configuration conf =
+    //     new org.apache.flink.configuration.Configuration();
+    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+    // this.env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    this.configureEnv();
+
+    this.buildPipeline();
+  }
+
+  private void configureEnv() {
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
     final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
+    if (checkpointing) {
+      this.env.enableCheckpointing(commitIntervalMs);
+    }
+
+    // State Backend
+    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
+    this.env.setStateBackend(stateBackend);
+
+    this.configureSerializers();
+  }
+
+  private void configureSerializers() {
+    this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
+        new ImmutableSensorRegistrySerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
+        new SensorParentKeySerializer());
+
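+    // Set.of() returns different (package-private) implementation classes depending on
+    // the number of elements, so each of these classes needs its own registration.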
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
+        new ImmutableSetSerializer());
+    this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
+        new ImmutableSetSerializer());
+
+    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
+        .forEach((c, s) -> LOGGER.info("Class {} registered with serializer {}.",
+            c.getName(), s.getSerializer().getClass().getName()));
+  }
+
+  private void buildPipeline() {
+    // Get configurations
     final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
     final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
     final Time windowSize =
         Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
     final Duration windowGrace =
         Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
     final String configurationTopic =
         this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
-    final String stateBackend =
-        this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
-    final String stateBackendPath = this.config
-        .getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
-    final int memoryStateBackendSize =
-        this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE,
-            MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
-    final boolean debug = this.config.getBoolean(ConfigurationKeys.DEBUG, true);
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
 
-    final Properties kafkaProps = new Properties();
-    kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
-    kafkaProps.setProperty("group.id", applicationId);
-
-    // Sources and Sinks with Serializer and Deserializer
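+    // The checkpointing flag moves the per-source setCommitOffsetsOnCheckpoints(true)
+    // calls into the factory, which applies them to every consumer it creates.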
+    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
+        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
 
     // Source from input topic with ActivePowerRecords
-    final DeserializationSchema<ActivePowerRecord> inputSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            ActivePowerRecord.class,
-            schemaRegistryUrl);
-
-    final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource = new FlinkKafkaConsumer<>(
-        inputTopic, inputSerde, kafkaProps);
-
-    kafkaInputSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaInputSource.setCommitOffsetsOnCheckpoints(true);
-    }
+    final FlinkKafkaConsumer<ActivePowerRecord> kafkaInputSource =
+        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
+    // TODO Watermarks?
 
     // Source from output topic with AggregatedPowerRecords
-    final DeserializationSchema<AggregatedActivePowerRecord> outputSerde =
-        ConfluentRegistryAvroDeserializationSchema.forSpecific(
-            AggregatedActivePowerRecord.class,
-            schemaRegistryUrl);
-
     final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
-        new FlinkKafkaConsumer<>(
-            outputTopic, outputSerde, kafkaProps);
+        kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class);
 
-    kafkaOutputSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaOutputSource.setCommitOffsetsOnCheckpoints(true);
-    }
-
-    // Source from configuration topic with EventSensorRegistry JSON
-    final FlinkKafkaKeyValueSerde<Event, String> configSerde =
-        new FlinkKafkaKeyValueSerde<>(
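+    // Source from configuration topic with EventSensorRegistry JSON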
+    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource =
+        kafkaConnector.createConsumer(
             configurationTopic,
             EventSerde::serde,
             Serdes::String,
-            TypeInformation.of(new TypeHint<Tuple2<Event, String>>() {}));
-
-    final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = new FlinkKafkaConsumer<>(
-        configurationTopic, configSerde, kafkaProps);
-    kafkaConfigSource.setStartFromGroupOffsets();
-    if (checkpointing) {
-      kafkaConfigSource.setCommitOffsetsOnCheckpoints(true);
-    }
+            TupleType.of(TypeInformation.of(Event.class), Types.STRING));
 
     // Sink to output topic with SensorId, AggregatedActivePowerRecord
-    final FlinkKafkaKeyValueSerde<String, AggregatedActivePowerRecord> aggregationSerde =
-        new FlinkKafkaKeyValueSerde<>(
+    final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
+        kafkaConnector.createProducer(
             outputTopic,
             Serdes::String,
             () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(),
-            TypeInformation.of(new TypeHint<Tuple2<String, AggregatedActivePowerRecord>>() {}));
-
-    final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
-        new FlinkKafkaProducer<>(
-            outputTopic,
-            aggregationSerde,
-            kafkaProps,
-            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-    kafkaAggregationSink.setWriteTimestampToKafka(true);
-
-    // Execution environment configuration
-    // org.apache.flink.configuration.Configuration conf = new
-    // org.apache.flink.configuration.Configuration();
-    // conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-    // final StreamExecutionEnvironment env =
-    // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-    if (checkpointing) {
-      env.enableCheckpointing(commitIntervalMs);
-    }
-
-    // State Backend
-    if (stateBackend.equals("filesystem")) {
-      env.setStateBackend(new FsStateBackend(stateBackendPath));
-    } else if (stateBackend.equals("rocksdb")) {
-      try {
-        env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
-      } catch (final IOException e) {
-        e.printStackTrace();
-      }
-    } else {
-      env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
-    }
-
-    // Kryo serializer registration
-    env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
-        new ImmutableSensorRegistrySerializer());
-    env.getConfig().registerTypeWithKryoSerializer(SensorParentKey.class,
-        new SensorParentKeySerializer());
-
-    env.getConfig().registerTypeWithKryoSerializer(Set.of().getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1).getClass(),
-        new ImmutableSetSerializer());
-    env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(),
-        new ImmutableSetSerializer());
-
-    env.getConfig().getRegisteredTypesWithKryoSerializers()
-        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
-            + s.getSerializer().getClass().getName()));
-
-    // Streaming topology
+            Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)));
 
     // Build input stream
-    final DataStream<ActivePowerRecord> inputStream = env.addSource(kafkaInputSource)
-        .name("[Kafka Consumer] Topic: " + inputTopic)
+    final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
+        .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
         .rebalance()
         .map(r -> r)
         .name("[Map] Rebalance Forward");
 
     // Build aggregation stream
-    final DataStream<ActivePowerRecord> aggregationsInputStream = env.addSource(kafkaOutputSource)
-        .name("[Kafka Consumer] Topic: " + outputTopic)
-        .rebalance()
-        .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
-        .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
+    final DataStream<ActivePowerRecord> aggregationsInputStream =
+        this.env.addSource(kafkaOutputSource)
+            .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
+            .rebalance()
+            .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
+            .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
 
     // Merge input and aggregation streams
     final DataStream<ActivePowerRecord> mergedInputStream = inputStream
         .union(aggregationsInputStream);
 
-    if (debug) {
-      mergedInputStream.print();
-    }
     // Build parent sensor stream from configuration stream
     final DataStream<Tuple2<String, Set<String>>> configurationsStream =
-        env.addSource(kafkaConfigSource)
-            .name("[Kafka Consumer] Topic: " + configurationTopic)
+        this.env.addSource(kafkaConfigSource)
+            .name("[Kafka Consumer] Topic: " + configurationTopic) // NOCS
             .filter(tuple -> tuple.f0 == Event.SENSOR_REGISTRY_CHANGED
                 || tuple.f0 == Event.SENSOR_REGISTRY_STATUS)
             .name("[Filter] SensorRegistry changed")
-            // Tuple2<Event, String> -> SensorRegistry
             .map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry")
             .keyBy(sr -> 1)
             .flatMap(new ChildParentsFlatMapFunction())
@@ -227,31 +180,6 @@ public class AggregationServiceFlinkJob {
             .flatMap(new JoinAndDuplicateCoFlatMapFunction())
             .name("[CoFlatMap] Join input-config, Flatten to ((Sensor, Group), ActivePowerRecord)");
 
-    // KeyedStream<ActivePowerRecord, String> keyedStream =
-    // mergedInputStream.keyBy(ActivePowerRecord::getIdentifier);
-    //
-    // MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
-    // new MapStateDescriptor<>(
-    // "join-and-duplicate-state",
-    // BasicTypeInfo.STRING_TYPE_INFO,
-    // TypeInformation.of(new TypeHint<Set<String>>() {}));
-    //
-    // BroadcastStream<Tuple2<String, Set<String>>> broadcastStream =
-    // configurationsStream.keyBy(t -> t.f0).broadcast(sensorConfigStateDescriptor);
-    //
-    // DataStream<Tuple2<SensorParentKey, ActivePowerRecord>> lastValueStream =
-    // keyedStream.connect(broadcastStream)
-    // .process(new JoinAndDuplicateKeyedBroadcastProcessFunction());
-
-    if (debug) {
-      lastValueStream
-          .map(t -> "<" + t.f0.getSensor() + "|" + t.f0.getParent() + ">" + "ActivePowerRecord {"
-              + "identifier: " + t.f1.getIdentifier() + ", "
-              + "timestamp: " + t.f1.getTimestamp() + ", "
-              + "valueInW: " + t.f1.getValueInW() + " }")
-          .print();
-    }
-
     final DataStream<AggregatedActivePowerRecord> aggregationStream = lastValueStream
         .rebalance()
         .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
@@ -262,23 +190,22 @@ public class AggregationServiceFlinkJob {
 
     // add Kafka Sink
     aggregationStream
-        // AggregatedActivePowerRecord -> Tuple2<String, AggregatedActivePowerRecord>
         .map(value -> new Tuple2<>(value.getIdentifier(), value))
         .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)")
         .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class)))
         .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic);
+  }
 
-    // add stdout sink
-    if (debug) {
-      aggregationStream.print();
-    }
-
+  /**
+   * Start running this microservice.
+   */
+  public void run() {
     // Execution plan
-    LOGGER.info("Execution plan: {}", env.getExecutionPlan());
+    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
 
     // Execute Job
     try {
-      env.execute(applicationId);
+      this.env.execute(this.applicationId);
     } catch (final Exception e) { // NOPMD Execution thrown by Flink
       LOGGER.error("An error occured while running this job.", e);
     }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
index a7e5ac28ff08c17bbd7911c02cde7bee1316c823..910dc359fa9b5b0810f7f9b6e67bfceaa68cc798 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/ChildParentsFlatMapFunction.java
@@ -1,5 +1,10 @@
 package theodolite.uc4.application;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -12,36 +17,31 @@ import titan.ccp.model.sensorregistry.AggregatedSensor;
 import titan.ccp.model.sensorregistry.Sensor;
 import titan.ccp.model.sensorregistry.SensorRegistry;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 /**
  * Transforms a {@link SensorRegistry} into key value pairs of Sensor identifiers and their parents'
  * sensor identifiers. All pairs whose sensor's parents have changed since the last iteration are
  * forwarded. A mapping of an identifier to <code>null</code> means that the corresponding sensor
  * no longer exists in the sensor registry.
  */
-public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> {
+public class ChildParentsFlatMapFunction
+    extends RichFlatMapFunction<SensorRegistry, Tuple2<String, Set<String>>> {
 
-  private static final long serialVersionUID = 3969444219510915221L; //NOPMD
+  private static final long serialVersionUID = 3969444219510915221L; // NOPMD
 
   private transient MapState<String, Set<String>> state;
 
   @Override
-  public void open(Configuration parameters) {
-    MapStateDescriptor<String, Set<String>> descriptor =
-        new MapStateDescriptor<String, Set<String>>(
+  public void open(final Configuration parameters) {
+    final MapStateDescriptor<String, Set<String>> descriptor =
+        new MapStateDescriptor<>(
             "child-parents-state",
-            TypeInformation.of(new TypeHint<String>(){}),
-            TypeInformation.of(new TypeHint<Set<String>>(){}));
-    this.state = getRuntimeContext().getMapState(descriptor);
+            TypeInformation.of(new TypeHint<String>() {}),
+            TypeInformation.of(new TypeHint<Set<String>>() {}));
+    this.state = this.getRuntimeContext().getMapState(descriptor);
   }
 
   @Override
-  public void flatMap(SensorRegistry value, Collector<Tuple2<String, Set<String>>> out)
+  public void flatMap(final SensorRegistry value, final Collector<Tuple2<String, Set<String>>> out)
       throws Exception {
     final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(value);
     this.updateChildParentsPairs(childParentsPairs);
@@ -71,7 +71,7 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis
   }
 
   private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs)
-      throws Exception {
+      throws Exception { // NOPMD General exception thrown by Flink
     final Iterator<Map.Entry<String, Set<String>>> oldChildParentsPairs = this.state.iterator();
     while (oldChildParentsPairs.hasNext()) {
       final Map.Entry<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next();
@@ -89,7 +89,8 @@ public class ChildParentsFlatMapFunction extends RichFlatMapFunction<SensorRegis
     }
   }
 
-  private void updateState(final Map<String, Set<String>> childParentsPairs) throws Exception {
+  private void updateState(final Map<String, Set<String>> childParentsPairs)
+      throws Exception { // NOPMD General exception thrown by Flink
     for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) {
       if (childParentPair.getValue() == null) {
         this.state.remove(childParentPair.getKey());
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
index dec7b417b2683f95f363547fd4f76acf49195c4d..6ef9a72e9695cfccba0bbcca1238f7ebc94fc505 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateCoFlatMapFunction.java
@@ -12,6 +12,12 @@ import org.apache.flink.util.Collector;
 import theodolite.uc4.application.util.SensorParentKey;
 import titan.ccp.model.records.ActivePowerRecord;
 
+/**
+ * A {@link RichCoFlatMapFunction} which joins each incoming {@link ActivePowerRecord} with its
+ * corresponding parents. The {@link ActivePowerRecord} is duplicated for each parent. When
+ * receiving a new set of parents for a sensor, this operator updates its internal state and
+ * forwards "tombstone" record if a sensor does no longer have a certain parent.
+ */
 public class JoinAndDuplicateCoFlatMapFunction extends
     RichCoFlatMapFunction<ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS
 
@@ -44,17 +50,17 @@ public class JoinAndDuplicateCoFlatMapFunction extends
   @Override
   public void flatMap2(final Tuple2<String, Set<String>> value,
       final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
-    final Set<String> oldParents = this.state.get(value.f0);
-    if (oldParents != null) {
-      final Set<String> newParents = value.f1;
-      if (!newParents.equals(oldParents)) {
-        for (final String oldParent : oldParents) {
-          if (!newParents.contains(oldParent)) {
-            out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null));
-          }
+    final String sensor = value.f0;
+    final Set<String> oldParents = this.state.get(sensor);
+    final Set<String> newParents = value.f1;
+    if (oldParents != null && !newParents.equals(oldParents)) {
+      for (final String oldParent : oldParents) {
+        if (!newParents.contains(oldParent)) {
+          // Parent was deleted, emit tombstone record
+          out.collect(new Tuple2<>(new SensorParentKey(sensor, oldParent), null));
         }
       }
     }
-    this.state.put(value.f0, value.f1);
+    this.state.put(sensor, newParents);
   }
 }
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
deleted file mode 100644
index 96711b2f09ad9fd6b0b2b3f98687acbf2e4c8c68..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/JoinAndDuplicateKeyedBroadcastProcessFunction.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package theodolite.uc4.application;
-
-import java.util.Set;
-import org.apache.flink.api.common.state.BroadcastState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
-import org.apache.flink.util.Collector;
-import theodolite.uc4.application.util.SensorParentKey;
-import titan.ccp.model.records.ActivePowerRecord;
-
-public class JoinAndDuplicateKeyedBroadcastProcessFunction extends
-    KeyedBroadcastProcessFunction<String, ActivePowerRecord, Tuple2<String, Set<String>>, Tuple2<SensorParentKey, ActivePowerRecord>> { // NOCS
-
-  private static final long serialVersionUID = -4525438547262992821L; // NOPMD
-
-  private final MapStateDescriptor<String, Set<String>> sensorConfigStateDescriptor =
-      new MapStateDescriptor<>(
-          "join-and-duplicate-state",
-          BasicTypeInfo.STRING_TYPE_INFO,
-          TypeInformation.of(new TypeHint<Set<String>>() {}));
-
-  @Override
-  public void processElement(final ActivePowerRecord value, final ReadOnlyContext ctx,
-      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
-    final Set<String> parents =
-        ctx.getBroadcastState(this.sensorConfigStateDescriptor).get(value.getIdentifier());
-    if (parents == null) {
-      return;
-    }
-    for (final String parent : parents) {
-      out.collect(new Tuple2<>(new SensorParentKey(value.getIdentifier(), parent), value));
-    }
-  }
-
-  @Override
-  public void processBroadcastElement(final Tuple2<String, Set<String>> value, final Context ctx,
-      final Collector<Tuple2<SensorParentKey, ActivePowerRecord>> out) throws Exception {
-    final BroadcastState<String, Set<String>> state =
-        ctx.getBroadcastState(this.sensorConfigStateDescriptor);
-    final Set<String> oldParents = state.get(value.f0);
-    if (oldParents != null) {
-      final Set<String> newParents = value.f1;
-      if (!newParents.equals(oldParents)) {
-        for (final String oldParent : oldParents) {
-          if (!newParents.contains(oldParent)) {
-            out.collect(new Tuple2<>(new SensorParentKey(value.f0, oldParent), null));
-          }
-        }
-      }
-    }
-    state.put(value.f0, value.f1);
-  }
-
-}
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
index e4b545174861a70a49a0955f95b5bd7e14b2dfb6..45d4a09d153881572c949d2af7542f9cffb5622d 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/RecordAggregationProcessWindowFunction.java
@@ -15,6 +15,11 @@ import theodolite.uc4.application.util.SensorParentKey;
 import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
+/**
+ * A {@link ProcessWindowFunction} which performs the windowed aggregation of all
+ * {@link ActivePowerRecord}s for the same {@link SensorParentKey}. The result of this
+ * aggregation is an {@link AggregatedActivePowerRecord}.
+ */
 public class RecordAggregationProcessWindowFunction extends
     ProcessWindowFunction<Tuple2<SensorParentKey, ActivePowerRecord>, AggregatedActivePowerRecord, String, TimeWindow> { // NOCS
 
@@ -40,7 +45,9 @@ public class RecordAggregationProcessWindowFunction extends
   }
 
   @Override
-  public void process(final String key, final Context context,
+  public void process(
+      final String key,
+      final Context context,
       final Iterable<Tuple2<SensorParentKey, ActivePowerRecord>> elements,
       final Collector<AggregatedActivePowerRecord> out) throws Exception {
     for (final Tuple2<SensorParentKey, ActivePowerRecord> t : elements) {
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
index 927006fabd56ab6c24532cd71184e6f7c20094ac..e157f35c8a052d2d4a28526a0d98d56515d586d6 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSensorRegistrySerializer.java
@@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output;
 import java.io.Serializable;
 import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
 
+/**
+ * A {@link Serializer} for {@link ImmutableSensorRegistry}s.
+ */
 public class ImmutableSensorRegistrySerializer extends Serializer<ImmutableSensorRegistry>
     implements Serializable {
 
diff --git a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
index 913b07aaafcaad3b5247fd4bf13ec64df3469312..6b2dbcdfb403705b39815dd31112deab7947d83d 100644
--- a/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
+++ b/theodolite-benchmarks/uc4-application-flink/src/main/java/theodolite/uc4/application/util/ImmutableSetSerializer.java
@@ -7,6 +7,9 @@ import com.esotericsoftware.kryo.io.Output;
 import java.io.Serializable;
 import java.util.Set;
 
+/**
+ * A {@link Serializer} for serializing arbitrary {@link Set}s of {@link Object}s.
+ */
 public final class ImmutableSetSerializer extends Serializer<Set<Object>> implements Serializable {
 
   private static final long serialVersionUID = 6919877826110724620L; // NOPMD
diff --git a/theodolite-benchmarks/uc4-application/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-application/src/main/resources/META-INF/application.properties
index ce06091076e6ff7f9ede355c7f54c12b3d872119..a21f7e917e3ce4a0762261ca90444613c82ab650 100644
--- a/theodolite-benchmarks/uc4-application/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc4-application/src/main/resources/META-INF/application.properties
@@ -7,7 +7,7 @@ kafka.configuration.topic=configuration
 kafka.feedback.topic=aggregation-feedback
 kafka.output.topic=output
 
-schema.registry.url=http://localhost:8091
+schema.registry.url=http://localhost:8081
 
 emit.period.ms=5000
 grace.period.ms=0
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc4-workload-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-workload-generator/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..4d01df75552c562406705858b6368ecf59d6e82f 100644
--- a/theodolite-benchmarks/uc4-workload-generator/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc4-workload-generator/.settings/org.eclipse.jdt.ui.prefs
@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
 org.eclipse.jdt.ui.staticondemandthreshold=99
+org.eclipse.jdt.ui.text.custom_code_templates=
 sp_cleanup.add_default_serial_version_id=true
 sp_cleanup.add_generated_serial_version_id=false
 sp_cleanup.add_missing_annotations=true
diff --git a/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
index dd17234bf1adb1f0fcf3ff3ab134a0743b917369..6e4a43271fbf1e0193c2d39569a0814d1f7935cd 100644
--- a/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
+++ b/theodolite-benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
@@ -6,6 +6,7 @@ import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,7 +117,13 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
             this.keyAccessor.apply(monitoringRecord), monitoringRecord);
 
     LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
-    this.producer.send(record);
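+    // A record that cannot be serialized is skipped with a warning instead of letting
+    // the exception terminate the workload generator.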
+    try {
+      this.producer.send(record);
+    } catch (final SerializationException e) {
+      LOGGER.warn(
+          "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
+          e);
+    }
   }
 
   public void terminate() {