From 6c1ceecabef2095273722031df9f864cbadf691b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Sat, 5 Mar 2022 17:31:06 +0100
Subject: [PATCH] Improve options to kill load generator, fix #324

---
 .../EnvVarLoadGeneratorFactory.java           |  4 +--
 .../loadgenerator/GeneratorAction.java        |  8 ++++--
 .../loadgenerator/GeneratorActionImpl.java    | 27 +++++++++++++++++++
 .../loadgenerator/HazelcastRunner.java        | 15 +++++++----
 .../HazelcastRunnerStateInstance.java         |  5 +++-
 .../loadgenerator/KafkaRecordSender.java      |  3 ++-
 .../loadgenerator/KafkaRecordSenderImpl.java  |  2 +-
 .../loadgenerator/LoadGenerator.java          | 12 +++++----
 .../loadgenerator/LoadGeneratorConfig.java    | 17 +++++-------
 .../loadgenerator/PubSubRecordSender.java     |  3 ++-
 .../loadgenerator/RecordGenerator.java        |  9 ++++++-
 .../loadgenerator/RecordSender.java           |  9 ++++++-
 12 files changed, 83 insertions(+), 31 deletions(-)
 create mode 100644 theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java

diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java
index 3eb63f161..29ede821e 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java
@@ -42,9 +42,9 @@ class EnvVarLoadGeneratorFactory {
         .setLoadDefinition(new WorkloadDefinition(
             new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors),
             Duration.ofMillis(periodMs)))
-        .setGeneratorConfig(new LoadGeneratorConfig(
+        .setGeneratorConfig(new LoadGeneratorConfig(GeneratorAction.from(
             TitanRecordGenerator.forConstantValue(value),
-            this.buildRecordSender()))
+            this.buildRecordSender())))
         .withThreads(threads);
   }
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java
index e8a03fc87..fb3bf1c9f 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java
@@ -5,14 +5,18 @@ package rocks.theodolite.benchmarks.loadgenerator;
  * it.
  */
 @FunctionalInterface
