diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 8f269d9edeba4f3487d897a39a6568d4fa0ee73f..1909b664ecb74f5ccdbed6462addab49f16849d1 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -20,6 +20,7 @@ stages:
 
 lint-helm:
   stage: check
+  needs: []
   image:
     name: alpine/helm:3.5.2
     entrypoint: [""]
@@ -324,6 +325,7 @@ deploy-theodolite:
 
 test-slo-checker-lag-trend:
   stage: test
+  needs: []
   image: python:3.7-slim
   tags:
     - exec-docker
@@ -335,6 +337,7 @@ test-slo-checker-lag-trend:
 
 test-slo-checker-dropped-records-kstreams:
   stage: test
+  needs: []
   image: python:3.7-slim
   tags:
     - exec-docker
diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle
index 773872648edfd4b30218a99d307b6e7c45ed3470..5b0e2a8a1211653428b296b11b14c1531e40e46b 100644
--- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle
+++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle
@@ -50,7 +50,7 @@ pmd {
   ruleSets = [] // Gradle requires to clean the rule sets first
   ruleSetFiles = files("$rootProject.projectDir/config/pmd.xml")
   ignoreFailures = false
-  toolVersion = "6.7.0"
+  toolVersion = "6.13.0"
 }
 
 checkstyle {
@@ -58,7 +58,7 @@ checkstyle {
   configFile = file("$rootProject.projectDir/config/checkstyle.xml")
   maxWarnings = 0
   ignoreFailures = false
-  toolVersion = "8.12"
+  toolVersion = "8.19"
 }
 
 spotbugs {
diff --git a/theodolite-benchmarks/docker-test/README.md b/theodolite-benchmarks/docker-test/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..fd1e9bf4730f897273be45a022ad2adeae1b7e6e
--- /dev/null
+++ b/theodolite-benchmarks/docker-test/README.md
@@ -0,0 +1,38 @@
+# Docker Compose Files for Testing
+
+This directory contains Docker Compose files, which help testing Benchmark implementations.
+For each stream processing engine (Kafka Streams and Flink) and Benchmark (UC1-4), a Docker Compose file is provided
+in the corresponding subdirectory.
+
+## Full Dockerized Testing
+
+Running the load generator, the benchmark and all required infrastructure (Kafka etc.) is easy. Simply, `cd` into the
+directory of the benchmark implementation Compose file and *up* it.
+For example:
+
+```sh
+cd uc1-kstreams-docker-compose/
+docker-compose up -d
+```
+
+On less powerful hardware, starting all containers together might fail. In such cases, it usually helps to first *up*
+Kafka and ZooKeeper (`docker-compose up -d kafka zookeeper`), then after some delay the Schema Registry
+(`docker-compose up -d schema-registry`) and finally, after some more further delay, the rest (`docker-compose up -d`).
+
+To tear down the entire Docker Compose configuration:
+
+```sh
+docker-compose down
+```
+
+## Benchmark (+ Load Generator) on Host, Infrastructure in Docker
+
+For development and debugging purposes, it is often required to run the benchmark and/or the load generator directly on
+the host, for example, from the IDE or Gradle. In such cases, the following adjustments have to be made to the
+`docker-compose.yaml` file:
+
+1. Comment out the services that you intend to run locally.
+2. Uncomment the `ports` block in the Kafka and the Schema Registry services.
+
+You can now connect to Kafka from your host system with bootstrap server `localhost:19092` and contact the Schema
+Registry via `localhost:8081`. **Pay attention to the Kafka port, which is *19092* instead of the default one *9092*.**
diff --git a/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml
index 2a0e6cda45fb81b5b20c658d9c51a4ced1ab2aae..419c9cfb741578cccd91845c8164d4e5554d2ab6 100755
--- a/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml
@@ -37,7 +37,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml
index 36717ed16bd46fd530bba1b02b0e32a929fa1efc..cebf3676e92f8ececa5b6707df156f9f22f3be38 100755
--- a/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml
@@ -45,7 +45,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml
index 9afe3650368aa3f53a4a9272c29216c0dbda1933..c4265702b6f2b833e7b3792a787e3d8a67486ac7 100755
--- a/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml
@@ -37,7 +37,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml
index fc4748758cceb6948fc409704a6a9c69cf56649a..b520611e4855f6e942fab62b02d27d5f360860d1 100755
--- a/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml
@@ -46,7 +46,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml
index 17dd9220c73f98f7c45463ab6dc998d2bdcc359c..2c69a659c6ed1e83c149e699484ec148196806c5 100755
--- a/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml
@@ -37,7 +37,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml
index 1e5c22a59a6755ae975c3a760615be311cb4329f..5ed8e7a673afd825b2e1426fa018db3e00848296 100755
--- a/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml
@@ -31,7 +31,7 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
-      benchmark:
+  benchmark:
     image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest
     depends_on:
       - schema-registry
@@ -45,7 +45,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml
index 80720063991100bae2c8c148f14cd6f1a32bb0ff..b6bb905e2a950e23970392f256f16935a7777fed 100755
--- a/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml
@@ -37,7 +37,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml
index 5e4cb94469f2f6cc8c48694a7ea6c885f066622d..68264b244c16f1a1be7b370bb4e78052d3a8518f 100755
--- a/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml
+++ b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml
@@ -45,7 +45,7 @@ services:
       - schema-registry
       - kafka
     environment:
-      BOOTSTRAP_SERVER: uc-wg:5701
+      BOOTSTRAP_SERVER: load-generator:5701
       PORT: 5701
       KAFKA_BOOTSTRAP_SERVERS: kafka:9092
       SCHEMA_REGISTRY_URL: http://schema-registry:8081
diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
index f1f9870fda73ccec0fc25c5c70665759ab07d893..fe74fbe4b9dcb6ce89d10131de1336bfff40a919 100644
--- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
+++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java
@@ -5,7 +5,6 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.math.Stats;
-
 import java.io.Serializable;
 
 /**
@@ -13,7 +12,7 @@ import java.io.Serializable;
  */
 public class StatsSerializer extends Serializer<Stats> implements Serializable {
 
-  private static final long serialVersionUID = -1276866176534267373L; //NOPMD
+  private static final long serialVersionUID = -1276866176534267373L; // NOPMD
 
   @Override
   public void write(final Kryo kryo, final Output output, final Stats object) {
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java
new file mode 100644
index 0000000000000000000000000000000000000000..11a9cbf2d96bc3a02f3972ba23f2167af06a2ec3
--- /dev/null
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java
@@ -0,0 +1,18 @@
+package theodolite.commons.workloadgeneration;
+
+/**
+ * Interface representing a record generator action consisting of generating a record and sending
+ * it.
+ */
+@FunctionalInterface
+interface GeneratorAction {
+
+  void generate(final String key);
+
+  public static <T> GeneratorAction from(
+      final RecordGenerator<? extends T> generator,
+      final RecordSender<? super T> sender) {
+    return key -> sender.send(generator.generate(key));
+  }
+
+}
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
index 6e4a43271fbf1e0193c2d39569a0814d1f7935cd..ded7c347c8d6b057581dc63b691df5bb60997791 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java
@@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
         avroSerdeFactory.<T>forKeys().serializer());
   }
 
+  /**
+   * Write the passed monitoring record to Kafka.
+   */
+  public void write(final T monitoringRecord) {
+    final ProducerRecord<String, T> record =
+        new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
+            this.keyAccessor.apply(monitoringRecord), monitoringRecord);
+
+    LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
+    try {
+      this.producer.send(record);
+    } catch (final SerializationException e) {
+      LOGGER.warn(
+          "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
+          e);
+    }
+  }
+
+  public void terminate() {
+    this.producer.close();
+  }
+
+  @Override
+  public void send(final T message) {
+    this.write(message);
+  }
+
   public static <T extends SpecificRecord> Builder<T> builder(
       final String bootstrapServers,
       final String topic,
@@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
     }
   }
 
