diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index daa2cd332cbedd388114f316492f6b4eaa93d307..9634c635c37dd93bfb0bb0f6ef9641cbf0432cb2 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -33,10 +33,16 @@ default:
   script:
     - mkdir -p /kaniko/.docker
     - echo "{\"auths\":{\"${CR_HOST}\":{\"auth\":\"$(printf "%s:%s" "${CR_USER}" "${CR_PW}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
-    - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
-    - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest\""
-    - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA\""
-    - "[ $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG\""
+    - >
+      if [ $IMAGE_TAG ]; then
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$IMAGE_TAG"
+      elif [ $CI_COMMIT_TAG ]; then
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
+      else
+        DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
+        KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
+      fi
     - "[ $DOCKERFILE ] && KANIKO_DOCKERFILE=\"--dockerfile $DOCKERFILE\""
     - /kaniko/executor --context `pwd`/$CONTEXT $KANIKO_DOCKERFILE $KANIKO_D
 
@@ -73,26 +79,37 @@ test-docs-links:
     - build-docs
   script: bundle exec htmlproofer --assume-extension --allow_hash_href ./_site
 
+build-docs-crds:
+  stage: build
+  image:
+    name: ghcr.io/fybrik/crdoc:0.6.1
+    entrypoint: [""]
+  script: /crdoc --resources theodolite/crd/ --template docs/api-reference/crds.tmpl --output docs/api-reference/crds.ref.md
+  artifacts:
+    paths:
+      - docs/api-reference/crds.ref.md
+    expire_in: 1 week
+  rules:
+    - changes:
+      - docs/api-reference/crds.tmpl
+      - theodolite/crd/**/*
+    - when: manual
+      allow_failure: true
+
 test-docs-crds-regression:
   stage: test
-  image: golang
+  needs:
+    - build-docs-crds
+  image: alpine:3.15
   before_script:
     - cd docs
-    - go install fybrik.io/crdoc@latest
   script:
-    - crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl  --output api-reference/crds.ref.md
     - cmp api-reference/crds.md api-reference/crds.ref.md
   artifacts:
     when: on_failure
     paths:
       - docs/api-reference/crds.ref.md
     expire_in: 1 week
-  rules:
-    - changes:
-      - docs/api-reference/crds.tmpl
-      - theodolite/crd/**/*
-    - when: manual
-      allow_failure: true
 
 
 # Theodolite Helm Chart
@@ -104,6 +121,11 @@ lint-helm:
     name: alpine/helm:3.5.2
     entrypoint: [""]
   script: helm lint helm/
+  rules:
+  - changes:
+    - helm/*
+  - when: manual
+    allow_failure: true
 
 
 # Theodolite Benchmarks
@@ -367,6 +389,11 @@ deploy-uc4-load-generator:
   before_script:
     - export GRADLE_USER_HOME=`pwd`/.gradle
     - cd theodolite
+  rules:
+    - changes:
+      - theodolite/**/*
+    - when: manual
+      allow_failure: true
 
 build-theodolite-jvm:
   stage: build
@@ -567,4 +594,22 @@ deploy-random-scheduler:
     - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
       when: manual
       allow_failure: true
-      
\ No newline at end of file
+
+deploy-buildimage-docker-compose-jq:
+  stage: deploy
+  extends:
+    - .kaniko-push
+  needs: []
+  variables:
+    DOCKER_VERSION: 20.10.12
+    IMAGE_NAME: theodolite-build-docker-compose-jq
+    IMAGE_TAG: $DOCKER_VERSION
+  before_script:
+    - cd buildimages/docker-compose-jq
+  rules:
+    - changes:
+      - buildimages/docker-compose-jq/Dockerfile
+      if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
+    - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_PIPELINE_SOURCE == 'web'"
+      when: manual
+      allow_failure: true
diff --git a/CITATION.cff b/CITATION.cff
index 07c2dcee319f73604f95414b987f8ed5274f7e82..04640de442f4458b09e11ce3d2939c850f594556 100644
--- a/CITATION.cff
+++ b/CITATION.cff
@@ -8,7 +8,7 @@ authors:
     given-names: Wilhelm
     orcid: "https://orcid.org/0000-0001-6625-4335"
 title: Theodolite
-version: "0.6.1"
+version: "0.6.3"
 repository-code: "https://github.com/cau-se/theodolite"
 license: "Apache-2.0"
 doi: "10.1016/j.bdr.2021.100209"
diff --git a/buildimages/docker-compose-jq/Dockerfile b/buildimages/docker-compose-jq/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..cd3f60ba3e75ab5767acff788c3bb69c8640cd4c
--- /dev/null
+++ b/buildimages/docker-compose-jq/Dockerfile
@@ -0,0 +1,6 @@
+# DOCKER_VERSION must be declared as a build arg before FROM to be usable there
+# (pass it at build time, e.g. via --build-arg DOCKER_VERSION=<version>).
+ARG DOCKER_VERSION=latest
+FROM docker:${DOCKER_VERSION:-latest}
+
+RUN apk update && \
+    apk add jq && \
+    apk add py-pip python3-dev libffi-dev openssl-dev gcc libc-dev rust cargo make && \
+    pip install docker-compose
diff --git a/codemeta.json b/codemeta.json
index 2a190092b96adb3462c011e49db3c160d639d6fe..832b570681afb143978698fd47dad5d2835c700b 100644
--- a/codemeta.json
+++ b/codemeta.json
@@ -5,10 +5,10 @@
     "codeRepository": "https://github.com/cau-se/theodolite",
     "dateCreated": "2020-03-13",
     "datePublished": "2020-07-27",
-    "dateModified": "2022-01-17",
+    "dateModified": "2022-01-24",
     "downloadUrl": "https://github.com/cau-se/theodolite/releases",
     "name": "Theodolite",
-    "version": "0.6.1",
+    "version": "0.6.3",
     "description": "Theodolite is a framework for benchmarking the horizontal and vertical scalability of cloud-native applications.",
     "developmentStatus": "active",
     "relatedLink": [
diff --git a/docs/README.md b/docs/README.md
index 52b5311295e5a96721d9aa42f7e9c319da06960c..a19f94305dfdcb1de7c46da98afbb52b28a6bfa0 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -39,5 +39,5 @@ crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl  --outpu
 With the following command, crdoc is executed in Docker:
 
 ```sh
-docker run --rm -v "`pwd`/../theodolite/crd/":/crd -u $UID -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.0 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
+docker run --rm -v "`pwd`/../theodolite/crd/":/crd -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.1 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
 ```
diff --git a/docs/api-reference/crds.md b/docs/api-reference/crds.md
index 0d7e46e3a72aea642fdc629f1abb664a4f8b93f3..fb3f02ac941870dd085d06027d972e6003c7aadb 100644
--- a/docs/api-reference/crds.md
+++ b/docs/api-reference/crds.md
@@ -94,13 +94,6 @@ Resource Types:
         </tr>
     </thead>
     <tbody><tr>
-        <td><b><a href="#benchmarkspeckafkaconfig">kafkaConfig</a></b></td>
-        <td>object</td>
-        <td>
-          Contains the Kafka configuration.<br/>
-        </td>
-        <td>true</td>
-      </tr><tr>
         <td><b><a href="#benchmarkspecloadgenerator">loadGenerator</a></b></td>
         <td>object</td>
         <td>
@@ -138,103 +131,20 @@ Resource Types:
         </td>
         <td>false</td>
       </tr><tr>
-        <td><b>name</b></td>
-        <td>string</td>
+        <td><b><a href="#benchmarkspeckafkaconfig">kafkaConfig</a></b></td>
+        <td>object</td>
         <td>
-          This field exists only for technical reasons and should not be set by the user. The value of the field will be overwritten.<br/>
-          <br/>
-            <i>Default</i>: <br/>
+          Contains the Kafka configuration.<br/>
         </td>
         <td>false</td>
-      </tr></tbody>
-</table>
-
-
-### benchmark.spec.kafkaConfig
-<sup><sup>[↩ Parent](#benchmarkspec)</sup></sup>
-
-
-
-Contains the Kafka configuration.
-
-<table>
-    <thead>
-        <tr>
-            <th>Name</th>
-            <th>Type</th>
-            <th>Description</th>
-            <th>Required</th>
-        </tr>
-    </thead>
-    <tbody><tr>
-        <td><b>bootstrapServer</b></td>
-        <td>string</td>
-        <td>
-          The bootstrap servers connection string.<br/>
-        </td>
-        <td>true</td>
       </tr><tr>
-        <td><b><a href="#benchmarkspeckafkaconfigtopicsindex">topics</a></b></td>
-        <td>[]object</td>
-        <td>
-          List of topics to be created for each experiment. Alternative theodolite offers the possibility to remove certain topics after each experiment.<br/>
-        </td>
-        <td>true</td>
-      </tr></tbody>
-</table>
-
-
-### benchmark.spec.kafkaConfig.topics[index]
-<sup><sup>[↩ Parent](#benchmarkspeckafkaconfig)</sup></sup>
-
-
-
-
-
-<table>
-    <thead>
-        <tr>
-            <th>Name</th>
-            <th>Type</th>
-            <th>Description</th>
-            <th>Required</th>
-        </tr>
-    </thead>
-    <tbody><tr>
         <td><b>name</b></td>
         <td>string</td>
         <td>
-          The name of the topic.<br/>
+          This field exists only for technical reasons and should not be set by the user. The value of the field will be overwritten.<br/>
           <br/>
             <i>Default</i>: <br/>
         </td>
-        <td>true</td>
-      </tr><tr>
-        <td><b>numPartitions</b></td>
-        <td>integer</td>
-        <td>
-          The number of partitions of the topic.<br/>
-          <br/>
-            <i>Default</i>: 0<br/>
-        </td>
-        <td>false</td>
-      </tr><tr>
-        <td><b>removeOnly</b></td>
-        <td>boolean</td>
-        <td>
-          Determines if this topic should only be deleted after each experiement. For removeOnly topics the name can be a RegEx describing the topic.<br/>
-          <br/>
-            <i>Default</i>: false<br/>
-        </td>
-        <td>false</td>
-      </tr><tr>
-        <td><b>replicationFactor</b></td>
-        <td>integer</td>
-        <td>
-          The replication factor of the topic.<br/>
-          <br/>
-            <i>Default</i>: 0<br/>
-        </td>
         <td>false</td>
       </tr></tbody>
 </table>
@@ -1647,6 +1557,96 @@ The fileSystem resourceSet loads the Kubernetes manifests from the filesystem.
 </table>
 
 
+### benchmark.spec.kafkaConfig
+<sup><sup>[↩ Parent](#benchmarkspec)</sup></sup>
+
+
+
+Contains the Kafka configuration.
+
+<table>
+    <thead>
+        <tr>
+            <th>Name</th>
+            <th>Type</th>
+            <th>Description</th>
+            <th>Required</th>
+        </tr>
+    </thead>
+    <tbody><tr>
+        <td><b>bootstrapServer</b></td>
+        <td>string</td>
+        <td>
+          The bootstrap servers connection string.<br/>
+        </td>
+        <td>true</td>
+      </tr><tr>
+        <td><b><a href="#benchmarkspeckafkaconfigtopicsindex">topics</a></b></td>
+        <td>[]object</td>
+        <td>
+          List of topics to be created for each experiment. Alternatively, Theodolite offers the possibility to remove certain topics after each experiment.<br/>
+        </td>
+        <td>true</td>
+      </tr></tbody>
+</table>
+
+
+### benchmark.spec.kafkaConfig.topics[index]
+<sup><sup>[↩ Parent](#benchmarkspeckafkaconfig)</sup></sup>
+
+
+
+
+
+<table>
+    <thead>
+        <tr>
+            <th>Name</th>
+            <th>Type</th>
+            <th>Description</th>
+            <th>Required</th>
+        </tr>
+    </thead>
+    <tbody><tr>
+        <td><b>name</b></td>
+        <td>string</td>
+        <td>
+          The name of the topic.<br/>
+          <br/>
+            <i>Default</i>: <br/>
+        </td>
+        <td>true</td>
+      </tr><tr>
+        <td><b>numPartitions</b></td>
+        <td>integer</td>
+        <td>
+          The number of partitions of the topic.<br/>
+          <br/>
+            <i>Default</i>: 0<br/>
+        </td>
+        <td>false</td>
+      </tr><tr>
+        <td><b>removeOnly</b></td>
+        <td>boolean</td>
+        <td>
+          Determines if this topic should only be deleted after each experiment. For removeOnly topics, the name can be a RegEx describing the topic.<br/>
+          <br/>
+            <i>Default</i>: false<br/>
+        </td>
+        <td>false</td>
+      </tr><tr>
+        <td><b>replicationFactor</b></td>
+        <td>integer</td>
+        <td>
+          The replication factor of the topic.<br/>
+          <br/>
+            <i>Default</i>: 0<br/>
+        </td>
+        <td>false</td>
+      </tr></tbody>
+</table>
+
+
 ### benchmark.status
 <sup><sup>[↩ Parent](#benchmark)</sup></sup>
 
diff --git a/docs/creating-a-benchmark.md b/docs/creating-a-benchmark.md
index 2b1d93bbec0afb3a8897a77439388538fc75c4a6..fde8ba0759407ddea8befc18e244784a9ba34c1f 100644
--- a/docs/creating-a-benchmark.md
+++ b/docs/creating-a-benchmark.md
@@ -115,15 +115,13 @@ If a benchmark is [executed by an Execution](running-benchmarks), these patchers
 
 ## Kafka Configuration
 
-Theodolite allows to automatically create and remove Kafka topics for each SLO experiment.
-Use the `removeOnly: True` property for topics which are created automatically by the SUT.
-For those topics, also wildcards are allowed in the topic name.
+Theodolite allows to automatically create and remove Kafka topics for each SLO experiment by setting a `kafkaConfig`.
+Its `bootstrapServer` needs to point to your Kafka cluster and `topics` configures the list of Kafka topics to be created/removed.
+For each topic, you configure its name, the number of partitions and the replication factor.
 
-If no Kafka topics should be created, simply set:
-
-```yaml
-kafkaConfig: []
-```
+With the `removeOnly: True` property, you can also instruct Theodolite to only remove topics and not create them.
+This is useful when benchmarking SUTs that create topics on their own (e.g., Kafka Streams and Samza applications).
+For those topics, wildcards are also allowed in the topic name and no partition count or replication factor needs to be provided, as shown in the example below.
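+
+For illustration, a complete `kafkaConfig` could look like the following sketch (the bootstrap server address, topic names, and counts are placeholders; adapt them to your setup):
+
+```yaml
+kafkaConfig:
+  bootstrapServer: "my-kafka-bootstrap:9092"
+  topics:
+    # Topic created before and removed after each SLO experiment
+    - name: input
+      numPartitions: 40
+      replicationFactor: 1
+    # Topics created by the SUT itself; only removed after each experiment
+    - name: "theodolite-.*"
+      removeOnly: True
+```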
 
 
 <!-- Further information: API Reference -->
diff --git a/docs/index.yaml b/docs/index.yaml
index 185ff1b0616b760c647a809006c48bf26c554490..509844ab0bc371d29302f90f69e769cd52a8e11b 100644
--- a/docs/index.yaml
+++ b/docs/index.yaml
@@ -1,6 +1,76 @@
 apiVersion: v1
 entries:
   theodolite:
+  - apiVersion: v2
+    appVersion: 0.6.3
+    created: "2022-01-24T13:40:40.07330713+01:00"
+    dependencies:
+    - condition: grafana.enabled
+      name: grafana
+      repository: https://grafana.github.io/helm-charts
+      version: 6.17.5
+    - condition: kube-prometheus-stack.enabled
+      name: kube-prometheus-stack
+      repository: https://prometheus-community.github.io/helm-charts
+      version: 20.0.1
+    - condition: cp-helm-charts.enabled
+      name: cp-helm-charts
+      repository: https://soerenhenning.github.io/cp-helm-charts
+      version: 0.6.0
+    - condition: kafka-lag-exporter.enabled
+      name: kafka-lag-exporter
+      repository: https://lightbend.github.io/kafka-lag-exporter/repo/
+      version: 0.6.7
+    description: Theodolite is a framework for benchmarking the horizontal and vertical
+      scalability of cloud-native applications.
+    digest: ebf08e3bf084fcd96eb2ee0588d495258d1741c74019257e55ba40f574874525
+    home: https://www.theodolite.rocks
+    maintainers:
+    - email: soeren.henning@email.uni-kiel.de
+      name: Sören Henning
+      url: https://www.se.informatik.uni-kiel.de/en/team/soeren-henning-m-sc
+    name: theodolite
+    sources:
+    - https://github.com/cau-se/theodolite
+    type: application
+    urls:
+    - https://github.com/cau-se/theodolite/releases/download/v0.6.3/theodolite-0.6.3.tgz
+    version: 0.6.3
+  - apiVersion: v2
+    appVersion: 0.6.2
+    created: "2022-01-23T22:31:04.773793557+01:00"
+    dependencies:
+    - condition: grafana.enabled
+      name: grafana
+      repository: https://grafana.github.io/helm-charts
+      version: 6.17.5
+    - condition: kube-prometheus-stack.enabled
+      name: kube-prometheus-stack
+      repository: https://prometheus-community.github.io/helm-charts
+      version: 20.0.1
+    - condition: cp-helm-charts.enabled
+      name: cp-helm-charts
+      repository: https://soerenhenning.github.io/cp-helm-charts
+      version: 0.6.0
+    - condition: kafka-lag-exporter.enabled
+      name: kafka-lag-exporter
+      repository: https://lightbend.github.io/kafka-lag-exporter/repo/
+      version: 0.6.7
+    description: Theodolite is a framework for benchmarking the horizontal and vertical
+      scalability of cloud-native applications.
+    digest: f6514038741051230dc9be0a6bde3fbc6f92136ecb36c276343e98e550f2c6d0
+    home: https://www.theodolite.rocks
+    maintainers:
+    - email: soeren.henning@email.uni-kiel.de
+      name: Sören Henning
+      url: https://www.se.informatik.uni-kiel.de/en/team/soeren-henning-m-sc
+    name: theodolite
+    sources:
+    - https://github.com/cau-se/theodolite
+    type: application
+    urls:
+    - https://github.com/cau-se/theodolite/releases/download/v0.6.2/theodolite-0.6.2.tgz
+    version: 0.6.2
   - apiVersion: v2
     appVersion: 0.6.1
     created: "2022-01-18T10:40:00.557347616+01:00"
@@ -176,4 +246,4 @@ entries:
     urls:
     - https://github.com/cau-se/theodolite/releases/download/v0.4.0/theodolite-0.4.0.tgz
     version: 0.4.0
-generated: "2022-01-18T10:40:00.486387187+01:00"
+generated: "2022-01-24T13:40:40.036786105+01:00"
diff --git a/execution/theodolite.yaml b/execution/theodolite.yaml
index ae18a68ee61c71e20008a71537357cdf9521216a..495b98f8dfff7fb5ddfe95d71d09fc1dfff67e0e 100644
--- a/execution/theodolite.yaml
+++ b/execution/theodolite.yaml
@@ -21,17 +21,16 @@ spec:
               valueFrom:
                 fieldRef:
                   fieldPath: metadata.namespace
-
             # - name: MODE
             #   value: yaml-executor # Default is `yaml-executor`
             - name: THEODOLITE_EXECUTION
-              value: "execution/execution.yaml" # The name of this file must correspond to the filename of the execution, from which the config map is created.
+              value: "/deployments/execution/execution.yaml" # The name of this file must correspond to the filename of the execution, from which the config map is created.
             - name: THEODOLITE_BENCHMARK
-              value: "benchmark/benchmark.yaml" # The name of this file must correspond to the filename of the benchmark, from which the config map is created.
+              value: "/deployments/benchmark/benchmark.yaml" # The name of this file must correspond to the filename of the benchmark, from which the config map is created.
             - name: THEODOLITE_APP_RESOURCES
-              value: "benchmark-resources"
+              value: "/deployments/benchmark-resources"
             - name: RESULTS_FOLDER # Folder for saving results
-              value: results # Default is the pwd (/deployments)
+              value: /deployments/results # Default is the pwd (/deployments)
             # - name: CREATE_RESULTS_FOLDER # Specify whether the specified result folder should be created if it does not exist.
             #   value: "false" # Default is false.
           volumeMounts:
diff --git a/helm/templates/prometheus/datasource-config-map.yaml b/helm/templates/grafana/datasource-config-map.yaml
similarity index 100%
rename from helm/templates/prometheus/datasource-config-map.yaml
rename to helm/templates/grafana/datasource-config-map.yaml
diff --git a/helm/templates/prometheus/prometheus.yaml b/helm/templates/prometheus/prometheus.yaml
index 4e297b20290be9686b901fa8c76823136c6fabef..23a015250e19cc14550ce73e8162ba27f65be774 100644
--- a/helm/templates/prometheus/prometheus.yaml
+++ b/helm/templates/prometheus/prometheus.yaml
@@ -5,10 +5,7 @@ metadata:
   name: {{ template "theodolite.fullname" . }}-prometheus
 spec:
   serviceAccountName: {{ template "theodolite.fullname" . }}-prometheus
-  serviceMonitorSelector:
-    matchLabels:
-      #app: cp-kafka
-      appScope: titan-ccp
+  serviceMonitorSelector: {}
   resources:
     requests:
       memory: 400Mi
diff --git a/helm/templates/theodolite/theodolite-operator.yaml b/helm/templates/theodolite/theodolite-operator.yaml
index ff9c7e4de87c703af3350f7d9c797a5a53e2e675..f2669686eada049d33c5c88169d8d2ec3af84261 100644
--- a/helm/templates/theodolite/theodolite-operator.yaml
+++ b/helm/templates/theodolite/theodolite-operator.yaml
@@ -27,11 +27,18 @@ spec:
             - name: MODE
               value: operator
             - name: RESULTS_FOLDER
-              value: "./results"
+              value: "/deployments/results"
           volumeMounts:
             - name: theodolite-results-volume
               mountPath: "/deployments/results"
-        {{- if .Values.operator.sloChecker.droppedRecordsKStreams.enabled }}
+          resources:
+            requests:
+              memory: "512Mi"
+              cpu: "250m"
+            limits:
+              memory: "1024Mi"
+              cpu: "500m"
+        {{- if .Values.operator.sloChecker.generic.enabled }}
         - name: slo-checker-generic
           image: "{{ .Values.operator.sloChecker.generic.image }}:{{ .Values.operator.sloChecker.generic.imageTag }}"
           imagePullPolicy: "{{ .Values.operator.sloChecker.generic.imagePullPolicy }}"
@@ -43,6 +50,13 @@ spec:
             value: "8082"
           - name: LOG_LEVEL
             value: INFO
+          resources:
+            requests:
+              memory: "64Mi"
+              cpu: "50m"
+            limits:
+              memory: "128Mi"
+              cpu: "100m"
         {{- end }}
         {{- if .Values.operator.sloChecker.lagTrend.enabled }}
         - name: lag-trend-slo-checker
@@ -54,6 +68,13 @@ spec:
           env:
           - name: LOG_LEVEL
             value: INFO
+          resources:
+            requests:
+              memory: "64Mi"
+              cpu: "50m"
+            limits:
+              memory: "128Mi"
+              cpu: "100m"
         {{- end }}
         {{- if .Values.operator.sloChecker.droppedRecordsKStreams.enabled }}
         - name: slo-checker-dropped-records-kstreams
@@ -67,6 +88,13 @@ spec:
             value: "8081"
           - name: LOG_LEVEL
             value: INFO
+          resources:
+            requests:
+              memory: "64Mi"
+              cpu: "50m"
+            limits:
+              memory: "128Mi"
+              cpu: "100m"
         {{- end }}
         {{- if .Values.operator.resultsVolume.accessSidecar.enabled }}
         - name: results-access
diff --git a/slo-checker/dropped-records/Dockerfile b/slo-checker/dropped-records/Dockerfile
index 032b8153a6989ca04631ba553289dacb3620a38d..2cbc89a150217f15b3c4ba921050db720a34bf50 100644
--- a/slo-checker/dropped-records/Dockerfile
+++ b/slo-checker/dropped-records/Dockerfile
@@ -1,6 +1,15 @@
-FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
+FROM python:3.8
 
-COPY requirements.txt requirements.txt
-RUN pip install -r requirements.txt
+WORKDIR /code
 
-COPY ./app /app
\ No newline at end of file
+COPY ./requirements.txt /code/requirements.txt
+RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
+
+COPY ./app /code/app
+
+WORKDIR /code/app
+
+ENV HOST 0.0.0.0
+ENV PORT 80
+
+CMD ["sh", "-c", "uvicorn main:app --host $HOST --port $PORT"]
diff --git a/slo-checker/dropped-records/requirements.txt b/slo-checker/dropped-records/requirements.txt
index 8b6c3863226c2bd5e8bcd7982b2674dee593f192..a3d5ff675d6a89b2514f1936b1a8104d13ad9b55 100644
--- a/slo-checker/dropped-records/requirements.txt
+++ b/slo-checker/dropped-records/requirements.txt
@@ -1,5 +1,6 @@
-fastapi==0.65.2
-scikit-learn==0.20.3
-pandas==1.0.3
-uvicorn
 requests
+fastapi>=0.68.0,<0.69.0
+uvicorn>=0.15.0,<0.16.0
+#pydantic>=1.8.0,<2.0.0
+#scikit-learn==0.22.2
+pandas==1.0.3
diff --git a/slo-checker/generic/Dockerfile b/slo-checker/generic/Dockerfile
index 032b8153a6989ca04631ba553289dacb3620a38d..2cbc89a150217f15b3c4ba921050db720a34bf50 100644
--- a/slo-checker/generic/Dockerfile
+++ b/slo-checker/generic/Dockerfile
@@ -1,6 +1,15 @@
-FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
+FROM python:3.8
 
-COPY requirements.txt requirements.txt
-RUN pip install -r requirements.txt
+WORKDIR /code
 
-COPY ./app /app
\ No newline at end of file
+COPY ./requirements.txt /code/requirements.txt
+RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
+
+COPY ./app /code/app
+
+WORKDIR /code/app
+
+ENV HOST 0.0.0.0
+ENV PORT 80
+
+CMD ["sh", "-c", "uvicorn main:app --host $HOST --port $PORT"]
diff --git a/slo-checker/generic/requirements.txt b/slo-checker/generic/requirements.txt
index 87972ab01a276cbb63033e214e1ad53d38b5c8d8..a3d5ff675d6a89b2514f1936b1a8104d13ad9b55 100644
--- a/slo-checker/generic/requirements.txt
+++ b/slo-checker/generic/requirements.txt
@@ -1,4 +1,6 @@
-fastapi==0.65.2
-pandas==1.0.3
-uvicorn
 requests
+fastapi>=0.68.0,<0.69.0
+uvicorn>=0.15.0,<0.16.0
+#pydantic>=1.8.0,<2.0.0
+#scikit-learn==0.22.2
+pandas==1.0.3
diff --git a/slo-checker/record-lag/Dockerfile b/slo-checker/record-lag/Dockerfile
index 032b8153a6989ca04631ba553289dacb3620a38d..2cbc89a150217f15b3c4ba921050db720a34bf50 100644
--- a/slo-checker/record-lag/Dockerfile
+++ b/slo-checker/record-lag/Dockerfile
@@ -1,6 +1,15 @@
-FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
+FROM python:3.8
 
-COPY requirements.txt requirements.txt
-RUN pip install -r requirements.txt
+WORKDIR /code
 
-COPY ./app /app
\ No newline at end of file
+COPY ./requirements.txt /code/requirements.txt
+RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
+
+COPY ./app /code/app
+
+WORKDIR /code/app
+
+ENV HOST 0.0.0.0
+ENV PORT 80
+
+CMD ["sh", "-c", "uvicorn main:app --host $HOST --port $PORT"]
diff --git a/slo-checker/record-lag/requirements.txt b/slo-checker/record-lag/requirements.txt
index 8b6c3863226c2bd5e8bcd7982b2674dee593f192..770498e91e3f705e98868d009518b355a19a356a 100644
--- a/slo-checker/record-lag/requirements.txt
+++ b/slo-checker/record-lag/requirements.txt
@@ -1,5 +1,6 @@
-fastapi==0.65.2
-scikit-learn==0.20.3
-pandas==1.0.3
-uvicorn
 requests
+fastapi>=0.68.0,<0.69.0
+uvicorn>=0.15.0,<0.16.0
+#pydantic>=1.8.0,<2.0.0
+scikit-learn==0.22.2
+pandas==1.0.3
diff --git a/theodolite-benchmarks/beam-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/beam-commons/.settings/org.eclipse.jdt.ui.prefs
new file mode 100644
index 0000000000000000000000000000000000000000..60b9977149c7b281cb2ac91ee282f73d4351e348
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/.settings/org.eclipse.jdt.ui.prefs
@@ -0,0 +1,127 @@
+cleanup.add_default_serial_version_id=true
+cleanup.add_generated_serial_version_id=false
+cleanup.add_missing_annotations=true
+cleanup.add_missing_deprecated_annotations=true
+cleanup.add_missing_methods=false
+cleanup.add_missing_nls_tags=false
+cleanup.add_missing_override_annotations=true
+cleanup.add_missing_override_annotations_interface_methods=true
+cleanup.add_serial_version_id=false
+cleanup.always_use_blocks=true
+cleanup.always_use_parentheses_in_expressions=false
+cleanup.always_use_this_for_non_static_field_access=true
+cleanup.always_use_this_for_non_static_method_access=true
+cleanup.convert_functional_interfaces=false
+cleanup.convert_to_enhanced_for_loop=true
+cleanup.correct_indentation=true
+cleanup.format_source_code=true
+cleanup.format_source_code_changes_only=false
+cleanup.insert_inferred_type_arguments=false
+cleanup.make_local_variable_final=true
+cleanup.make_parameters_final=true
+cleanup.make_private_fields_final=true
+cleanup.make_type_abstract_if_missing_method=false
+cleanup.make_variable_declarations_final=true
+cleanup.never_use_blocks=false
+cleanup.never_use_parentheses_in_expressions=true
+cleanup.organize_imports=true
+cleanup.qualify_static_field_accesses_with_declaring_class=false
+cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+cleanup.qualify_static_member_accesses_with_declaring_class=true
+cleanup.qualify_static_method_accesses_with_declaring_class=false
+cleanup.remove_private_constructors=true
+cleanup.remove_redundant_modifiers=false
+cleanup.remove_redundant_semicolons=true
+cleanup.remove_redundant_type_arguments=true
+cleanup.remove_trailing_whitespaces=true
+cleanup.remove_trailing_whitespaces_all=true
+cleanup.remove_trailing_whitespaces_ignore_empty=false
+cleanup.remove_unnecessary_casts=true
+cleanup.remove_unnecessary_nls_tags=true
+cleanup.remove_unused_imports=true
+cleanup.remove_unused_local_variables=false
+cleanup.remove_unused_private_fields=true
+cleanup.remove_unused_private_members=false
+cleanup.remove_unused_private_methods=true
+cleanup.remove_unused_private_types=true
+cleanup.sort_members=false
+cleanup.sort_members_all=false
+cleanup.use_anonymous_class_creation=false
+cleanup.use_blocks=true
+cleanup.use_blocks_only_for_return_and_throw=false
+cleanup.use_lambda=true
+cleanup.use_parentheses_in_expressions=true
+cleanup.use_this_for_non_static_field_access=true
+cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+cleanup.use_this_for_non_static_method_access=true
+cleanup.use_this_for_non_static_method_access_only_if_necessary=false
+cleanup_profile=_CAU-SE-Style
+cleanup_settings_version=2
+eclipse.preferences.version=1
+editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
+formatter_profile=_CAU-SE-Style
+formatter_settings_version=21
+org.eclipse.jdt.ui.ignorelowercasenames=true
+org.eclipse.jdt.ui.importorder=;
+org.eclipse.jdt.ui.ondemandthreshold=99
+org.eclipse.jdt.ui.staticondemandthreshold=99
+sp_cleanup.add_default_serial_version_id=true
+sp_cleanup.add_generated_serial_version_id=false
+sp_cleanup.add_missing_annotations=true
+sp_cleanup.add_missing_deprecated_annotations=true
+sp_cleanup.add_missing_methods=false
+sp_cleanup.add_missing_nls_tags=false
+sp_cleanup.add_missing_override_annotations=true
+sp_cleanup.add_missing_override_annotations_interface_methods=true
+sp_cleanup.add_serial_version_id=false
+sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_parentheses_in_expressions=false
+sp_cleanup.always_use_this_for_non_static_field_access=true
+sp_cleanup.always_use_this_for_non_static_method_access=true
+sp_cleanup.convert_functional_interfaces=false
+sp_cleanup.convert_to_enhanced_for_loop=true
+sp_cleanup.correct_indentation=true
+sp_cleanup.format_source_code=true
+sp_cleanup.format_source_code_changes_only=false
+sp_cleanup.insert_inferred_type_arguments=false
+sp_cleanup.make_local_variable_final=true
+sp_cleanup.make_parameters_final=true
+sp_cleanup.make_private_fields_final=true
+sp_cleanup.make_type_abstract_if_missing_method=false
+sp_cleanup.make_variable_declarations_final=true
+sp_cleanup.never_use_blocks=false
+sp_cleanup.never_use_parentheses_in_expressions=true
+sp_cleanup.on_save_use_additional_actions=true
+sp_cleanup.organize_imports=true
+sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
+sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
+sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
+sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
+sp_cleanup.remove_private_constructors=true
+sp_cleanup.remove_redundant_modifiers=false
+sp_cleanup.remove_redundant_semicolons=true
+sp_cleanup.remove_redundant_type_arguments=true
+sp_cleanup.remove_trailing_whitespaces=true
+sp_cleanup.remove_trailing_whitespaces_all=true
+sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
+sp_cleanup.remove_unnecessary_casts=true
+sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unused_imports=true
+sp_cleanup.remove_unused_local_variables=false
+sp_cleanup.remove_unused_private_fields=true
+sp_cleanup.remove_unused_private_members=false
+sp_cleanup.remove_unused_private_methods=true
+sp_cleanup.remove_unused_private_types=true
+sp_cleanup.sort_members=false
+sp_cleanup.sort_members_all=false
+sp_cleanup.use_anonymous_class_creation=false
+sp_cleanup.use_blocks=true
+sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_lambda=true
+sp_cleanup.use_parentheses_in_expressions=true
+sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
+sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
\ No newline at end of file
diff --git a/theodolite-benchmarks/beam-commons/build.gradle b/theodolite-benchmarks/beam-commons/build.gradle
index 53481485f5a2cec428e1045e4e3f429a1ad26196..64ac2bb51ae1e6d741749a81e5c6c9e296d14d68 100644
--- a/theodolite-benchmarks/beam-commons/build.gradle
+++ b/theodolite-benchmarks/beam-commons/build.gradle
@@ -13,21 +13,19 @@ repositories {
 }
 
 dependencies {
-  // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
   implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
   implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
-  implementation 'com.google.code.gson:gson:2.8.2'
-  implementation 'com.google.guava:guava:24.1-jre'
 
+  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.35.0'
   implementation('org.apache.beam:beam-sdks-java-io-kafka:2.35.0'){
     exclude group: 'org.apache.kafka', module: 'kafka-clients'
   }
+  implementation 'io.confluent:kafka-streams-avro-serde:5.3.2'
+
   implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
-  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.35.0'
 
   runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
   runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'
 
-  // Use JUnit test framework
   testImplementation 'junit:junit:4.12'
 }
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
index c936ce918c10f3c500cdd26f7e057cd7b6c555b6..e67c5f60b6401b4ecd1f42b2a184afbc4654f425 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java
@@ -12,6 +12,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
  */
 public class AbstractPipeline extends Pipeline {
 
+  private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader";
+  private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url";
   protected final String inputTopic;
   protected final String bootstrapServer;
   // Application Configurations
@@ -21,8 +23,8 @@ public class AbstractPipeline extends Pipeline {
     super(options);
     this.config = config;
 
-    inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-    bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+    this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
   }
 
   /**
@@ -32,19 +34,37 @@ public class AbstractPipeline extends Pipeline {
    */
   public Map<String, Object> buildConsumerConfig() {
     final Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config
-            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-    consumerConfig.put("schema.registry.url",
-        config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
-
-    consumerConfig.put("specific.avro.reader",
-        config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
-
-    final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    consumerConfig.put(
+        KAFKA_CONFIG_SPECIFIC_AVRO_READER,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        this.config.getString(ConfigurationKeys.APPLICATION_NAME));
     return consumerConfig;
   }
+
+  /**
+   * Builds a simple configuration for a Kafka producer transformation.
+   *
+   * @return the built configuration.
+   */
+  public Map<String, Object> buildProducerConfig() {
+    final Map<String, Object> config = new HashMap<>();
+    config.put(
+        KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    config.put(
+        KAFKA_CONFIG_SPECIFIC_AVRO_READER,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    return config;
+  }
 }
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..c53dde3d5f4b7d18822c916a637c356b898fe2cd
--- /dev/null
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/ActivePowerRecordDeserializer.java
@@ -0,0 +1,11 @@
+package theodolite.commons.beam.kafka;
+
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}.
+ */
+public class ActivePowerRecordDeserializer extends SpecificAvroDeserializer<ActivePowerRecord> {
+}
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
deleted file mode 100644
index f102bee41d66c251ecb66418dd3b90dced32cffb..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package theodolite.commons.beam.kafka;
-
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.Map;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
- */
-public class KafkaActivePowerRecordReader extends
-    PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
-
-  private static final long serialVersionUID = 2603286150183186115L;
-  private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
-
-
-  /**
-   * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
-   */
-  public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
-                                      final Map<String, Object> consumerConfig) {
-    super();
-
-    if (bootstrapServer == null) {
-      throw new IllegalArgumentException("bootstrapServer is null");
-    }
-
-    if (inputTopic == null) {
-      throw new IllegalArgumentException("inputTopic is null");
-    }
-
-    // Check if boostrap server and inputTopic are defined
-    if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
-      throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
-    }
-
-
-    reader =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            .withoutMetadata();
-  }
-
-  @Override
-  public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
-    return input.apply(this.reader);
-  }
-
-}
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
index 732afe9a0c1d4bdfea876025fceea0c5da1310fe..7a48bd71d497f65351888425d092decf5adb05f3 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java
@@ -1,6 +1,5 @@
 package theodolite.commons.beam.kafka;
 
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -12,40 +11,37 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
- * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
- * Has additional a TimestampPolicy.
+ * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO} with event time.
  */
-public class KafkaActivePowerTimestampReader extends
-    PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
+public class KafkaActivePowerTimestampReader
+    extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
 
   private static final long serialVersionUID = 2603286150183186115L;
   private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
 
-
   /**
    * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
    */
-  public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic,
-                                         final Map<String, Object> consumerConfig) {
+  public KafkaActivePowerTimestampReader(
+      final String bootstrapServer,
+      final String inputTopic,
+      final Map<String, Object> consumerConfig) {
     super();
 
-    // Check if boostrap server and inputTopic are defined
+    // Check if bootstrap server and inputTopic are defined
     if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
     }
 
-    reader =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                AvroCoder.of(ActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
-            // Set TimeStampPolicy for event time
-            .withTimestampPolicyFactory(
-                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
-            .withoutMetadata();
+    this.reader = KafkaIO.<String, ActivePowerRecord>read().withBootstrapServers(bootstrapServer)
+        .withTopic(inputTopic).withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializerAndCoder(
+            ActivePowerRecordDeserializer.class,
+            AvroCoder.of(ActivePowerRecord.class))
+        .withConsumerConfigUpdates(consumerConfig)
+        .withTimestampPolicyFactory(
+            (tp, previousWatermark) -> new EventTimePolicy(previousWatermark))
+        .withoutMetadata();
   }
 
   @Override
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
index 0a3867e71479e36ce30a9f222dfd0a7d473bd209..6d33f6f01493c10a1eb6aca56dd309ae58ce4b8d 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaWriterTransformation.java
@@ -1,5 +1,6 @@
 package theodolite.commons.beam.kafka;
 
+import java.util.Map;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
@@ -9,23 +10,35 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 /**
- * Wrapper for a Kafka writing Transformation
- * where the value type can be generic.
+ * Wrapper for a Kafka writing Transformation where the value type can be generic.
+ *
  * @param <T> type of the value.
  */
-public class KafkaWriterTransformation<T> extends
-    PTransform<PCollection<KV<String, T>>, PDone> {
+public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> {
 
   private static final long serialVersionUID = 3171423303843174723L;
   private final PTransform<PCollection<KV<String, T>>, PDone> writer;
 
   /**
-   * Creates a new kafka writer transformation.
+   * Creates a new Kafka writer transformation.
    */
-  public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
-                                   final Class<? extends Serializer<T>> valueSerializer) {
+  public KafkaWriterTransformation(
+      final String bootstrapServer,
+      final String outputTopic,
+      final Class<? extends Serializer<T>> valueSerializer) {
+    this(bootstrapServer, outputTopic, valueSerializer, Map.of());
+  }
+
+  /**
+   * Creates a new Kafka writer transformation.
+   */
+  public KafkaWriterTransformation(
+      final String bootstrapServer,
+      final String outputTopic,
+      final Class<? extends Serializer<T>> valueSerializer,
+      final Map<String, Object> producerConfig) {
     super();
-    // Check if boostrap server and outputTopic are defined
+    // Check if bootstrap server and outputTopic are defined
     if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
     }
@@ -34,7 +47,8 @@ public class KafkaWriterTransformation<T> extends
         .withBootstrapServers(bootstrapServer)
         .withTopic(outputTopic)
         .withKeySerializer(StringSerializer.class)
-        .withValueSerializer(valueSerializer);
+        .withValueSerializer(valueSerializer)
+        .withProducerConfigUpdates(producerConfig);
 
   }
 
diff --git a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
index 66b402b58f39b79066638ce679c27c0378d5be54..174249a98f9d91ce2cbf2bb64b27c09b37f05d9f 100644
--- a/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/flink-commons/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs
index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..713419c8d3d74d3bd7fd05c3e839367753fcdee0 100644
--- a/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/kstreams-commons/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644
--- a/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/load-generator-commons/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java
index 6d908d34b7c6b87254782b6ae8b0b8dc2a6d036e..0d331a900f5bd5c18dbeaf2fc2a249256151ce70 100644
--- a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java
+++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java
@@ -10,7 +10,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
 import static com.github.tomakehurst.wiremock.client.WireMock.verify;
 import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
 import com.github.tomakehurst.wiremock.junit.WireMockRule;
-import com.google.gson.Gson;
 import java.net.URI;
 import org.junit.Before;
 import org.junit.Rule;
@@ -21,8 +20,6 @@ public class HttpRecordSenderTest {
 
   private HttpRecordSender<ActivePowerRecord> httpRecordSender;
 
-  private Gson gson;
-
   @Rule
   public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());
 
@@ -30,7 +27,6 @@ public class HttpRecordSenderTest {
   public void setup() {
     this.httpRecordSender =
         new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port()));
-    this.gson = new Gson();
   }
 
   @Test
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
index 79566fd937b9c100663d426610b6ff476035ef87..251523441e339cbaf58c7e3a1b30e97cc354df18 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/LogKeyValue.java
@@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Logs all Key Value pairs.
  */
-@SuppressWarnings({"unused"})
 public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
   private static final long serialVersionUID = 4328743;
   private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class);
@@ -19,9 +18,7 @@ public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
   @ProcessElement
   public void processElement(@Element final KV<String, String> kv,
       final OutputReceiver<KV<String, String>> out) {
-    if (LOGGER.isInfoEnabled()) {
-      LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
-    }
+    LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
     out.output(kv);
   }
 }
diff --git a/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644
--- a/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc1-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644
--- a/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc1-kstreams/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs
index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644
--- a/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc1-load-generator/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs
index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644
--- a/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc2-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs
index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644
--- a/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc2-kstreams/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs
index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644
--- a/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc2-load-generator/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs
index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644
--- a/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc3-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644
--- a/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc3-kstreams/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644
--- a/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc3-load-generator/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc4-beam/build.gradle b/theodolite-benchmarks/uc4-beam/build.gradle
index 502e94fa737fb2ae1bab861407b27575cd8766ca..3e9d917cc3586e5df2c5645f1d2cbcf03e3993e4 100644
--- a/theodolite-benchmarks/uc4-beam/build.gradle
+++ b/theodolite-benchmarks/uc4-beam/build.gradle
@@ -2,4 +2,6 @@ plugins {
   id 'theodolite.beam'
 }
 
-
+dependencies {
+  implementation('io.confluent:kafka-streams-avro-serde:5.3.2')
+}
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
index 7b66082c91b87c246d8c834249d2bc82545766f5..cf25f043ecef8c4bc564cff454d6477d69c945aa 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
@@ -16,11 +16,11 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * Duplicates the Kv containing the (Children,Parents) pair as a flat map.
  */
-public class DuplicateAsFlatMap extends DoFn
-    <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
+public class DuplicateAsFlatMap
+    extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
   private static final long serialVersionUID = -5132355515723961647L;
   @StateId("parents")
-  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();//NOPMD
+  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD
   private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
 
   public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
@@ -30,19 +30,20 @@ public class DuplicateAsFlatMap extends DoFn
 
 
   /**
-   *  Generate a KV-pair for every child-parent match.
+   * Generate a KV-pair for every child-parent match.
    */
   @ProcessElement
-  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
-                             final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
-                             @StateId("parents") final ValueState<Set<String>> state,
-                             final ProcessContext c) {
+  public void processElement(
+      @Element final KV<String, ActivePowerRecord> kv,
+      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+      @StateId("parents") final ValueState<Set<String>> state,
+      final ProcessContext c) {
 
     final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
     final Set<String> newParents =
-        c.sideInput(childParentPairMap).get(kv.getKey()) == null
+        c.sideInput(this.childParentPairMap).get(kv.getKey()) == null
             ? Collections.emptySet()
-            : c.sideInput(childParentPairMap).get(kv.getKey());
+            : c.sideInput(this.childParentPairMap).get(kv.getKey());
     final Set<String> oldParents =
         MoreObjects.firstNonNull(state.read(), Collections.emptySet());
     // Forward new Pairs if they exist
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
index 7179fe5da937280d5baf72cd73cc392ef15a60e0..0c63e6f9322e5f70f1ad010de168f1a5292a45a4 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
@@ -66,8 +66,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     final Duration gracePeriod =
         Duration.standardSeconds(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
 
-    // Build kafka configuration
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
+    // Build Kafka configuration
+    final Map<String, Object> consumerConfig = super.buildConsumerConfig();
     final Map<String, Object> configurationConfig = this.configurationConfig(config);
 
     // Set Coders for Classes that will be distributed
@@ -77,25 +77,34 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     // Read from Kafka
     // ActivePowerRecords
     final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+        new KafkaActivePowerTimestampReader(
+            this.bootstrapServer,
+            this.inputTopic,
+            consumerConfig);
 
     // Configuration Events
     final KafkaGenericReader<Event, String> kafkaConfigurationReader =
         new KafkaGenericReader<>(
-            this.bootstrapServer, configurationTopic, configurationConfig,
-            EventDeserializer.class, StringDeserializer.class);
-
-    // Transform into AggregatedActivePowerRecords into ActivePowerRecords
-    final AggregatedToActive aggregatedToActive = new AggregatedToActive();
+            this.bootstrapServer,
+            configurationTopic,
+            configurationConfig,
+            EventDeserializer.class,
+            StringDeserializer.class);
 
     // Write to Kafka
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            outputTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());
 
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            feedbackTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());
 
     // Apply pipeline transformations
     final PCollection<KV<String, ActivePowerRecord>> values = this
@@ -115,7 +124,10 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
             .withBootstrapServers(this.bootstrapServer)
             .withTopic(feedbackTopic)
             .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
+            .withValueDeserializerAndCoder(
+                AggregatedActivePowerRecordDeserializer.class,
+                AvroCoder.of(AggregatedActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
             .withTimestampPolicyFactory(
                 (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
                     previousWaterMark))
@@ -123,11 +135,12 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
         .apply("Apply Windows", Window.into(FixedWindows.of(duration)))
         // Convert into the correct data format
         .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
-            MapElements.via(aggregatedToActive))
+            MapElements.via(new AggregatedToActive()))
         .apply("Set trigger for feedback", Window
             .<KV<String, ActivePowerRecord>>configure()
             .triggering(Repeatedly.forever(
-                AfterProcessingTime.pastFirstElementInPane()
+                AfterProcessingTime
+                    .pastFirstElementInPane()
                     .plusDelayOf(triggerDelay)))
             .withAllowedLateness(gracePeriod)
             .discardingFiredPanes());
@@ -170,17 +183,13 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
                 .accumulatingFiredPanes())
             .apply(View.asMap());
 
-    final FilterNullValues filterNullValues = new FilterNullValues();
-
     // Build pairs of every sensor reading and parent
     final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
         inputCollection.apply(
             "Duplicate as flatMap",
-            ParDo.of(new DuplicateAsFlatMap(childParentPairMap))
-                .withSideInputs(childParentPairMap))
+            ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
             .apply("Filter only latest changes", Latest.perKey())
-            .apply("Filter out null values",
-                Filter.by(filterNullValues));
+            .apply("Filter out null values", Filter.by(new FilterNullValues()));
 
     final SetIdForAggregated setIdForAggregated = new SetIdForAggregated();
     final SetKeyToGroup setKeyToGroup = new SetKeyToGroup();
@@ -204,8 +213,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
 
     aggregations.apply("Write to aggregation results", kafkaOutput);
 
-    aggregations
-        .apply("Write to feedback topic", kafkaFeedback);
+    aggregations.apply("Write to feedback topic", kafkaFeedback);
 
   }
 
@@ -217,14 +225,15 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
    */
   public Map<String, Object> configurationConfig(final Configuration config) {
     final Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config
-            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config
-        .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        config.getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
     return consumerConfig;
   }
 
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
index 1d539c0d0d777c47c95e2b594e5e952fb4ab6c0a..3e0be0fa456efa3ec67504ea9d0e285ae8b3b913 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
@@ -11,8 +11,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
- * Wrapper Class that encapsulates a AggregatedActivePowerRecord Serde in a
- * org.apache.beam.sdk.coders.Coder.
+ * {@link Coder} for an {@link AggregatedActivePowerRecord}.
  */
 @SuppressWarnings("serial")
 public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
@@ -51,7 +50,7 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     if (!DETERMINISTIC) {
-      throw new NonDeterministicException(this, "This class should be deterministic!");
+      throw new NonDeterministicException(this, "This class should be deterministic.");
     }
   }
 }
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
index 6e2f2765ff65d3bca2a127be36db0854f15afebc..3076861a53dac031afd9e8eb913b5a0bafe480c0 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
@@ -1,34 +1,12 @@
 package serialization;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
- * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Deserializer
+ * {@link Deserializer} for an {@link AggregatedActivePowerRecord}.
  */
 public class AggregatedActivePowerRecordDeserializer
-    implements Deserializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
-    AggregatedActivePowerRecord value = null;
-    try {
-      value = this.avroEnCoder.decode(new ByteArrayInputStream(data));
-    } catch (final IOException e) {
-      LOGGER.error("Could not deserialize AggregatedActivePowerRecord", e);
-    }
-    return value;
-  }
-
+    extends SpecificAvroDeserializer<AggregatedActivePowerRecord> {
 }
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
index 77b79d5465f1d561870bf5b04f8fa20f87076adb..26801d8a28b9756214c65c4e8190e15d04bb3e68 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
@@ -1,45 +1,12 @@
 package serialization;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
- * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Serializer
+ * {@link Serializer} for an {@link AggregatedActivePowerRecord}.
  */
 public class AggregatedActivePowerRecordSerializer
-    implements Serializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
-    final ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try {
-      this.avroEnCoder.encode(data, out);
-    } catch (final IOException e) {
-      LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
-    }
-    final byte[] result = out.toByteArray();
-    try {
-      out.close();
-    } catch (final IOException e) {
-      LOGGER.error(
-          "Could not close output stream after serialization of AggregatedActivePowerRecord", e);
-    }
-    return result;
-  }
-
-  @Override
-  public void close() {
-    Serializer.super.close();
-  }
+    extends SpecificAvroSerializer<AggregatedActivePowerRecord> {
 }
diff --git a/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs
index 272e01533f6a345d53d2635c47e38c6d3c33dc8a..08fcb07933ca19165976bffd5e7fdfdaf64ee1d2 100644
--- a/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc4-flink/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs
index fa98ca63d77bdee891150bd6713f70197a75cefc..a375cb792eeb842ecfd1f789fbf6a716df43e9c8 100644
--- a/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc4-kstreams/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs b/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs
index 4d01df75552c562406705858b6368ecf59d6e82f..ac23341bf71ac68df4183361493261758fd5dafb 100644
--- a/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs
+++ b/theodolite-benchmarks/uc4-load-generator/.settings/org.eclipse.jdt.ui.prefs
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
diff --git a/theodolite/README.md b/theodolite/README.md
index 96f56c20db1d0796ba692cc497b93532517526ff..f662329f7eda3a39632581b7125a2f2f2feced8a 100644
--- a/theodolite/README.md
+++ b/theodolite/README.md
@@ -1,12 +1,12 @@
-# Theodolite project
+# Theodolite
 
 This project uses Quarkus, the Supersonic Subatomic Java Framework.
 
-If you want to learn more about Quarkus, please visit its website: <https://quarkus.io/> .
+If you want to learn more about Quarkus, please visit its website: https://quarkus.io/.
 
 ## Running the application in dev mode
 
-You can run your application in dev mode using:
+You can run your application in dev mode, which enables live coding, using:
 
 ```sh
 ./gradlew quarkusDev
@@ -23,8 +23,10 @@ The application can be packaged using:
 ./gradlew build
 ```
 
-It produces the `theodolite-0.7.0-SNAPSHOT-runner.jar` file in the `/build` directory. Be aware that it’s not
-an _über-jar_ as the dependencies are copied into the `build/lib` directory.
+It produces the `quarkus-run.jar` file in the `build/quarkus-app/` directory.
+Be aware that it’s not an _über-jar_ as the dependencies are copied into the `build/quarkus-app/lib/` directory.
+
+The application is now runnable using `java -jar build/quarkus-app/quarkus-run.jar`.
 
 If you want to build an _über-jar_, execute the following command:
 
@@ -32,12 +34,10 @@ If you want to build an _über-jar_, execute the following command:
 ./gradlew build -Dquarkus.package.type=uber-jar
 ```
 
-The application is now runnable using `java -jar build/theodolite-0.7.0-SNAPSHOT-runner.jar`.
+The application, packaged as an _über-jar_, is now runnable using `java -jar build/*-runner.jar`.
 
 ## Creating a native executable
 
-It is recommended to use the native GraalVM images to create executable jars from Theodolite. For more information please visit the [Native Image guide](https://www.graalvm.org/reference-manual/native-image/).
-
 You can create a native executable using:
 
 ```sh
@@ -55,15 +55,21 @@ You can then execute your native executable with:
 
 If you want to learn more about building native executables, please consult https://quarkus.io/guides/gradle-tooling.
 
-## Build docker images
+## Building container images
 
-For the jvm version use:
+For the JVM version use:
 
 ```sh
 ./gradlew build
 docker build -f src/main/docker/Dockerfile.jvm -t theodolite-jvm .
 ```
 
+Alternatively, you can use Kaniko to build the image:
+
+```sh
+docker run -it --rm --name kaniko -v "`pwd`":/theodolite --entrypoint "" gcr.io/kaniko-project/executor:debug /kaniko/executor --context /theodolite --dockerfile src/main/docker/Dockerfile.jvm --no-push
+```
+
 For the native image version use:
 
 ```sh
@@ -71,7 +77,7 @@ For the native image version use:
 docker build -f src/main/docker/Dockerfile.native -t theodolite-native .
 ```
 
-## Execute docker images
+## Run a container
 
 Remember to set the environment variables first.
 
diff --git a/theodolite/build.gradle b/theodolite/build.gradle
index 0ec113cbba893bc1f2f44a60c270f7cb67688803..a066e94f09b71720f9392947640b077b153ccb9c 100644
--- a/theodolite/build.gradle
+++ b/theodolite/build.gradle
@@ -1,14 +1,14 @@
 plugins {
-    id 'org.jetbrains.kotlin.jvm' version "1.5.31"
-    id "org.jetbrains.kotlin.plugin.allopen" version "1.5.31"
+    id 'org.jetbrains.kotlin.jvm' version "1.6.10"
+    id "org.jetbrains.kotlin.plugin.allopen" version "1.6.10"
     id 'io.quarkus'
     id "io.gitlab.arturbosch.detekt" version "1.15.0"
     id "org.jlleitschuh.gradle.ktlint" version "10.0.0"
 }
 
 repositories {
-    mavenLocal()
     mavenCentral()
+    mavenLocal()
     jcenter()
 }
 
diff --git a/theodolite/build_jvm.sh b/theodolite/build_jvm.sh
deleted file mode 100755
index f4dd32fc5228576f09e95f0e8ac06fa08ea6acc7..0000000000000000000000000000000000000000
--- a/theodolite/build_jvm.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-
-./gradlew build -x test
-
-docker build -f src/main/docker/Dockerfile.jvm -t quarkus/theodolite-jvm .
-
-docker run -i --rm -p 8080:8080 quarkus/theodolite-jvm
diff --git a/theodolite/build_native.sh b/theodolite/build_native.sh
deleted file mode 100755
index c2d7d81f35a24af951005bb30c52a8ab494ddb64..0000000000000000000000000000000000000000
--- a/theodolite/build_native.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-
-./gradlew build -Dquarkus.package.type=native -x test
-
-docker build -f src/main/docker/Dockerfile.native -t quarkus/theodolite .
-
-docker run -i --rm -p 8080:8080 quarkus/theodolite
diff --git a/theodolite/crd/crd-benchmark.yaml b/theodolite/crd/crd-benchmark.yaml
index 55bf6ed69e44287905bce85b63f66bb43ea65669..c901e61360c05b2f1cf2b1767a20f624eb262231 100644
--- a/theodolite/crd/crd-benchmark.yaml
+++ b/theodolite/crd/crd-benchmark.yaml
@@ -20,7 +20,7 @@ spec:
         properties:
           spec:
             type: object
-            required: ["sut", "loadGenerator", "resourceTypes", "loadTypes", "kafkaConfig"]
+            required: ["sut", "loadGenerator", "resourceTypes", "loadTypes"]
             properties:
               name:
                 description: This field exists only for technical reasons and should not be set by the user. The value of the field will be overwritten.
diff --git a/theodolite/gradle.properties b/theodolite/gradle.properties
index 76ed8f2136f14263460bc391d420c78de200d659..fd5768bc24a65dbd43b3ea770c854ae7c0da0a91 100644
--- a/theodolite/gradle.properties
+++ b/theodolite/gradle.properties
@@ -1,8 +1,8 @@
 #Gradle properties
-quarkusPluginVersion=2.5.2.Final
-quarkusPlatformArtifactId=quarkus-bom
 quarkusPluginId=io.quarkus
+quarkusPluginVersion=2.6.3.Final
 quarkusPlatformGroupId=io.quarkus.platform
-quarkusPlatformVersion=2.5.2.Final
+quarkusPlatformArtifactId=quarkus-bom
+quarkusPlatformVersion=2.6.3.Final
 
 #org.gradle.logging.level=INFO
\ No newline at end of file
diff --git a/theodolite/src/main/docker/Dockerfile.jvm b/theodolite/src/main/docker/Dockerfile.jvm
index 03035752038fee2e5ce4c477c61adc84991f3729..e33d7c379a4336610c16d59b9d3315a1e8abad2b 100644
--- a/theodolite/src/main/docker/Dockerfile.jvm
+++ b/theodolite/src/main/docker/Dockerfile.jvm
@@ -18,38 +18,24 @@
 #
 # Then run the container using :
 #
-# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/theodolite-jvm
+# docker run -i --rm -p 8080:8080 quarkus/theodolite-jvm
 #
 ###
-FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4 
+FROM registry.access.redhat.com/ubi8/openjdk-11-runtime:1.10
 
-ARG JAVA_PACKAGE=java-11-openjdk-headless
-ARG RUN_JAVA_VERSION=1.3.8
 ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
-# Install java and the run-java script
-# Also set up permissions for user `1001`
-RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
-    && microdnf update \
-    && microdnf clean all \
-    && mkdir /deployments \
-    && chown 1001 /deployments \
-    && chmod "g+rwX" /deployments \
-    && chown 1001:root /deployments \
-    && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \
-    && chown 1001 /deployments/run-java.sh \
-    && chmod 540 /deployments/run-java.sh \
-    && echo "securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/conf/security/java.security
 
 # Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size.
 ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
+
 # We make four distinct layers so if there are application changes the library layers can be re-used
-COPY --chown=1001 build/quarkus-app/lib/ /deployments/lib/
-COPY --chown=1001 build/quarkus-app/*.jar /deployments/
-COPY --chown=1001 build/quarkus-app/app/ /deployments/app/
-COPY --chown=1001 build/quarkus-app/quarkus/ /deployments/quarkus/
+COPY --chown=185 build/quarkus-app/lib/ /deployments/lib/
+COPY --chown=185 build/quarkus-app/*.jar /deployments/
+COPY --chown=185 build/quarkus-app/app/ /deployments/app/
+COPY --chown=185 build/quarkus-app/quarkus/ /deployments/quarkus/
 
 EXPOSE 8080
-USER 1001
+USER 185
 
-ENTRYPOINT [ "/deployments/run-java.sh" ]
+ENTRYPOINT [ "java", "-jar", "/deployments/quarkus-run.jar" ]
 
diff --git a/theodolite/src/main/docker/Dockerfile.legacy-jar b/theodolite/src/main/docker/Dockerfile.legacy-jar
index f9dffd188570c14087bafaec838b58b61a4e5912..aa5908c4ed42f005fa67c17fd2c3b3e00978228a 100644
--- a/theodolite/src/main/docker/Dockerfile.legacy-jar
+++ b/theodolite/src/main/docker/Dockerfile.legacy-jar
@@ -18,34 +18,20 @@
 #
 # Then run the container using :
 #
-# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/theodolite-legacy-jar
+# docker run -i --rm -p 8080:8080 quarkus/theodolite-legacy-jar
 #
 ###
-FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4 
+FROM registry.access.redhat.com/ubi8/openjdk-11-runtime:1.10
 
-ARG JAVA_PACKAGE=java-11-openjdk-headless
-ARG RUN_JAVA_VERSION=1.3.8
 ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
-# Install java and the run-java script
-# Also set up permissions for user `1001`
-RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
-    && microdnf update \
-    && microdnf clean all \
-    && mkdir /deployments \
-    && chown 1001 /deployments \
-    && chmod "g+rwX" /deployments \
-    && chown 1001:root /deployments \
-    && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \
-    && chown 1001 /deployments/run-java.sh \
-    && chmod 540 /deployments/run-java.sh \
-    && echo "securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/conf/security/java.security
 
 # Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size.
 ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
+
 COPY build/lib/* /deployments/lib/
-COPY build/*-runner.jar /deployments/app.jar
+COPY build/*-runner.jar /deployments/quarkus-run.jar
 
 EXPOSE 8080
-USER 1001
+USER 185
 
-ENTRYPOINT [ "/deployments/run-java.sh" ]
+ENTRYPOINT [ "java", "-jar", "/deployments/quarkus-run.jar" ]
diff --git a/theodolite/src/main/docker/Dockerfile.native b/theodolite/src/main/docker/Dockerfile.native
index 04a1dd6f2b6cc99511bf705eed5d98be1da25b05..34ccd6622bf2fba6f9707989fffd9bb6390a4a8b 100644
--- a/theodolite/src/main/docker/Dockerfile.native
+++ b/theodolite/src/main/docker/Dockerfile.native
@@ -14,12 +14,12 @@
 # docker run -i --rm -p 8080:8080 quarkus/theodolite
 #
 ###
-FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4
-WORKDIR /deployments/
-RUN chown 1001 /deployments \
-    && chmod "g+rwX" /deployments \
-    && chown 1001:root /deployments
-COPY --chown=1001:root build/*-runner /deployments/application
+FROM quay.io/quarkus/quarkus-micro-image:1.0
+WORKDIR /work/
+RUN chown 1001 /work \
+    && chmod "g+rwX" /work \
+    && chown 1001:root /work
+COPY --chown=1001:root build/*-runner /work/application
 
 EXPOSE 8080
 USER 1001
diff --git a/theodolite/src/main/docker/Dockerfile.native-distroless b/theodolite/src/main/docker/Dockerfile.native-distroless
index 1ed64110dd931bf3fea9100e3318318ad40b6966..951dfb64bee56e277d057c8f9e97796e88f30ac2 100644
--- a/theodolite/src/main/docker/Dockerfile.native-distroless
+++ b/theodolite/src/main/docker/Dockerfile.native-distroless
@@ -15,8 +15,7 @@
 #
 ###
 FROM quay.io/quarkus/quarkus-distroless-image:1.0
-WORKDIR /deployments/
-COPY build/*-runner /deployments/application
+COPY build/*-runner /application
 
 EXPOSE 8080
 USER nonroot
diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt
index 27e3206ad7b60d61cab94caaef8a3279d834fe65..f85b83497e5d69e43c1d4784ef86170a5436e929 100644
--- a/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt
+++ b/theodolite/src/main/kotlin/theodolite/benchmark/ConfigMapResourceSet.kt
@@ -12,7 +12,7 @@ import java.lang.IllegalArgumentException
 
 @RegisterForReflection
 @JsonDeserialize
-class ConfigMapResourceSet: ResourceSet, KubernetesResource {
+class ConfigMapResourceSet : ResourceSet, KubernetesResource {
     lateinit var name: String
     lateinit var files: List<String> // load all files, iff files is not set
 
@@ -26,30 +26,35 @@ class ConfigMapResourceSet: ResourceSet, KubernetesResource {
                 .withName(name)
                 .get() ?: throw DeploymentFailedException("Cannot find ConfigMap with name '$name'."))
                 .data
-                .filter { it.key.endsWith(".yaml") }
+                .filter { it.key.endsWith(".yaml") || it.key.endsWith(".yml") }
         } catch (e: KubernetesClientException) {
             throw DeploymentFailedException("Cannot find or read ConfigMap with name '$name'.", e)
         }
 
-        if (::files.isInitialized){
-            resources = resources.filter { files.contains(it.key) }
-
-            if (resources.size != files.size) {
+        if (::files.isInitialized) {
+            val filteredResources = resources.filter { files.contains(it.key) }
+            if (filteredResources.size != files.size) {
                 throw DeploymentFailedException("Could not find all specified Kubernetes manifests files")
             }
+            resources = filteredResources
         }
 
         return try {
             resources
-                .map { Pair(
-                    getKind(resource = it.value),
-                    it) }
+                .map {
+                    Pair(
+                        getKind(resource = it.value),
+                        it
+                    )
+                }
                 .map {
                     Pair(
                         it.second.key,
-                        loader.loadK8sResource(it.first, it.second.value)) }
+                        loader.loadK8sResource(it.first, it.second.value)
+                    )
+                }
         } catch (e: IllegalArgumentException) {
-            throw DeploymentFailedException("Can not create resource set from specified configmap", e)
+            throw DeploymentFailedException("Cannot create resource set from specified ConfigMap", e)
         }
 
     }
@@ -58,10 +63,7 @@ class ConfigMapResourceSet: ResourceSet, KubernetesResource {
         val parser = YamlParserFromString()
         val resourceAsMap = parser.parse(resource, HashMap<String, String>()::class.java)
 
-        return try {
-            resourceAsMap?.get("kind") !!
-        } catch (e: NullPointerException) {
-            throw DeploymentFailedException( "Could not find field kind of Kubernetes resource: ${resourceAsMap?.get("name")}", e)
-        }
+        return resourceAsMap?.get("kind")
+            ?: throw DeploymentFailedException("Could not find field kind of Kubernetes resource: ${resourceAsMap?.get("name")}")
     }
 }
\ No newline at end of file
diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt b/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt
index e769f8b9883b98d9787f2de65571fc94056c3b9c..f830232de4b6956fa0f989cae131903377862e6c 100644
--- a/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt
+++ b/theodolite/src/main/kotlin/theodolite/benchmark/FileSystemResourceSet.kt
@@ -28,7 +28,7 @@ class FileSystemResourceSet: ResourceSet, KubernetesResource {
         return try {
             File(path)
                 .list() !!
-                .filter { it.endsWith(".yaml") } // consider only yaml files, e.g. ignore readme files
+                .filter { it.endsWith(".yaml") || it.endsWith(".yml") }
                 .map {
                     loadSingleResource(resourceURL = it, client = client)
                 }
diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
index 70d8b241c84d1c6875c8da3d74cd90b3f57956d6..d42c2ea3c0ed5394fdcf5b89be0fe0470a15ba62 100644
--- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
+++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
@@ -39,7 +39,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
     lateinit var name: String
     lateinit var resourceTypes: List<TypeName>
     lateinit var loadTypes: List<TypeName>
-    lateinit var kafkaConfig: KafkaConfig
+    var kafkaConfig: KafkaConfig? = null
     lateinit var infrastructure: Resources
     lateinit var sut: Resources
     lateinit var loadGenerator: Resources
@@ -110,6 +110,9 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
                 patcherFactory.createPatcher(it.patcher, appResources + loadGenResources).patch(override.value)
             }
         }
+
+        val kafkaConfig = this.kafkaConfig
+
         return KubernetesBenchmarkDeployment(
             sutBeforeActions = sut.beforeActions,
             sutAfterActions = sut.afterActions,
@@ -119,8 +122,8 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
             loadGenResources = loadGenResources.map { it.second },
             loadGenerationDelay = loadGenerationDelay,
             afterTeardownDelay = afterTeardownDelay,
-            kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
-            topics = kafkaConfig.topics,
+            kafkaConfig = if (kafkaConfig != null) hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer) else mapOf(),
+            topics = kafkaConfig?.topics ?: listOf(),
             client = this.client
         )
     }
diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
index 9d32a4eeab656143e10b5057a173e04245d6f22b..3331444a17b4c2a1aa4411c1e27b3d1e087f8841 100644
--- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
+++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
@@ -31,7 +31,7 @@ class KubernetesBenchmarkDeployment(
     val loadGenResources: List<KubernetesResource>,
     private val loadGenerationDelay: Long,
     private val afterTeardownDelay: Long,
-    private val kafkaConfig: HashMap<String, Any>,
+    private val kafkaConfig: Map<String, Any>,
     private val topics: List<KafkaConfig.TopicWrapper>,
     private val client: NamespacedKubernetesClient
 ) : BenchmarkDeployment {
@@ -46,9 +46,12 @@ class KubernetesBenchmarkDeployment(
      *  - Deploy the needed resources.
      */
     override fun setup() {
-        val kafkaTopics = this.topics.filter { !it.removeOnly }
-            .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
-        kafkaController.createTopics(kafkaTopics)
+        if (this.topics.isNotEmpty()) {
+            val kafkaTopics = this.topics
+                .filter { !it.removeOnly }
+                .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
+            kafkaController.createTopics(kafkaTopics)
+        }
         sutBeforeActions.forEach { it.exec(client = client) }
         appResources.forEach { kubernetesManager.deploy(it) }
         logger.info { "Wait ${this.loadGenerationDelay} seconds before starting the load generator." }
@@ -69,7 +72,9 @@ class KubernetesBenchmarkDeployment(
         loadGenAfterActions.forEach { it.exec(client = client) }
         appResources.forEach { kubernetesManager.remove(it) }
         sutAfterActions.forEach { it.exec(client = client) }
-        kafkaController.removeTopics(this.topics.map { topic -> topic.name })
+        if (this.topics.isNotEmpty()) {
+            kafkaController.removeTopics(this.topics.map { topic -> topic.name })
+        }
         ResourceByLabelHandler(client).removePods(
             labelName = LAG_EXPORTER_POD_LABEL_NAME,
             labelValue = LAG_EXPORTER_POD_LABEL_VALUE
diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt
index 40f5b7ddbbfc9da4514b8a88946d97149b94b390..6dcfb582655ff9295aedd63d8c30cbac7daae2b3 100644
--- a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt
+++ b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt
@@ -24,7 +24,7 @@ class BenchmarkStateChecker(
         Thread {
             while (running) {
                 updateBenchmarkStatus()
-                Thread.sleep(100 * 1)
+                Thread.sleep(1000)
             }
         }.start()
     }
diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt
index 2b6f83c76ce6e31f85cdfec1962f9523c3d297b8..5f4180b0b4b58fa94b979c71998314baae63a91b 100644
--- a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt
+++ b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt
@@ -37,9 +37,9 @@ class TheodoliteController(
      */
     fun run() {
         sleep(5000) // wait until all states are correctly set
+        benchmarkStateChecker.start(true)
         while (true) {
             reconcile()
-            benchmarkStateChecker.start(true)
             sleep(2000)
         }
     }
@@ -98,11 +98,11 @@ class TheodoliteController(
                     }
                 else -> {
                     executionStateHandler.setExecutionState(execution.name, ExecutionState.FAILURE)
-                    logger.warn { "Unexpected execution state, set state to ${ExecutionState.FAILURE.value}" }
+                    logger.warn { "Unexpected execution state; setting state to ${ExecutionState.FAILURE.value}." }
                 }
             }
         } catch (e: Exception) {
-                EventCreator().createEvent(
+            EventCreator().createEvent(
                 executionName = execution.name,
                 type = "WARNING",
                 reason = "Execution failed",