-interface GeneratorAction {
+public interface GeneratorAction {
 
   void generate(final String key);
 
+  default void shutdown() {
+    // Nothing to do per default
+  }
+
   public static <T> GeneratorAction from(
       final RecordGenerator<? extends T> generator,
       final RecordSender<? super T> sender) {
-    return key -> sender.send(generator.generate(key));
+    return new GeneratorActionImpl<>(generator, sender);
   }
 
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java
new file mode 100644
index 000000000..1d177b719
--- /dev/null
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java
@@ -0,0 +1,27 @@
+package rocks.theodolite.benchmarks.loadgenerator;
+
+class GeneratorActionImpl<T> implements GeneratorAction {
+
+  private final RecordGenerator<? extends T> generator;
+
+  private final RecordSender<? super T> sender;
+
+  public GeneratorActionImpl(
+      final RecordGenerator<? extends T> generator,
+      final RecordSender<? super T> sender) {
+    this.generator = generator;
+    this.sender = sender;
+  }
+
+  @Override
+  public void shutdown() {
+    this.generator.close();
+    this.sender.close();
+  }
+
+  @Override
+  public void generate(final String key) {
+    this.sender.send(this.generator.generate(key));
+  }
+
+}
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java
index f3895f5d4..3b222a332 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java
@@ -36,7 +36,7 @@ public class HazelcastRunner {
   }
 
   /**
-   * Start the workload generation and blocks until the workload generation is stopped again.
+   * Start the load generation and blocks until the load generation is stopped again.
    */
   public void runBlocking() {
     while (!this.stopAction.isDone()) {
@@ -52,19 +52,24 @@ public class HazelcastRunner {
   }
 
   public void restart() {
-    this.stopRunnerState();
+    this.stopRunnerStateAsync();
   }
 
+  /**
+   * Stop generating load and clean up the entire state.
+   */
   public void stop() {
     this.stopAction.complete(null);
-    this.stopRunnerState();
+    this.stopRunnerStateAsync().join();
+    this.hzInstance.shutdown();
   }
 
-  private void stopRunnerState() {
+  private CompletableFuture<Void> stopRunnerStateAsync() {
     synchronized (this) {
       if (this.runnerState != null) {
-        this.runnerState.stopAsync();
+        return this.runnerState.stopAsync();
       }
+      return CompletableFuture.completedFuture(null);
     }
   }
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java
index 79b86b6ee..81a6db732 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java
@@ -27,6 +27,7 @@ public class HazelcastRunnerStateInstance {
   private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500);
 
   private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
+  private final CompletableFuture<Void> stopFinished = new CompletableFuture<>();
   private LoadGeneratorExecution loadGeneratorExecution;
 
   private final LoadGeneratorConfig loadGeneratorConfig;
@@ -61,10 +62,12 @@ public class HazelcastRunnerStateInstance {
     }
     this.stopAction.join();
     this.stopLoadGeneration();
+    this.stopFinished.complete(null);
   }
 
-  public void stopAsync() {
+  public CompletableFuture<Void> stopAsync() {
     this.stopAction.complete(null);
+    return this.stopFinished;
   }
 
   private void tryPerformBeforeAction() {
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java
index 3f3b05c9b..56b1946ad 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java
@@ -17,7 +17,8 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
  */
 public interface KafkaRecordSender<T> extends RecordSender<T> {
 
-  public void terminate();
+  @Override
+  public void close();
 
   /**
    * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}.
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java
index 289651b41..02a4d206b 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
   }
 
   @Override
-  public void terminate() {
+  public void close() {
     this.producer.close();
   }
 
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java
index d7ddf36e8..27edb97ef 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java
@@ -79,6 +79,7 @@ public final class LoadGenerator {
         this.clusterConfig,
         this.generatorConfig,
         this.loadDefinition);
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> runner.stop()));
     runner.runBlocking();
   }
 
@@ -92,11 +93,12 @@ public final class LoadGenerator {
             new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
             Duration.ofMillis(PERIOD_MS_DEFAULT)))
         .setGeneratorConfig(new LoadGeneratorConfig(
-            TitanRecordGenerator.forConstantValue(VALUE_DEFAULT),
-            TitanKafkaSenderFactory.forKafkaConfig(
-                KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
-                KAFKA_TOPIC_DEFAULT,
-                SCHEMA_REGISTRY_URL_DEFAULT)));
+            GeneratorAction.from(
+                TitanRecordGenerator.forConstantValue(VALUE_DEFAULT),
+                TitanKafkaSenderFactory.forKafkaConfig(
+                    KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
+                    KAFKA_TOPIC_DEFAULT,
+                    SCHEMA_REGISTRY_URL_DEFAULT))));
   }
 
   /**
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java
index 97ed0b8fc..e854138b3 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java
@@ -5,21 +5,16 @@ package rocks.theodolite.benchmarks.loadgenerator;
  */
 public class LoadGeneratorConfig {
 
-  private final GeneratorAction messageGenerator;
+  private final GeneratorAction generatorAction;
   private BeforeAction beforeAction = BeforeAction.doNothing();
   private int threads = 1;
 
-  public <T> LoadGeneratorConfig(
-      final RecordGenerator<? extends T> generator,
-      final RecordSender<? super T> sender) {
-    this.messageGenerator = GeneratorAction.from(generator, sender);
+  public LoadGeneratorConfig(final GeneratorAction generatorAction) {
+    this.generatorAction = generatorAction;
   }
 
-  public <T> LoadGeneratorConfig(
-      final RecordGenerator<? extends T> generator,
-      final RecordSender<? super T> sender,
-      final int threads) {
-    this(generator, sender);
+  public LoadGeneratorConfig(final GeneratorAction generatorAction, final int threads) {
+    this(generatorAction);
     this.threads = threads;
   }
 
@@ -37,7 +32,7 @@ public class LoadGeneratorConfig {
 
   public LoadGeneratorExecution buildLoadGeneratorExecution(
       final WorkloadDefinition workloadDefinition) {
-    return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
+    return new LoadGeneratorExecution(workloadDefinition, this.generatorAction, this.threads);
   }
 
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java
index 97c4533dc..ecba69612 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java
@@ -55,7 +55,8 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
   /**
    * Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}.
    */
-  public void terminate() {
+  @Override
+  public void close() {
     this.publisher.shutdown();
     try {
       this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java
index 0b64ace46..05e127eb0 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java
@@ -1,5 +1,7 @@
 package rocks.theodolite.benchmarks.loadgenerator;
 
+import java.io.Closeable;
+
 /**
  * This interface describes a function that takes meta information from a string key and produces an
  * object of type T.
@@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator;
  * @param <T> the type of the objects that will be generated by the function.
  */
 @FunctionalInterface
-public interface RecordGenerator<T> {
+public interface RecordGenerator<T> extends Closeable {
 
   T generate(final String key);
 
+  @Override
+  default void close() {
+    // Nothing to do per default
+  }
+
 }
diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java
index 71732b88d..f1f1bef98 100644
--- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java
+++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java
@@ -1,5 +1,7 @@
 package rocks.theodolite.benchmarks.loadgenerator;
 
+import java.io.Closeable;
+
 /**
  * This interface describes a function that consumes a message {@code T}. This function is dedicated
  * to be used to transport individual messages to the messaging system.
@@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator;
  * @param <T> the type of records to send as messages.
  */
 @FunctionalInterface
-public interface RecordSender<T> {
+public interface RecordSender<T> extends Closeable {
 
   void send(final T message);
 
+  @Override
+  default void close() {
+    // Nothing to do per default
+  }
+
 }
-- 
GitLab