From ffe78ce048394df53ca47ff6b0524e26106ac279 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Mon, 22 Feb 2021 14:43:11 +0100
Subject: [PATCH] Introduce new workload generator based on Hazelcast

---
 .../uc1/workloadgenerator/LoadGenerator.java  |   2 +-
 .../uc2/workloadgenerator/LoadGenerator.java  |   2 +-
 .../uc3/workloadgenerator/LoadGenerator.java  |   4 +-
 .../uc4/workloadgenerator/LoadGenerator.java  |   2 +-
 .../workload-generator-commons/build.gradle   |   2 +
 .../workloadgeneration/ClusterConfig.java     |  47 +++++
 .../workloadgeneration/ConfigurationKeys.java |  38 ++++
 .../workloadgeneration/HazelcastRunner.java   |  92 +++++++++
 .../HazelcastRunnerStateInstance.java         | 187 ++++++++++++++++++
 .../{dimensions => }/KeySpace.java            |  33 ++--
 .../workloadgeneration/LoadGenerator.java     | 147 ++++++++++++++
 .../LoadGeneratorConfig.java                  |  48 +++++
 .../LoadGeneratorExecution.java               |  46 +++++
 .../WorkloadDefinition.java                   |  51 +++++
 .../kafka/KafkaRecordSender.java              |  19 +-
 .../functions/BeforeAction.java               |   5 +
 .../functions/MessageGenerator.java           |  16 +-
 .../functions/RecordGenerator.java            |  14 ++
 .../{Transport.java => RecordSender.java}     |   4 +-
 .../TitanMessageGeneratorFactory.java         |  44 +++++
 .../generators/AbstractWorkloadGenerator.java |  16 +-
 .../generators/KafkaWorkloadGenerator.java    |   6 +-
 .../KafkaWorkloadGeneratorBuilder.java        |   8 +-
 .../misc/WorkloadDefinition.java              |   2 +-
 .../misc/WorkloadEntity.java                  |   8 +-
 .../dimensions/KeySpaceTest.java              |  30 +++
 .../misc/WorkloadDefinition2Test.java         |  63 ++++++
 27 files changed, 884 insertions(+), 52 deletions(-)
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunnerStateInstance.java
 rename benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/{dimensions => }/KeySpace.java (64%)
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/WorkloadDefinition.java
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/RecordGenerator.java
 rename benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/{Transport.java => RecordSender.java} (82%)
 create mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/TitanMessageGeneratorFactory.java
 create mode 100644 benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/dimensions/KeySpaceTest.java
 create mode 100644 benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition2Test.java

diff --git a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
index a7b27dfdb..3ecc06db1 100644
--- a/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
+++ b/benchmarks/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
@@ -8,8 +8,8 @@ import java.util.Properties;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.workloadgeneration.KeySpace;
 import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
diff --git a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
index 3eb3e8d25..215c23766 100644
--- a/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
+++ b/benchmarks/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
@@ -8,8 +8,8 @@ import java.util.Properties;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.workloadgeneration.KeySpace;
 import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
diff --git a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
index 85f6a9403..5e0532bc5 100644
--- a/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
+++ b/benchmarks/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
@@ -8,8 +8,8 @@ import java.util.Properties;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.workloadgeneration.KeySpace;
 import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