-  /**
-   * Write the passed monitoring record to Kafka.
-   */
-  public void write(final T monitoringRecord) {
-    final ProducerRecord<String, T> record =
-        new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
-            this.keyAccessor.apply(monitoringRecord), monitoringRecord);
-
-    LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
-    try {
-      this.producer.send(record);
-    } catch (final SerializationException e) {
-      LOGGER.warn(
-          "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
-          e);
-    }
-  }
-
-  public void terminate() {
-    this.producer.close();
-  }
-
-  @Override
-  public void send(final T message) {
-    this.write(message);
-  }
-
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
index a9a1ce65ac32e3508299c99a38ecd21e4c9461cf..73f064d1ce44ff8a613f9ce0a7b9a64d4bac6c38 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
@@ -91,12 +91,11 @@ public final class LoadGenerator {
             new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
             Duration.ofMillis(PERIOD_MS_DEFAULT)))
         .setGeneratorConfig(new LoadGeneratorConfig(
-            TitanMessageGeneratorFactory
-                .withKafkaConfig(
-                    KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
-                    KAFKA_TOPIC_DEFAULT,
-                    SCHEMA_REGISTRY_URL_DEFAULT)
-                .forConstantValue(VALUE_DEFAULT)));
+            TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT),
+            TitanKafkaSenderFactory.forKafkaConfig(
+                KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
+                KAFKA_TOPIC_DEFAULT,
+                SCHEMA_REGISTRY_URL_DEFAULT)));
   }
 
   /**
@@ -170,13 +169,11 @@ public final class LoadGenerator {
             new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
             Duration.ofMillis(periodMs)))
         .setGeneratorConfig(new LoadGeneratorConfig(
-            TitanMessageGeneratorFactory
-                .withKafkaConfig(
-                    kafkaBootstrapServers,
-                    kafkaInputTopic,
-                    schemaRegistryUrl,
-                    kafkaProperties)
-                .forConstantValue(value)))
+            TitanRecordGeneratorFactory.forConstantValue(value),
+            TitanKafkaSenderFactory.forKafkaConfig(
+                kafkaBootstrapServers,
+                kafkaInputTopic,
+                schemaRegistryUrl)))
         .withThreads(threads);
   }
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java
index 2e907d8e90172288099bc6a1776777c37ae90fff..4b5fea3e4670315ef47d94669b42a3cca4b5d0ae 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java
@@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration;
  */
 public class LoadGeneratorConfig {
 
-  private final MessageGenerator messageGenerator;
+  private final GeneratorAction messageGenerator;
   private BeforeAction beforeAction = BeforeAction.doNothing();
   private int threads = 1;
 
-  public LoadGeneratorConfig(final MessageGenerator messageGenerator) {
-    this.messageGenerator = messageGenerator;
+  public <T> LoadGeneratorConfig(
+      final RecordGenerator<? extends T> generator,
+      final RecordSender<? super T> sender) {
+    this.messageGenerator = GeneratorAction.from(generator, sender);
   }
 
-  public LoadGeneratorConfig(
-      final MessageGenerator messageGenerator,
+  public <T> LoadGeneratorConfig(
+      final RecordGenerator<? extends T> generator,
+      final RecordSender<? super T> sender,
       final int threads) {
-    this.messageGenerator = messageGenerator;
+    this(generator, sender);
     this.threads = threads;
   }
 
-  public LoadGeneratorExecution buildLoadGeneratorExecution(
-      final WorkloadDefinition workloadDefinition) {
-    return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
-  }
-
-  public BeforeAction getBeforeAction() {
-    return this.beforeAction;
-  }
-
   public void setThreads(final int threads) {
     this.threads = threads;
   }
@@ -37,6 +31,13 @@ public class LoadGeneratorConfig {
     this.beforeAction = beforeAction;
   }
 
+  public BeforeAction getBeforeAction() {
+    return this.beforeAction;
+  }
 
+  public LoadGeneratorExecution buildLoadGeneratorExecution(
+      final WorkloadDefinition workloadDefinition) {
+    return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
+  }
 
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java
index 3934c3d3499215b37ce96391ff5ae1d5cc135f84..e1a2a7e1bea964b5c69a6cd34374d7b0932bac03 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java
@@ -8,25 +8,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be
+ * A {@link LoadGeneratorExecution} represents the execution of a load generator, i.e., it can be
  * started and stopped.
  */
-public class LoadGeneratorExecution {
+class LoadGeneratorExecution {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class);
 
   private final Random random = new Random();
   private final WorkloadDefinition workloadDefinition;
-  private final MessageGenerator messageGenerator;
+  private final GeneratorAction messageGenerator;
   private final ScheduledExecutorService executor;
 
   /**
    * Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a
-   * {@link MessageGenerator}. Load is generated by the given number of threads.
+   * {@link GeneratorAction}. Load is generated by the given number of threads.
    */
   public LoadGeneratorExecution(
       final WorkloadDefinition workloadDefinition,
-      final MessageGenerator messageGenerator,
+      final GeneratorAction messageGenerator,
       final int threads) {
     this.workloadDefinition = workloadDefinition;
     this.messageGenerator = messageGenerator;
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java
deleted file mode 100644
index c369f16557d60dae50e22ec7ad820c6a0ab4d137..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package theodolite.commons.workloadgeneration;
-
-/**
- * Interface representing a message generator, which sends messages for given keys to some
- * destination.
- */
-@FunctionalInterface
-public interface MessageGenerator {
-
-  void generate(final String key);
-
-  public static <T> MessageGenerator from(
-      final RecordGenerator<T> generator,
-      final RecordSender<T> sender) {
-    return key -> sender.send(generator.generate(key));
-  }
-
-}
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..0cdf8d91ea01cc16df5dcd55d77b08c3f4986442
--- /dev/null
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java
@@ -0,0 +1,42 @@
+package theodolite.commons.workloadgeneration;
+
+import java.util.Properties;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s.
+ */
+public final class TitanKafkaSenderFactory {
+
+  private TitanKafkaSenderFactory() {}
+
+  /**
+   * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
+   * configuration.
+   */
+  public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
+      final String bootstrapServers,
+      final String topic,
+      final String schemaRegistryUrl) {
+    return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
+  }
+
+  /**
+   * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka
+   * configuration.
+   */
+  public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig(
+      final String bootstrapServers,
+      final String topic,
+      final String schemaRegistryUrl,
+      final Properties properties) {
+    return KafkaRecordSender
+        .<ActivePowerRecord>builder(
+            bootstrapServers,
+            topic,
+            schemaRegistryUrl)
+        .keyAccessor(r -> r.getIdentifier())
+        .timestampAccessor(r -> r.getTimestamp())
+        .build();
+  }
+}
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java
deleted file mode 100644
index bd0b41d4e6e004d024ed2fd179eddcf6af50438f..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package theodolite.commons.workloadgeneration;
-
-import java.util.Properties;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s
- * and sends them via Kafka.
- */
-public final class TitanMessageGeneratorFactory {
-
-  private final RecordSender<ActivePowerRecord> recordSender;
-
-  private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
-    this.recordSender = recordSender;
-  }
-
-  /**
-   * Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a
-   * constant value.
-   */
-  public MessageGenerator forConstantValue(final double value) {
-    return MessageGenerator.from(
-        sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
-        this.recordSender);
-  }
-
-  /**
-   * Create a new TitanMessageGeneratorFactory for the given Kafka configuration.
-   */
-  public static TitanMessageGeneratorFactory withKafkaConfig(
-      final String bootstrapServers,
-      final String topic,
-      final String schemaRegistryUrl) {
-    return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
-  }
-
-  /**
-   * Create a new TitanMessageGeneratorFactory for the given Kafka configuration.
-   */
-  public static TitanMessageGeneratorFactory withKafkaConfig(
-      final String bootstrapServers,
-      final String topic,
-      final String schemaRegistryUrl,
-      final Properties properties) {
-    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender
-        .<ActivePowerRecord>builder(
-            bootstrapServers,
-            topic,
-            schemaRegistryUrl)
-        .keyAccessor(r -> r.getIdentifier())
-        .timestampAccessor(r -> r.getTimestamp())
-        .build();
-    return new TitanMessageGeneratorFactory(kafkaRecordSender);
-  }
-
-}
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..4e1c10071eff28d77514dbc121e30bead3f6fa74
--- /dev/null
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java
@@ -0,0 +1,21 @@
+package theodolite.commons.workloadgeneration;
+
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s.
+ */
+public final class TitanRecordGeneratorFactory {
+
+
+  private TitanRecordGeneratorFactory() {}
+
+  /**
+   * Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a
+   * constant value.
+   */
+  public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) {
+    return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value);
+  }
+
+}