diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java
index 3eb63f161d7faed645d365bd0b4d107d4df54383..29ede821eefe171f377d58fce8d98eee28e8a277 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java
@@ -42,9 +42,9 @@ class EnvVarLoadGeneratorFactory {
         .setLoadDefinition(new WorkloadDefinition(
             new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors),
             Duration.ofMillis(periodMs)))
-        .setGeneratorConfig(new LoadGeneratorConfig(
+        .setGeneratorConfig(new LoadGeneratorConfig(GeneratorAction.from(
             TitanRecordGenerator.forConstantValue(value),
-            this.buildRecordSender()))
+            this.buildRecordSender())))
         .withThreads(threads);
   }
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java
index e8a03fc8776d0cb07fc9623df93688db7455a042..fb3bf1c9f802d2af2a0eae72bc58c9e609a6b624 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java
@@ -5,14 +5,18 @@ package rocks.theodolite.benchmarks.loadgenerator;
  * it.
  */
 @FunctionalInterface
-interface GeneratorAction {
+public interface GeneratorAction {
 
   void generate(final String key);
 
+  default void shutdown() {
+    // Nothing to do per default
+  }
+
   public static <T> GeneratorAction from(
       final RecordGenerator<? extends T> generator,
       final RecordSender<? super T> sender) {
-    return key -> sender.send(generator.generate(key));
+    return new GeneratorActionImpl<>(generator, sender);
   }
 
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..1d177b7193ee8df8b4d65546cbbcbb6f49a95488
--- /dev/null
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java
@@ -0,0 +1,27 @@
+package rocks.theodolite.benchmarks.loadgenerator;
+
+class GeneratorActionImpl<T> implements GeneratorAction {
+
+  private final RecordGenerator<? extends T> generator;
+
+  private final RecordSender<? super T> sender;
+
+  public GeneratorActionImpl(
+      final RecordGenerator<? extends T> generator,
+      final RecordSender<? super T> sender) {
+    this.generator = generator;
+    this.sender = sender;
+  }
+
+  @Override
+  public void shutdown() {
+    this.generator.close();
+    this.sender.close();
+  }
+
+  @Override
+  public void generate(final String key) {
+    this.sender.send(this.generator.generate(key));
+  }
+
+}
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java
index f3895f5d46dedcfa38d5fa9f2c43fb12a1aee672..3b222a332fc76159aedcf82a1753363c7b1e414e 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java
@@ -36,7 +36,7 @@ public class HazelcastRunner {
   }
 
   /**
-   * Start the workload generation and blocks until the workload generation is stopped again.
+   * Start the load generation and blocks until the load generation is stopped again.
    */
   public void runBlocking() {
     while (!this.stopAction.isDone()) {
@@ -52,19 +52,24 @@ public class HazelcastRunner {
   }
 
   public void restart() {
-    this.stopRunnerState();
+    this.stopRunnerStateAsync();
   }
 
+  /**
+   * Stop generating load and clean up the entire state.
+   */
   public void stop() {
     this.stopAction.complete(null);
-    this.stopRunnerState();
+    this.stopRunnerStateAsync().join();
+    this.hzInstance.shutdown();
   }
 
-  private void stopRunnerState() {
+  private CompletableFuture<Void> stopRunnerStateAsync() {
     synchronized (this) {
       if (this.runnerState != null) {
-        this.runnerState.stopAsync();
+        return this.runnerState.stopAsync();
       }
+      return CompletableFuture.completedFuture(null);
     }
   }
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java
index 79b86b6eecd980b8d90acab682687b6f8804d1b6..81a6db7329309a8c2b09fb3308cb496c5420a206 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java
@@ -27,6 +27,7 @@ public class HazelcastRunnerStateInstance {
   private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500);
 
   private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
+  private final CompletableFuture<Void> stopFinished = new CompletableFuture<>();
   private LoadGeneratorExecution loadGeneratorExecution;
 
   private final LoadGeneratorConfig loadGeneratorConfig;
@@ -61,10 +62,12 @@ public class HazelcastRunnerStateInstance {
     }
     this.stopAction.join();
     this.stopLoadGeneration();
+    this.stopFinished.complete(null);
   }
 
-  public void stopAsync() {
+  public CompletableFuture<Void> stopAsync() {
     this.stopAction.complete(null);
+    return this.stopFinished;
   }
 
   private void tryPerformBeforeAction() {
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java
index 3f3b05c9bbf4c361f25f856a711265dc267c0365..56b1946ad78d888fe6e5140fdc373bb2cd3a4ed4 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java
@@ -17,7 +17,8 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
  */
 public interface KafkaRecordSender<T> extends RecordSender<T> {
 
-  public void terminate();
+  @Override
+  public void close();
 
   /**
    * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}.
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java
index 289651b417aa715f8f528bb6547121bcfcc3155d..02a4d206b0a2414d5f12f5348f6c1bfc56852281 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
   }
 
   @Override
-  public void terminate() {
+  public void close() {
     this.producer.close();
   }
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java
index d7ddf36e88297e93bcecb1de8b8145097aa96ca7..27edb97efc335400acf1d6244db0ce384ee20f59 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java
@@ -79,6 +79,7 @@ public final class LoadGenerator {
         this.clusterConfig,
         this.generatorConfig,
         this.loadDefinition);
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> runner.stop()));
     runner.runBlocking();
   }
 
@@ -92,11 +93,12 @@ public final class LoadGenerator {
             new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
             Duration.ofMillis(PERIOD_MS_DEFAULT)))
         .setGeneratorConfig(new LoadGeneratorConfig(
-            TitanRecordGenerator.forConstantValue(VALUE_DEFAULT),
-            TitanKafkaSenderFactory.forKafkaConfig(
-                KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
-                KAFKA_TOPIC_DEFAULT,
-                SCHEMA_REGISTRY_URL_DEFAULT)));
+            GeneratorAction.from(
+                TitanRecordGenerator.forConstantValue(VALUE_DEFAULT),
+                TitanKafkaSenderFactory.forKafkaConfig(
+                    KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
+                    KAFKA_TOPIC_DEFAULT,
+                    SCHEMA_REGISTRY_URL_DEFAULT))));
   }
 
   /**
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java
index 97ed0b8fce6a18050e2c5846da1c590e891ed80b..e854138b38613ba614c871febcb80cf9c6b059ef 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java
@@ -5,21 +5,16 @@ package rocks.theodolite.benchmarks.loadgenerator;
  */
 public class LoadGeneratorConfig {
 
-  private final GeneratorAction messageGenerator;
+  private final GeneratorAction generatorAction;
   private BeforeAction beforeAction = BeforeAction.doNothing();
   private int threads = 1;
 
-  public <T> LoadGeneratorConfig(
-      final RecordGenerator<? extends T> generator,
-      final RecordSender<? super T> sender) {
-    this.messageGenerator = GeneratorAction.from(generator, sender);
+  public LoadGeneratorConfig(final GeneratorAction generatorAction) {
+    this.generatorAction = generatorAction;
   }
 
-  public <T> LoadGeneratorConfig(
-      final RecordGenerator<? extends T> generator,
-      final RecordSender<? super T> sender,
-      final int threads) {
-    this(generator, sender);
+  public LoadGeneratorConfig(final GeneratorAction generatorAction, final int threads) {
+    this(generatorAction);
     this.threads = threads;
   }
 
@@ -37,7 +32,7 @@ public class LoadGeneratorConfig {
 
   public LoadGeneratorExecution buildLoadGeneratorExecution(
       final WorkloadDefinition workloadDefinition) {
-    return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
+    return new LoadGeneratorExecution(workloadDefinition, this.generatorAction, this.threads);
   }
 
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java
index 97c4533dc4b8904f8ae9a5c46c3459216e86b5ca..ecba6961245651c7420d89c5da9bd1f993972188 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java
@@ -55,7 +55,8 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
   /**
    * Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}.
    */
-  public void terminate() {
+  @Override
+  public void close() {
     this.publisher.shutdown();
     try {
       this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java
index 0b64ace46a9e04f013f843ecd08dd6fcdf5eed9d..05e127eb019cf877cc5df73e09a6f053ef793fc3 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java
@@ -1,5 +1,7 @@
 package rocks.theodolite.benchmarks.loadgenerator;
 
+import java.io.Closeable;
+
 /**
  * This interface describes a function that takes meta information from a string key and produces an
  * object of type T.
@@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator;
  * @param <T> the type of the objects that will be generated by the function.
  */
 @FunctionalInterface
-public interface RecordGenerator<T> {
+public interface RecordGenerator<T> extends Closeable {
 
   T generate(final String key);
 
+  @Override
+  default void close() {
+    // Nothing to do per default
+  }
+
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java
index 71732b88d2cf3f119140474c387f78b92a9521f8..f1f1bef980f01da4a23b49440be71ba552c13905 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java
@@ -1,5 +1,7 @@
 package rocks.theodolite.benchmarks.loadgenerator;
 
+import java.io.Closeable;
+
 /**
  * This interface describes a function that consumes a message {@code T}. This function is dedicated
  * to be used to transport individual messages to the messaging system.
@@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator;
  * @param <T> the type of records to send as messages.
  */
 @FunctionalInterface
-public interface RecordSender<T> {
+public interface RecordSender<T> extends Closeable {
 
   void send(final T message);
 
+  @Override
+  default void close() {
+    // Nothing to do per default
+  }
+
 }