@@ -72,7 +72,7 @@ public final class LoadGenerator {
     kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
     kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
     final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        new KafkaRecordSender.Builder<ActivePowerRecord>(
+        KafkaRecordSender.<ActivePowerRecord>builder(
             kafkaBootstrapServers,
             kafkaInputTopic,
             schemaRegistryUrl)
diff --git a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
index ff551e7ef..6ccc9d467 100644
--- a/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
+++ b/benchmarks/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
@@ -8,8 +8,8 @@ import java.util.Properties;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.workloadgeneration.KeySpace;
 import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
diff --git a/benchmarks/workload-generator-commons/build.gradle b/benchmarks/workload-generator-commons/build.gradle
index eef987cd4..51e1b46b7 100644
--- a/benchmarks/workload-generator-commons/build.gradle
+++ b/benchmarks/workload-generator-commons/build.gradle
@@ -1,3 +1,5 @@
 dependencies {
     implementation 'org.apache.curator:curator-recipes:4.3.0'
+    implementation 'com.hazelcast:hazelcast:4.1.1'
+    // compile group: "com.hazelcast", name: "hazelcast-kubernetes", version: "4.1.1"
 }
\ No newline at end of file
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java
new file mode 100644
index 000000000..c871f9a1a
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ClusterConfig.java
@@ -0,0 +1,47 @@
+package theodolite.commons.workloadgeneration;
+
+public class ClusterConfig {
+  public static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
+  public static final int PORT_DEFAULT = 5701;
+  public static final boolean PORT_AUTO_INCREMENT_DEFAULT = true;
+  public static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
+
+  private final String bootstrapServer;
+  private final int port;
+  private final boolean portAutoIncrement;
+  private final String clusterNamePrefix;
+
+  public ClusterConfig() {
+    this(
+        BOOTSTRAP_SERVER_DEFAULT,
+        PORT_DEFAULT,
+        PORT_AUTO_INCREMENT_DEFAULT,
+        CLUSTER_NAME_PREFIX_DEFAULT);
+  }
+
+  public ClusterConfig(final String bootstrapServer, final int port,
+      final boolean portAutoIncrement, final String clusterNamePrefix) {
+    this.bootstrapServer = bootstrapServer;
+    this.port = port;
+    this.portAutoIncrement = portAutoIncrement;
+    this.clusterNamePrefix = clusterNamePrefix;
+  }
+
+  public String getBootstrapServer() {
+    return this.bootstrapServer;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public boolean isPortAutoIncrement() {
+    return this.portAutoIncrement;
+  }
+
+  public String getClusterNamePrefix() {
+    return this.clusterNamePrefix;
+  }
+
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java
new file mode 100644
index 000000000..81610170c
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java
@@ -0,0 +1,38 @@
+package theodolite.commons.workloadgeneration;
+
+/**
+ * Keys to access configuration parameters.
+ */
+public final class ConfigurationKeys {
+
+  public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER";
+
+  public static final String PORT = "PORT";
+
+  public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT";
+
+  public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX";
+
+  public static final String NUM_SENSORS = "NUM_SENSORS";
+
+  public static final String PERIOD_MS = "PERIOD_MS";
+
+  public static final String VALUE = "VALUE";
+
+  public static final String THREADS = "THREADS";
+
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS";
+
+  public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL";
+
+  public static final String KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC";
+
+  public static final String KAFKA_BATCH_SIZE = "KAFKA_BATCH_SIZE";
+
+  public static final String KAFKA_LINGER_MS = "KAFKA_LINGER_MS";
+
+  public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
+
+  private ConfigurationKeys() {}
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java
new file mode 100644
index 000000000..a2a9c1411
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunner.java
@@ -0,0 +1,92 @@
+package theodolite.commons.workloadgeneration;
+
+import com.hazelcast.cluster.Member;
+import com.hazelcast.cluster.MembershipEvent;
+import com.hazelcast.cluster.MembershipListener;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.JoinConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public class HazelcastRunner {
+
+  private final HazelcastInstance hzInstance;
+  private volatile HazelcastRunnerStateInstance runnerState;
+  private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
+  private final LoadGeneratorConfig loadConfig;
+  private final WorkloadDefinition totalLoadDefinition;
+
+  public HazelcastRunner(
+      final ClusterConfig clusterConfig,
+      final LoadGeneratorConfig loadConfig,
+      final WorkloadDefinition totalLoadDefinition) {
+    this.loadConfig = loadConfig;
+    this.totalLoadDefinition = totalLoadDefinition;
+    this.hzInstance = buildhazelcastInstance(clusterConfig, totalLoadDefinition.toString());
+    this.hzInstance.getCluster().addMembershipListener(new RunnerMembershipListener());
+  }
+
+  public void runBlocking() {
+    while (!this.stopAction.isDone()) {
+      synchronized (this) {
+        final Set<Member> members = this.hzInstance.getCluster().getMembers();
+        this.runnerState = new HazelcastRunnerStateInstance(
+            this.loadConfig,
+            this.totalLoadDefinition,
+            this.hzInstance, members);
+      }
+      this.runnerState.runBlocking();
+    }
+  }
+
+  public void restart() {
+    this.stopRunnerState();
+  }
+
+  public void stop() {
+    this.stopAction.complete(null);
+    this.stopRunnerState();
+  }
+
+  private void stopRunnerState() {
+    synchronized (this) {
+      if (this.runnerState != null) {
+        this.runnerState.stopAsync();
+      }
+    }
+  }
+
+  private class RunnerMembershipListener implements MembershipListener {
+
+    @Override
+    public void memberAdded(final MembershipEvent membershipEvent) {
+      HazelcastRunner.this.restart();
+    }
+
+    @Override
+    public void memberRemoved(final MembershipEvent membershipEvent) {
+      HazelcastRunner.this.restart();
+    }
+
+  }
+
+  private static HazelcastInstance buildhazelcastInstance(
+      final ClusterConfig cluster,
+      final String clusterName) {
+    final Config config = new Config()
+        .setClusterName(cluster.getClusterNamePrefix() + '_' + clusterName);
+
+    final JoinConfig joinConfig = config.getNetworkConfig()
+        .setPort(cluster.getPort())
+        .setPortAutoIncrement(cluster.isPortAutoIncrement())
+        .getJoin();
+    joinConfig.getMulticastConfig().setEnabled(false);
+    // joinConfig.getKubernetesConfig().setEnabled(true);
+    joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer());
+
+    return Hazelcast.newHazelcastInstance(config);
+  }
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunnerStateInstance.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunnerStateInstance.java
new file mode 100644
index 000000000..6830cf793
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HazelcastRunnerStateInstance.java
@@ -0,0 +1,187 @@
+package theodolite.commons.workloadgeneration;
+
+import com.google.common.collect.Streams;
+import com.hazelcast.cluster.Member;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.cp.IAtomicReference;
+import com.hazelcast.cp.lock.FencedLock;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO Reconsider name -> Hazelcast Instance
+public class HazelcastRunnerStateInstance {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRunnerStateInstance.class);
+
+  private static final Duration BEFORE_ACTION_WAIT_DURATION = Duration.ofMillis(500);
+  private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500);
+
+  private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
+  private LoadGeneratorExecution loadGeneratorExecution;
+
+  private final LoadGeneratorConfig loadGeneratorConfig;
+  private final WorkloadDefinition totalLoadDefinition;
+  private final HazelcastInstance hzInstance;
+  private final Set<Member> members;
+
+  public HazelcastRunnerStateInstance(
+      final LoadGeneratorConfig loadGeneratorConfig,
+      final WorkloadDefinition totalLoadDefinition,
+      final HazelcastInstance hzInstance,
+      final Set<Member> members) {
+    this.hzInstance = hzInstance;
+    this.members = members;
+    this.loadGeneratorConfig = loadGeneratorConfig;
+    this.totalLoadDefinition = totalLoadDefinition;
+
+    LOGGER.info("Created new Hazelcast runner instance for member set '{}'", this.members);
+  }
+
+  public void runBlocking() {
+    if (!this.stopAction.isDone()) {
+      this.tryPerformBeforeAction();
+      this.tryCreateTaskAssignment();
+      this.startLoadGeneration();
+    }
+    this.stopAction.join();
+    this.stopLoadGeneration();
+  }
+
+  public void stopAsync() {
+    this.stopAction.complete(null);
+  }
+
+  private void tryPerformBeforeAction() {
+    final FencedLock lock = this.getBeforeActionPerformerLock();
+    final IAtomicReference<Boolean> isActionPerformed = this.getIsBeforeActionPerformed(); // NOPMD
+    isActionPerformed.alter(p -> p != null && p); // p -> p == null ? false : p
+    boolean triedPerformingBeforeAction = false;
+    while (!isActionPerformed.get()) {
+      // Try performing the before action
+      triedPerformingBeforeAction = true;
+      if (lock.tryLock()) {
+        try {
+          if (!isActionPerformed.get()) {
+            LOGGER.info("This instance is elected to perform the before action.");
+            this.loadGeneratorConfig.getBeforeAction().run();
+            LOGGER.info("Before action performed.");
+            isActionPerformed.set(true);
+          }
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        LOGGER.info("Wait for before action to be performed.");
+        delay(BEFORE_ACTION_WAIT_DURATION);
+      }
+    }
+    if (!triedPerformingBeforeAction) {
+      LOGGER.info("Before action has already been performed.");
+    }
+  }
+
+
+
+  private void tryCreateTaskAssignment() {
+    final Map<UUID, WorkloadDefinition> taskAssignment = this.getTaskAssignment();
+    final FencedLock lock = this.getTaskAssignmentLock();
+
+    boolean triedCreatingTaskAssignment = false;
+    while (taskAssignment.size() != this.members.size()) {
+      // Try creating task assignment
+      triedCreatingTaskAssignment = true;
+      if (lock.tryLock()) {
+        try {
+          if (taskAssignment.size() != this.members.size()) {
+            LOGGER.info("This instance is elected to create the task assignment.");
+
+            final Set<WorkloadDefinition> subLoadDefinitions =
+                this.totalLoadDefinition.divide(this.members.size());
+            Streams
+                .zip(
+                    subLoadDefinitions.stream(),
+                    this.members.stream(),
+                    (loadDef, member) -> new LoadDefPerMember(loadDef, member))
+                .forEach(l -> taskAssignment.put(l.member.getUuid(), l.loadDefinition));
+
+            LOGGER.info("Task assignment created.");
+          }
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        LOGGER.info("Wait for task assignment to be available.");
+        delay(TASK_ASSIGNMENT_WAIT_DURATION);
+      }
+    }
+    if (!triedCreatingTaskAssignment) {
+      LOGGER.info("Task assignment is already available.");
+    }
+  }
+
+  private void startLoadGeneration() {
+    if (this.loadGeneratorExecution != null) {
+      throw new IllegalStateException("Load generation has already started before.");
+    }
+    LOGGER.info("Start running load generation and pick assigned task.");
+
+    final Member member = this.hzInstance.getCluster().getLocalMember();
+    final WorkloadDefinition workload = this.getTaskAssignment().get(member.getUuid());
+
+    LOGGER.info("Run load generation for assigned task: {}", workload);
+    this.loadGeneratorExecution = this.loadGeneratorConfig.buildLoadGeneratorExecution(workload);
+    this.loadGeneratorExecution.start();
+  }
+
+  private void stopLoadGeneration() {
+    this.loadGeneratorExecution.stop();
+  }
+
+  private IAtomicReference<Boolean> getIsBeforeActionPerformed() {
+    return this.hzInstance.getCPSubsystem().getAtomicReference("isBeforeActionPerformed");
+  }
+
+  private FencedLock getBeforeActionPerformerLock() {
+    return this.hzInstance.getCPSubsystem().getLock("beforeActionPerformer");
+  }
+
+  private Map<UUID, WorkloadDefinition> getTaskAssignment() {
+    return this.hzInstance.getReplicatedMap(this.getTaskAssignmentName());
+  }
+
+  private FencedLock getTaskAssignmentLock() {
+    return this.hzInstance.getCPSubsystem().getLock(this.getTaskAssignmentName() + "_assigner");
+  }
+
+  private String getTaskAssignmentName() {
+    return this.members.stream()
+        .map(m -> m.getUuid().toString())
+        .collect(Collectors.joining("/"));
+  }
+
+  private static void delay(final Duration duration) {
+    try {
+      TimeUnit.MILLISECONDS.sleep(duration.toMillis());
+    } catch (final InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private static final class LoadDefPerMember {
+    public final WorkloadDefinition loadDefinition;
+    public final Member member;
+
+    public LoadDefPerMember(final WorkloadDefinition loadDefinition, final Member member) {
+      this.loadDefinition = loadDefinition;
+      this.member = member;
+    }
+  }
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KeySpace.java
similarity index 64%
rename from benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java
rename to benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KeySpace.java
index 2eaa1d487..5d5d55c2d 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KeySpace.java
@@ -1,5 +1,8 @@
-package theodolite.commons.workloadgeneration.dimensions;
+package theodolite.commons.workloadgeneration;
 
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator;
 
 /**
@@ -12,34 +15,24 @@ public class KeySpace {
   private final int min;
   private final int max;
 
-
   /**
    * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of
    * each key will be determined by a number of the interval ({@code min}, {@code max}-1).
    *
    * @param prefix the prefix to use for all keys
    * @param min the lower bound (inclusive) to start counting from
-   * @param max the upper bound (exclusive) to count to
+   * @param max the upper bound (inclusive) to count to
    */
   public KeySpace(final String prefix, final int min, final int max) {
-    if (prefix == null || prefix.contains(";")) {
-      throw new IllegalArgumentException(
-          "The prefix must not be null and must not contain the ';' character.");
-    }
     this.prefix = prefix;
     this.min = min;
     this.max = max;
-
   }
 
   public KeySpace(final String prefix, final int numberOfKeys) {
     this(prefix, 0, numberOfKeys - 1);
   }
 
-  public KeySpace(final int numberOfKeys) {
-    this("sensor_", 0, numberOfKeys - 1);
-  }
-
   public String getPrefix() {
     return this.prefix;
   }
@@ -53,4 +46,20 @@ public class KeySpace {
   public int getMax() {
     return this.max;
   }
+
+  public int getCount() {
+    return this.getMax() - this.getMin() + 1;
+  }
+
+  public Collection<String> getKeys() {
+    return IntStream.rangeClosed(this.min, this.max)
+        .mapToObj(id -> this.prefix + id)
+        .collect(Collectors.toUnmodifiableList());
+  }
+
+  @Override
+  public String toString() {
+    return this.prefix + '[' + this.min + '-' + this.max + ']';
+  }
+
 }
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
new file mode 100644
index 000000000..55aed822d
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java
@@ -0,0 +1,147 @@
+package theodolite.commons.workloadgeneration;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import theodolite.commons.workloadgeneration.functions.BeforeAction;
+import theodolite.commons.workloadgeneration.functions.TitanMessageGeneratorFactory;
+
+public final class LoadGenerator {
+
+  private static final String SENSOR_PREFIX_DEFAULT = "s_";
+  private static final int NUMBER_OF_KEYS_DEFAULT = 10;
+  private static final int PERIOD_MS_DEFAULT = 1000;
+  private static final int VALUE_DEFAULT = 10;
+  private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
+  private static final String KAFKA_TOPIC_DEFAULT = "input";
+  private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
+
+  private ClusterConfig clusterConfig;
+  private WorkloadDefinition loadDefinition;
+  private LoadGeneratorConfig generatorConfig;
+  private boolean isStarted;
+
+  private LoadGenerator() {}
+
+  // Add constructor for creating from environment variables
+
+  public LoadGenerator setClusterConfig(final ClusterConfig clusterConfig) { // NOPMD
+    this.clusterConfig = clusterConfig;
+    return this;
+  }
+
+  public LoadGenerator setLoadDefinition(final WorkloadDefinition loadDefinition) { // NOPMD
+    this.loadDefinition = loadDefinition;
+    return this;
+  }
+
+  public LoadGenerator setGeneratorConfig(final LoadGeneratorConfig generatorConfig) { // NOPMD
+    this.generatorConfig = generatorConfig;
+    return this;
+  }
+
+  public LoadGenerator withBeforeAction(final BeforeAction beforeAction) {
+    this.generatorConfig.setBeforeAction(beforeAction);
+    return this;
+  }
+
+  public LoadGenerator withThreads(final int threads) {
+    this.generatorConfig.setThreads(threads);
+    return this;
+  }
+
+  public void run() {
+    Objects.requireNonNull(this.clusterConfig, "No cluster config set.");
+    Objects.requireNonNull(this.generatorConfig, "No generator config set.");
+    Objects.requireNonNull(this.loadDefinition, "No load definition set.");
+    if (this.isStarted) {
+      throw new IllegalStateException("Load generator can only be started once.");
+    }
+    this.isStarted = true;
+    final HazelcastRunner runner = new HazelcastRunner(
+        this.clusterConfig,
+        this.generatorConfig,
+        this.loadDefinition);
+    runner.runBlocking();
+  }
+
+  public static LoadGenerator fromDefaults() {
+    return new LoadGenerator()
+        .setClusterConfig(new ClusterConfig())
+        .setLoadDefinition(new WorkloadDefinition(
+            new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
+            Duration.ofMillis(PERIOD_MS_DEFAULT)))
+        .setGeneratorConfig(new LoadGeneratorConfig(
+            TitanMessageGeneratorFactory
+                .withKafkaConfig(
+                    KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
+                    KAFKA_TOPIC_DEFAULT,
+                    SCHEMA_REGISTRY_URL_DEFAULT)
+                .forConstantValue(VALUE_DEFAULT)));
+  }
+
+  public static LoadGenerator fromEnvironment() {
+    final String bootstrapServer = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER),
+        ClusterConfig.BOOTSTRAP_SERVER_DEFAULT);
+    final int port = Integer.parseInt(Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.PORT),
+        Integer.toString(ClusterConfig.PORT_DEFAULT)));
+    final boolean portAutoIncrement = Boolean.parseBoolean(Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT),
+        Boolean.toString(ClusterConfig.PORT_AUTO_INCREMENT_DEFAULT)));
+    final String clusterNamePrefix = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX),
+        ClusterConfig.CLUSTER_NAME_PREFIX_DEFAULT);
+    final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.NUM_SENSORS),
+        Integer.toString(NUMBER_OF_KEYS_DEFAULT)));
+    final int periodMs = Integer.parseInt(Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.PERIOD_MS),
+        Integer.toString(PERIOD_MS_DEFAULT)));
+    final double value = Double.parseDouble(Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.VALUE),
+        Integer.toString(VALUE_DEFAULT)));
+    final String kafkaBootstrapServers = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
+        KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
+    final String kafkaInputTopic = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
+        KAFKA_TOPIC_DEFAULT);
+    final String schemaRegistryUrl = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
+        SCHEMA_REGISTRY_URL_DEFAULT);
+    final Properties kafkaProperties = new Properties();
+    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG,
+        (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE));
+    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG,
+        (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS));
+    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG,
+        (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY));
+
+    return new LoadGenerator()
+        .setClusterConfig(new ClusterConfig(
+            bootstrapServer,
+            port,
+            portAutoIncrement,
+            clusterNamePrefix))
+        .setLoadDefinition(new WorkloadDefinition(
+            new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
+            Duration.ofMillis(periodMs)))
+        .setGeneratorConfig(new LoadGeneratorConfig(
+            TitanMessageGeneratorFactory
+                .withKafkaConfig(
+                    kafkaBootstrapServers,
+                    kafkaInputTopic,
+                    schemaRegistryUrl,
+                    kafkaProperties)
+                .forConstantValue(value)));
+  }
+
+  public static void main(final String[] args) {
+    LoadGenerator.fromEnvironment()
+        .run();
+  }
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java
new file mode 100644
index 000000000..3d76b1c85
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java
@@ -0,0 +1,48 @@
+package theodolite.commons.workloadgeneration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import theodolite.commons.workloadgeneration.functions.BeforeAction;
+import theodolite.commons.workloadgeneration.functions.MessageGenerator;
+
+public class LoadGeneratorConfig {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorConfig.class);
+
+  private final MessageGenerator messageGenerator;
+  private BeforeAction beforeAction = BeforeAction.doNothing();
+  private int threads = 1;
+
+  public LoadGeneratorConfig(final MessageGenerator messageGenerator) {
+    this.messageGenerator = messageGenerator;
+  }
+
+  public LoadGeneratorConfig(
+      final MessageGenerator messageGenerator,
+      final BeforeAction beforeAction,
+      final int threads) {
+    this.messageGenerator = messageGenerator;
+    this.beforeAction = beforeAction;
+    this.threads = threads;
+  }
+
+  public LoadGeneratorExecution buildLoadGeneratorExecution(
+      final WorkloadDefinition workloadDefinition) {
+    return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
+  }
+
+  public BeforeAction getBeforeAction() {
+    return this.beforeAction;
+  }
+
+  public void setThreads(final int threads) {
+    this.threads = threads;
+  }
+
+  public void setBeforeAction(final BeforeAction beforeAction) {
+    this.beforeAction = beforeAction;
+  }
+
+
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java
new file mode 100644
index 000000000..e666ef9b4
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java
@@ -0,0 +1,46 @@
+package theodolite.commons.workloadgeneration;
+
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import theodolite.commons.workloadgeneration.functions.MessageGenerator;
+
+public class LoadGeneratorExecution {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class);
+
+  private final Random random = new Random();
+  private final WorkloadDefinition workloadDefinition;
+  private final MessageGenerator messageGenerator;
+  private final ScheduledExecutorService executor;
+
+  public LoadGeneratorExecution(
+      final WorkloadDefinition workloadDefinition,
+      final MessageGenerator messageGenerator,
+      final int threads) {
+    this.workloadDefinition = workloadDefinition;
+    this.messageGenerator = messageGenerator;
+    this.executor = Executors.newScheduledThreadPool(threads);
+  }
+
+  public void start() {
+    LOGGER.info("Beginning of Experiment...");
+    LOGGER.info("Generating records for {} keys.",
+        this.workloadDefinition.getKeySpace().getCount());
+    LOGGER.info("Experiment is going to be executed until cancelation...");
+
+    final int periodMs = (int) this.workloadDefinition.getPeriod().toMillis();
+    for (final String key : this.workloadDefinition.getKeySpace().getKeys()) {
+      final long initialDelay = this.random.nextInt(periodMs);
+      final Runnable task = () -> this.messageGenerator.generate(key);
+      this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public void stop() {
+    this.executor.shutdownNow();
+  }
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/WorkloadDefinition.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/WorkloadDefinition.java
new file mode 100644
index 000000000..0bc1d81d2
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/WorkloadDefinition.java
@@ -0,0 +1,51 @@
+package theodolite.commons.workloadgeneration;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class WorkloadDefinition {
+
+  private final KeySpace keySpace;
+  private final Duration period;
+
+  /**
+   * Create a new workload definition.
+   *
+   * @param keySpace the key space to use.
+   */
+  public WorkloadDefinition(
+      final KeySpace keySpace,
+      final Duration period) {
+    this.keySpace = keySpace;
+    this.period = period;
+  }
+
+  public KeySpace getKeySpace() {
+    return this.keySpace;
+  }
+
+  public Duration getPeriod() {
+    return this.period;
+  }
+
+  public Set<WorkloadDefinition> divide(final int parts) {
+    final int size = (this.keySpace.getCount() + parts - 1) / parts; // = ceil(count/parts)
+    return IntStream.range(0, parts)
+        .mapToObj(part -> new KeySpace(
+            this.keySpace.getPrefix(),
+            this.keySpace.getMin() + part * size,
+            Math.min(this.keySpace.getMin() + (part + 1) * size - 1, this.keySpace.getMax())))
+        .map(keySpace -> new WorkloadDefinition(
+            keySpace,
+            this.period))
+        .collect(Collectors.toUnmodifiableSet());
+  }
+
+  @Override
+  public String toString() {
+    return this.keySpace + ";" + this.period.toMillis();
+  }
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java
index 33818b510..35ce089c5 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java
@@ -9,7 +9,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import theodolite.commons.workloadgeneration.functions.Transport;
+import theodolite.commons.workloadgeneration.functions.RecordSender;
 import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 
 /**
@@ -17,7 +17,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
  *
  * @param <T> {@link IMonitoringRecord} to send
  */
-public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> {
+public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
 
@@ -47,10 +47,19 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
 
     final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
         new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
-    this.producer = new KafkaProducer<>(properties, new StringSerializer(),
+    this.producer = new KafkaProducer<>(
+        properties,
+        new StringSerializer(),
         avroSerdeFactory.<T>forKeys().serializer());
   }
 
+  public static <T extends SpecificRecord> Builder<T> builder(
+      final String bootstrapServers,
+      final String topic,
+      final String schemaRegistryUrl) {
+    return new Builder<>(bootstrapServers, topic, schemaRegistryUrl);
+  }
+
   /**
    * Builder class to build a new {@link KafkaRecordSender}.
    *
@@ -72,7 +81,7 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
      * @param topic The topic where to write.
      * @param schemaRegistryUrl URL to the schema registry for avro.
      */
-    public Builder(final String bootstrapServers, final String topic,
+    private Builder(final String bootstrapServers, final String topic,
         final String schemaRegistryUrl) {
       this.bootstrapServers = bootstrapServers;
       this.topic = topic;
@@ -116,7 +125,7 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
   }
 
   @Override
-  public void transport(final T message) {
+  public void send(final T message) {
     this.write(message);
   }
 
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java
index 7914a4985..0010bb5cc 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java
@@ -8,4 +8,9 @@ public interface BeforeAction {
 
   public void run();
 
+  public static BeforeAction doNothing() {
+    return () -> {
+    };
+  }
+
 }
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java
index 672b579eb..11034c2d5 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java
@@ -1,14 +1,14 @@
 package theodolite.commons.workloadgeneration.functions;
 
-/**
- * This interface describes a function that takes meta information from a string (e.g. an ID) and
- * produces an object of type T.
- *
- * @param <T> the type of the objects that will be generated by the function.
- */
 @FunctionalInterface
-public interface MessageGenerator<T> {
+public interface MessageGenerator {
 
-  T generateMessage(final String key);
+  void generate(final String key);
+
+  public static <T> MessageGenerator from(
+      final RecordGenerator<T> generator,
+      final RecordSender<T> sender) {
+    return key -> sender.send(generator.generate(key));
+  }
 
 }
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/RecordGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/RecordGenerator.java
new file mode 100644
index 000000000..1075d089f
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/RecordGenerator.java
@@ -0,0 +1,14 @@
+package theodolite.commons.workloadgeneration.functions;
+
+/**
+ * This interface describes a function that takes meta information from a string (e.g. an ID) and
+ * produces an object of type T.
+ *
+ * @param <T> the type of the objects that will be generated by the function.
+ */
+@FunctionalInterface
+public interface RecordGenerator<T> {
+
+  T generate(final String key);
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/RecordSender.java
similarity index 82%
rename from benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java
rename to benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/RecordSender.java
index 7e5100a4e..6f912779d 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/RecordSender.java
@@ -7,8 +7,8 @@ package theodolite.commons.workloadgeneration.functions;
  * @param <T> the type of records to send as messages.
  */
 @FunctionalInterface
-public interface Transport<T> {
+public interface RecordSender<T> {
 
-  void transport(final T message);
+  void send(final T message);
 
 }
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/TitanMessageGeneratorFactory.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/TitanMessageGeneratorFactory.java
new file mode 100644
index 000000000..3fa9dba7c
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/TitanMessageGeneratorFactory.java
@@ -0,0 +1,44 @@
+package theodolite.commons.workloadgeneration.functions;
+
+import java.util.Properties;
+import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public final class TitanMessageGeneratorFactory {
+
+  private final RecordSender<ActivePowerRecord> recordSender;
+
+  private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) {
+    this.recordSender = recordSender;
+  }
+
+  public MessageGenerator forConstantValue(final double value) {
+    return MessageGenerator.from(
+        sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value),
+        this.recordSender);
+  }
+
+  public static TitanMessageGeneratorFactory withKafkaConfig(
+      final String bootstrapServers,
+      final String topic,
+      final String schemaRegistryUrl) {
+    return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties());
+  }
+
+  public static TitanMessageGeneratorFactory withKafkaConfig(
+      final String bootstrapServers,
+      final String topic,
+      final String schemaRegistryUrl,
+      final Properties properties) {
+    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender
+        .<ActivePowerRecord>builder(
+            bootstrapServers,
+            topic,
+            schemaRegistryUrl)
+        .keyAccessor(r -> r.getIdentifier())
+        .timestampAccessor(r -> r.getTimestamp())
+        .build();
+    return new TitanMessageGeneratorFactory(kafkaRecordSender);
+  }
+
+}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
index 104f1cefb..68376d099 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
@@ -11,11 +11,11 @@ import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import theodolite.commons.workloadgeneration.KeySpace;
 import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor;
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.functions.BeforeAction;
-import theodolite.commons.workloadgeneration.functions.MessageGenerator;
-import theodolite.commons.workloadgeneration.functions.Transport;
+import theodolite.commons.workloadgeneration.functions.RecordGenerator;
+import theodolite.commons.workloadgeneration.functions.RecordSender;
 import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
 import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
@@ -35,8 +35,8 @@ public abstract class AbstractWorkloadGenerator<T>
   private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable
   private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable
   private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector;
-  private final MessageGenerator<T> generatorFunction;
-  private final Transport<T> transport;
+  private final RecordGenerator<T> generatorFunction;
+  private final RecordSender<T> transport;
   private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local
   private final ScheduledExecutorService executor;
 
@@ -61,8 +61,8 @@ public abstract class AbstractWorkloadGenerator<T>
       final Duration period,
       final Duration duration,
       final BeforeAction beforeAction,
-      final MessageGenerator<T> generatorFunction,
-      final Transport<T> transport) {
+      final RecordGenerator<T> generatorFunction,
+      final RecordSender<T> transport) {
     this.instances = instances;
     this.zooKeeper = zooKeeper;
     this.keySpace = keySpace;
@@ -99,7 +99,7 @@ public abstract class AbstractWorkloadGenerator<T>
 
       entities.forEach(entity -> {
         final long initialDelay = random.nextInt(periodMs);
-        final Runnable task = () -> this.transport.transport(entity.generateMessage());
+        final Runnable task = () -> this.transport.send(entity.generateMessage());
         this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS);
       });
 
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
index 944cec6a2..2186454af 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
@@ -2,10 +2,10 @@ package theodolite.commons.workloadgeneration.generators;
 
 import java.time.Duration;
 import org.apache.avro.specific.SpecificRecord;
+import theodolite.commons.workloadgeneration.KeySpace;
 import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.functions.BeforeAction;
-import theodolite.commons.workloadgeneration.functions.MessageGenerator;
+import theodolite.commons.workloadgeneration.functions.RecordGenerator;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 
 /**
@@ -41,7 +41,7 @@ public class KafkaWorkloadGenerator<T extends SpecificRecord>
       final Duration period,
       final Duration duration,
       final BeforeAction beforeAction,
-      final MessageGenerator<T> generatorFunction,
+      final RecordGenerator<T> generatorFunction,
       final KafkaRecordSender<T> recordSender) {
     super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction,
         generatorFunction,
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
index 785087c13..76f0d024a 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
@@ -3,10 +3,10 @@ package theodolite.commons.workloadgeneration.generators;
 import java.time.Duration;
 import java.util.Objects;
 import org.apache.avro.specific.SpecificRecord;
+import theodolite.commons.workloadgeneration.KeySpace;
 import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.functions.BeforeAction;
-import theodolite.commons.workloadgeneration.functions.MessageGenerator;
+import theodolite.commons.workloadgeneration.functions.RecordGenerator;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 
 /**
@@ -23,7 +23,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { //
   private Duration period; // NOPMD
   private Duration duration; // NOPMD
   private BeforeAction beforeAction; // NOPMD
-  private MessageGenerator<T> generatorFunction; // NOPMD
+  private RecordGenerator<T> generatorFunction; // NOPMD
   private KafkaRecordSender<T> kafkaRecordSender; // NOPMD
 
   private KafkaWorkloadGeneratorBuilder() {
@@ -123,7 +123,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { //
    * @return the builder.
    */
   public KafkaWorkloadGeneratorBuilder<T> generatorFunction(
-      final MessageGenerator<T> generatorFunction) {
+      final RecordGenerator<T> generatorFunction) {
     this.generatorFunction = generatorFunction;
     return this;
   }
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
index 86369d6c8..61c6ac75e 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
@@ -1,6 +1,6 @@
 package theodolite.commons.workloadgeneration.misc;
 
-import theodolite.commons.workloadgeneration.dimensions.KeySpace;
+import theodolite.commons.workloadgeneration.KeySpace;
 
 /**
  * The central class that contains all information that needs to be exchanged between the nodes for
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
index d8665b3fb..3f044d40b 100644
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
+++ b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
@@ -1,6 +1,6 @@
 package theodolite.commons.workloadgeneration.misc;
 
-import theodolite.commons.workloadgeneration.functions.MessageGenerator;
+import theodolite.commons.workloadgeneration.functions.RecordGenerator;
 
 /**
  * Representation of a entity of the workload generation that generates load for one fixed key.
@@ -9,14 +9,14 @@ import theodolite.commons.workloadgeneration.functions.MessageGenerator;
  */
 public class WorkloadEntity<T> {
   private final String key;
-  private final MessageGenerator<T> generator;
+  private final RecordGenerator<T> generator;
 
-  public WorkloadEntity(final String key, final MessageGenerator<T> generator) {
+  public WorkloadEntity(final String key, final RecordGenerator<T> generator) {
     this.key = key;
     this.generator = generator;
   }
 
   public T generateMessage() {
-    return this.generator.generateMessage(this.key);
+    return this.generator.generate(this.key);
   }
 }
diff --git a/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/dimensions/KeySpaceTest.java b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/dimensions/KeySpaceTest.java
new file mode 100644
index 000000000..bbd8c297d
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/dimensions/KeySpaceTest.java
@@ -0,0 +1,30 @@
+package theodolite.commons.workloadgeneration.dimensions;
+
+import org.junit.Assert;
+import org.junit.Test;
+import theodolite.commons.workloadgeneration.KeySpace;
+
+public class KeySpaceTest {
+
+  @Test
+  public void testCountFixedRangeFromZero() {
+    final KeySpace keySpace = new KeySpace("prefix", 0, 9);
+    final int count = keySpace.getCount();
+    Assert.assertEquals(10, count);
+  }
+
+  @Test
+  public void testCountFixedRangeNotFromZero() {
+    final KeySpace keySpace = new KeySpace("prefix", 4, 11);
+    final int count = keySpace.getCount();
+    Assert.assertEquals(8, count);
+  }
+
+  @Test
+  public void testCountAutoRange() {
+    final KeySpace keySpace = new KeySpace("prefix", 42);
+    final int count = keySpace.getCount();
+    Assert.assertEquals(42, count);
+  }
+
+}
diff --git a/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition2Test.java b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition2Test.java
new file mode 100644
index 000000000..6dd2232be
--- /dev/null
+++ b/benchmarks/workload-generator-commons/src/test/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition2Test.java
@@ -0,0 +1,63 @@
+package theodolite.commons.workloadgeneration.misc;
+
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.junit.Assert;
+import org.junit.Test;
+import theodolite.commons.workloadgeneration.KeySpace;
+import theodolite.commons.workloadgeneration.WorkloadDefinition;
+
+public class WorkloadDefinition2Test {
+
+  @Test
+  public void testDivideByOneAmount() {
+    final KeySpace keySpace = new KeySpace("prefix", 100);
+    final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1));
+    final Set<WorkloadDefinition> subworkloads = workload.divide(1);
+    Assert.assertEquals(1, subworkloads.size());
+  }
+
+  @Test
+  public void testDivideMultipleAmount() {
+    final KeySpace keySpace = new KeySpace("prefix", 100);
+    final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1));
+    final Set<WorkloadDefinition> subworkloads = workload.divide(2);
+    Assert.assertEquals(2, subworkloads.size());
+  }
+
+  @Test
+  public void testDivideNonMultipleAmount() {
+    final KeySpace keySpace = new KeySpace("prefix", 100);
+    final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1));
+    final Set<WorkloadDefinition> subworkloads = workload.divide(3);
+    Assert.assertEquals(3, subworkloads.size());
+  }
+
+  @Test
+  public void testDivide() {
+    final KeySpace keySpace = new KeySpace("prefix", 100);
+    final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1));
+    final Set<WorkloadDefinition> subworkloads = workload.divide(3);
+    Assert.assertEquals(3, subworkloads.size());
+    for (final WorkloadDefinition subworkload : subworkloads) {
+      Assert.assertEquals("prefix", subworkload.getKeySpace().getPrefix());
+      Assert.assertEquals(Duration.ofSeconds(1), subworkload.getPeriod());
+    }
+    final List<WorkloadDefinition> orderedSubworkloads = subworkloads.stream()
+        .sorted(Comparator.comparingInt(l -> l.getKeySpace().getMin()))
+        .collect(Collectors.toList());
+    final WorkloadDefinition subworkload1 = orderedSubworkloads.get(0);
+    Assert.assertEquals(0, subworkload1.getKeySpace().getMin());
+    Assert.assertEquals(33, subworkload1.getKeySpace().getMax());
+    final WorkloadDefinition subworkload2 = orderedSubworkloads.get(1);
+    Assert.assertEquals(34, subworkload2.getKeySpace().getMin());
+    Assert.assertEquals(67, subworkload2.getKeySpace().getMax());
+    final WorkloadDefinition subworkload3 = orderedSubworkloads.get(2);
+    Assert.assertEquals(68, subworkload3.getKeySpace().getMin());
+    Assert.assertEquals(99, subworkload3.getKeySpace().getMax());
+  }
+
+}
-- 
GitLab