From 27d79084b3075cc39d313c3640fc4a5e2d8772ba Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Thu, 25 Feb 2021 10:45:08 +0100
Subject: [PATCH] Remove old classes for load generation with ZooKeeper

---
 .../zookeeper/WorkloadDistributor.java        | 203 ------------------
 .../generators/AbstractWorkloadGenerator.java | 138 ------------
 .../generators/KafkaWorkloadGenerator.java    |  59 -----
 .../KafkaWorkloadGeneratorBuilder.java        | 185 ----------------
 .../generators/WorkloadGenerator.java         |  18 --
 .../misc/WorkloadDefinition.java              |  71 ------
 .../misc/WorkloadEntity.java                  |  22 --
 .../workloadgeneration/misc/ZooKeeper.java    |  30 ---
 8 files changed, 726 deletions(-)
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
 delete mode 100644 benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java

diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java
deleted file mode 100644
index 095efa839..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java
+++ /dev/null
@@ -1,203 +0,0 @@
-package theodolite.commons.workloadgeneration.communication.zookeeper;
-
-import java.nio.charset.StandardCharsets;
-import java.util.function.BiConsumer;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.recipes.atomic.AtomicValue;
-import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import theodolite.commons.workloadgeneration.BeforeAction;
-import theodolite.commons.workloadgeneration.KeySpace;
-import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
-import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-
-/**
- * The central class responsible for distributing the workload through all workload generators.
- */
-@Deprecated
-public class WorkloadDistributor {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadDistributor.class);
-
-  private static final String NAMESPACE = "workload-generation";
-  private static final String COUNTER_PATH = "/counter";
-  private static final String WORKLOAD_PATH = "/workload";
-  private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
-
-  // Curator retry strategy
-  private static final int BASE_SLEEP_TIME_MS = 2000;
-  private static final int MAX_RETRIES = 5;
-
-  // Wait time
-  private static final int MAX_WAIT_TIME = 20_000;
-
-  private final DistributedAtomicInteger counter;
-  private final KeySpace keySpace;
-  private final BeforeAction beforeAction;
-  private final BiConsumer<WorkloadDefinition, Integer> workerAction;
-
-  private final int instances;
-  private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
-  private final CuratorFramework client;
-
-  private boolean workloadGenerationStarted = false; // NOPMD explicit intention that false
-
-  /**
-   * Create a new workload distributor.
-   *
-   * @param keySpace the keyspace for the workload generation.
-   * @param beforeAction the before action for the workload generation.
-   * @param workerAction the action to perform by the workers.
-   */
-  public WorkloadDistributor(
-      final int instances,
-      final ZooKeeper zooKeeper,
-      final KeySpace keySpace,
-      final BeforeAction beforeAction,
-      final BiConsumer<WorkloadDefinition, Integer> workerAction) {
-    this.instances = instances;
-    this.zooKeeper = zooKeeper;
-    this.keySpace = keySpace;
-    this.beforeAction = beforeAction;
-    this.workerAction = workerAction;
-
-    this.client = CuratorFrameworkFactory.builder()
-        .namespace(NAMESPACE)
-        .connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort())
-        .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
-        .build();
-
-    this.client.start();
-
-    try {
-      this.client.blockUntilConnected();
-    } catch (final InterruptedException e) {
-      LOGGER.error(e.getMessage(), e);
-      throw new IllegalStateException(e);
-    }
-
-    this.counter =
-        new DistributedAtomicInteger(this.client, COUNTER_PATH,
-            new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
-  }
-
-  /**
-   * Start the workload distribution.
-   */
-  public void start() {
-    try {
-      AtomicValue<Integer> result = this.counter.increment();
-      while (!result.succeeded()) {
-        result = this.counter.increment();
-      }
-
-      final int workerId = result.preValue();
-
-      final CuratorWatcher watcher = this.buildWatcher(workerId);
-
-      final Stat nodeExists =
-          this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
-      if (nodeExists == null) {
-        this.client.create().forPath(WORKLOAD_PATH);
-      }
-
-      if (workerId == 0) {
-        LOGGER.info("This instance is master with id {}", workerId);
-
-        this.beforeAction.run();
-
-        // register worker action, as master acts also as worker
-        this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
-
-        LOGGER.info("Number of Workers: {}", this.instances);
-
-        final WorkloadDefinition definition =
-            new WorkloadDefinition(this.keySpace, this.instances);
-
-        this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH,
-            definition.toString().getBytes(StandardCharsets.UTF_8));
-
-      } else {
-        LOGGER.info("This instance is worker with id {}", workerId);
-
-        this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
-
-        final Stat definitionExists =
-            this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
-
-        if (definitionExists != null) {
-          this.startWorkloadGeneration(workerId);
-        }
-      }
-
-      Thread.sleep(MAX_WAIT_TIME);
-
-      if (!this.workloadGenerationStarted) {
-        LOGGER.warn("No workload definition retrieved for 20 s. Terminating now..");
-      }
-    } catch (final Exception e) { // NOPMD need to catch exception because of external framework
-      LOGGER.error(e.getMessage(), e);
-      throw new IllegalStateException("Error when starting the distribution of the workload.", e);
-    }
-  }
-
-  /**
-   * Start the workload generation. This methods body does only get executed once.
-   *
-   * @param workerId the ID of this worker
-   * @throws Exception when an error occurs
-   */
-  // NOPMD because exception thrown from used framework
-  private synchronized void startWorkloadGeneration(final int workerId) throws Exception { // NOPMD
-
-    if (!this.workloadGenerationStarted) {
-      this.workloadGenerationStarted = true;
-
-      final byte[] bytes =
-          this.client.getData().forPath(WORKLOAD_DEFINITION_PATH);
-      final WorkloadDefinition definition =
-          WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
-
-      this.workerAction.accept(definition, workerId);
-    }
-  }
-
-  /**
-   * Build a curator watcher which performs the worker action.
-   *
-   * @param worker the worker to create the watcher for.
-   * @return the curator watcher.
-   */
-  private CuratorWatcher buildWatcher(final int workerId) {
-    return new CuratorWatcher() {
-
-      @Override
-      public void process(final WatchedEvent event) {
-        if (event.getType() == EventType.NodeChildrenChanged) {
-          try {
-            WorkloadDistributor.this.startWorkloadGeneration(workerId);
-          } catch (final Exception e) { // NOPMD external framework throws exception
-            LOGGER.error(e.getMessage(), e);
-            throw new IllegalStateException("Error starting workload generation.", e);
-          }
-        }
-      }
-    };
-  }
-
-  /**
-   * Stop the workload distributor.
-   */
-  public void stop() {
-    this.client.close();
-  }
-
-}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
deleted file mode 100644
index 1e4fd6c25..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package theodolite.commons.workloadgeneration.generators;
-
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import theodolite.commons.workloadgeneration.BeforeAction;
-import theodolite.commons.workloadgeneration.KeySpace;
-import theodolite.commons.workloadgeneration.RecordGenerator;
-import theodolite.commons.workloadgeneration.RecordSender;
-import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor;
-import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
-import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
-import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-
-/**
- * Base for workload generators.
- *
- * @param <T> The type of records the workload generator is dedicated for.
- */
-public abstract class AbstractWorkloadGenerator<T>
-    implements WorkloadGenerator {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class);
-
-  private final int instances; // NOPMD keep instance variable instead of local variable
-  private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
-  private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable
-  private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable
-  private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector;
-  private final RecordGenerator<T> generatorFunction;
-  private final RecordSender<T> transport;
-  private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local
-  private final ScheduledExecutorService executor;
-
-  /**
-   * Create a new workload generator.
-   *
-   * @param instances the number of workload-generator instances.
-   * @param zooKeeper the zookeeper connection.
-   * @param keySpace the keyspace.
-   * @param threads the number of threads that is used to generate the load.
-   * @param period the period, how often a new record is emitted.
-   * @param duration the maximum runtime.
-   * @param beforeAction the action to perform before the workload generation starts.
-   * @param generatorFunction the function that is used to generate the individual records.
-   * @param transport the function that is used to send generated messages to the messaging system.
-   */
-  public AbstractWorkloadGenerator(
-      final int instances,
-      final ZooKeeper zooKeeper,
-      final KeySpace keySpace,
-      final int threads,
-      final Duration period,
-      final Duration duration,
-      final BeforeAction beforeAction,
-      final RecordGenerator<T> generatorFunction,
-      final RecordSender<T> transport) {
-    this.instances = instances;
-    this.zooKeeper = zooKeeper;
-    this.keySpace = keySpace;
-    this.beforeAction = beforeAction;
-    this.generatorFunction = generatorFunction;
-    this.workloadSelector = (workloadDefinition, workerId) -> {
-      final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>();
-
-      for (int i =
-          workloadDefinition.getKeySpace().getMin() + workerId; i <= workloadDefinition
-              .getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) {
-        final String id = workloadDefinition.getKeySpace().getPrefix() + i;
-        workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction));
-      }
-
-      return workloadEntities;
-    };
-    this.transport = transport;
-
-    this.executor = Executors.newScheduledThreadPool(threads);
-    final Random random = new Random();
-
-    final int periodMs = (int) period.toMillis();
-
-    LOGGER.info("Period: {}", periodMs);
-
-    final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
-
-      final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId);
-
-      LOGGER.info("Beginning of Experiment...");
-      LOGGER.info("Generating records for {} keys.", entities.size());
-      LOGGER.info("Experiment is going to be executed for the specified duration...");
-
-      entities.forEach(entity -> {
-        final long initialDelay = random.nextInt(periodMs);
-        final Runnable task = () -> this.transport.send(entity.generateMessage());
-        this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS);
-      });
-
-
-      try {
-        this.executor.awaitTermination(duration.getSeconds(), TimeUnit.SECONDS);
-        LOGGER.info("Terminating now...");
-        this.stop();
-      } catch (final InterruptedException e) {
-        LOGGER.error("", e);
-        throw new IllegalStateException("Error when terminating the workload generation.", e);
-      }
-    };
-
-    this.workloadDistributor = new WorkloadDistributor(
-        this.instances,
-        this.zooKeeper,
-        this.keySpace,
-        this.beforeAction,
-        workerAction);
-  }
-
-  /**
-   * Start the workload generation. The generation terminates automatically after the specified
-   * {@code duration}.
-   */
-  @Override
-  public void start() {
-    this.workloadDistributor.start();
-  }
-
-  @Override
-  public void stop() {
-    this.workloadDistributor.stop();
-  }
-}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
deleted file mode 100644
index 49bdf655b..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package theodolite.commons.workloadgeneration.generators;
-
-import java.time.Duration;
-import org.apache.avro.specific.SpecificRecord;
-import theodolite.commons.workloadgeneration.BeforeAction;
-import theodolite.commons.workloadgeneration.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.KeySpace;
-import theodolite.commons.workloadgeneration.RecordGenerator;
-import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-
-/**
- * Workload generator for generating load for the kafka messaging system.
- *
- * @param <T> The type of records the workload generator is dedicated for.
- */
-public class KafkaWorkloadGenerator<T extends SpecificRecord>
-    extends AbstractWorkloadGenerator<T> {
-
-  private final KafkaRecordSender<T> recordSender;
-
-  /**
-   * Create a new workload generator.
-   *
-   * @param zooKeeper a reference to the ZooKeeper instance.
-   * @param keySpace the key space to generate the workload for.
-   * @param threads tha amount of threads to use per instance.
-   * @param period the period how often a message is generated for each key specified in the
-   *        {@code keySpace}
-   * @param duration the duration how long the workload generator will emit messages.
-   * @param beforeAction the action which will be performed before the workload generator starts
-   *        generating messages. If {@code null}, no before action will be performed.
-   * @param generatorFunction the generator function. This function is executed, each time a message
-   *        is generated.
-   * @param recordSender the record sender which is used to send the generated messages to kafka.
-   */
-  public KafkaWorkloadGenerator(
-      final int instances,
-      final ZooKeeper zooKeeper,
-      final KeySpace keySpace,
-      final int threads,
-      final Duration period,
-      final Duration duration,
-      final BeforeAction beforeAction,
-      final RecordGenerator<T> generatorFunction,
-      final KafkaRecordSender<T> recordSender) {
-    super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction,
-        generatorFunction,
-        recordSender);
-    this.recordSender = recordSender;
-  }
-
-
-  @Override
-  public void stop() {
-    this.recordSender.terminate();
-
-    super.stop();
-  }
-}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
deleted file mode 100644
index 9252d5dcf..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package theodolite.commons.workloadgeneration.generators;
-
-import java.time.Duration;
-import java.util.Objects;
-import org.apache.avro.specific.SpecificRecord;
-import theodolite.commons.workloadgeneration.BeforeAction;
-import theodolite.commons.workloadgeneration.KafkaRecordSender;
-import theodolite.commons.workloadgeneration.KeySpace;
-import theodolite.commons.workloadgeneration.RecordGenerator;
-import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-
-/**
- * Builder for {@link workload generators}.
- *
- * @param <T> the record for which the builder is dedicated for.
- */
-public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD
-
-  private int instances; // NOPMD
-  private ZooKeeper zooKeeper; // NOPMD
-  private KeySpace keySpace; // NOPMD
-  private int threads; // NOPMD
-  private Duration period; // NOPMD
-  private Duration duration; // NOPMD
-  private BeforeAction beforeAction; // NOPMD
-  private RecordGenerator<T> generatorFunction; // NOPMD
-  private KafkaRecordSender<T> kafkaRecordSender; // NOPMD
-
-  private KafkaWorkloadGeneratorBuilder() {
-
-  }
-
-  /**
-   * Get a builder for the {@link KafkaWorkloadGenerator}.
-   *
-   * @return the builder.
-   */
-  public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() {
-    return new KafkaWorkloadGeneratorBuilder<>();
-  }
-
-  /**
-   * Set the number of instances.
-   *
-   * @param instances the number of instances.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> instances(final int instances) {
-    this.instances = instances;
-    return this;
-  }
-
-  /**
-   * Set the ZooKeeper reference.
-   *
-   * @param zooKeeper a reference to the ZooKeeper instance.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> zooKeeper(final ZooKeeper zooKeeper) {
-    this.zooKeeper = zooKeeper;
-    return this;
-  }
-
-  /**
-   * Set the before action for the {@link KafkaWorkloadGenerator}.
-   *
-   * @param beforeAction the {@link BeforeAction}.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> beforeAction(final BeforeAction beforeAction) {
-    this.beforeAction = beforeAction;
-    return this;
-  }
-
-  /**
-   * Set the key space for the {@link KafkaWorkloadGenerator}.
-   *
-   * @param keySpace the {@link KeySpace}.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> keySpace(final KeySpace keySpace) {
-    this.keySpace = keySpace;
-    return this;
-  }
-
-  /**
-   * Set the key space for the {@link KafkaWorkloadGenerator}.
-   *
-   * @param threads the number of threads.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> threads(final int threads) {
-    this.threads = threads;
-    return this;
-  }
-
-  /**
-   * Set the period for the {@link KafkaWorkloadGenerator}.
-   *
-   * @param period the {@link Period}
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> period(final Duration period) {
-    this.period = period;
-    return this;
-  }
-
-  /**
-   * Set the durtion for the {@link KafkaWorkloadGenerator}.
-   *
-   * @param duration the {@link Duration}.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> duration(final Duration duration) {
-    this.duration = duration;
-    return this;
-  }
-
-  /**
-   * Set the generator function for the {@link KafkaWorkloadGenerator}.
-   *
-   * @param generatorFunction the generator function.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> generatorFunction(
-      final RecordGenerator<T> generatorFunction) {
-    this.generatorFunction = generatorFunction;
-    return this;
-  }
-
-  /**
-   * Set the {@link KafkaRecordSender} for the {@link KafkaWorkloadGenerator}.
-   *
-   * @param kafkaRecordSender the record sender to use.
-   * @return the builder.
-   */
-  public KafkaWorkloadGeneratorBuilder<T> kafkaRecordSender(
-      final KafkaRecordSender<T> kafkaRecordSender) {
-    this.kafkaRecordSender = kafkaRecordSender;
-    return this;
-  }
-
-  /**
-   * Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be
-   * specicified before this method is called:
-   * <ul>
-   * <li>zookeeper</li>
-   * <li>key space</li>
-   * <li>period</li>
-   * <li>duration</li>
-   * <li>generator function</li>
-   * <li>kafka record sender</li>
-   * </ul>
-   *
-   * @return the built instance of the {@link KafkaWorkloadGenerator}.
-   */
-  public KafkaWorkloadGenerator<T> build() {
-    if (this.instances < 1) { // NOPMD
-      throw new IllegalArgumentException(
-          "Please specify a valid number of instances. Currently: " + this.instances);
-    }
-    Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance.");
-    if (this.threads < 1) { // NOPMD
-      this.threads = 1;
-    }
-    Objects.requireNonNull(this.keySpace, "Please specify the key space.");
-    Objects.requireNonNull(this.period, "Please specify the period.");
-    Objects.requireNonNull(this.duration, "Please specify the duration.");
-    this.beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> {
-    });
-    Objects.requireNonNull(this.generatorFunction, "Please specify the generator function.");
-    Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender.");
-
-    return new KafkaWorkloadGenerator<>(
-        this.instances,
-        this.zooKeeper,
-        this.keySpace,
-        this.threads,
-        this.period,
-        this.duration,
-        this.beforeAction,
-        this.generatorFunction,
-        this.kafkaRecordSender);
-  }
-}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java
deleted file mode 100644
index b121ac157..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package theodolite.commons.workloadgeneration.generators;
-
-/**
- * Base methods for workload generators.
- */
-public interface WorkloadGenerator {
-
-  /**
-   * Start the workload generation.
-   */
-  void start();
-
-  /**
-   * Stop the workload generation.
-   */
-  void stop();
-
-}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
deleted file mode 100644
index 61c6ac75e..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package theodolite.commons.workloadgeneration.misc;
-
-import theodolite.commons.workloadgeneration.KeySpace;
-
-/**
- * The central class that contains all information that needs to be exchanged between the nodes for
- * distributed workload generation.
- */
-public class WorkloadDefinition {
-  private static final int ZERO = 0;
-  private static final int ONE = 1;
-  private static final int TWO = 2;
-  private static final int THREE = 3;
-  private static final int FOUR = 4;
-
-  private final KeySpace keySpace;
-  private final int numberOfWorkers;
-
-  /**
-   * Create a new workload definition.
-   *
-   * @param keySpace the key space to use.
-   * @param numberOfWorkers the number of workers participating in the workload generation.
-   */
-  public WorkloadDefinition(final KeySpace keySpace, final int numberOfWorkers) {
-
-    this.keySpace = keySpace;
-    this.numberOfWorkers = numberOfWorkers;
-  }
-
-  public KeySpace getKeySpace() {
-    return this.keySpace;
-  }
-
-  public int getNumberOfWorkers() {
-    return this.numberOfWorkers;
-  }
-
-  /**
-   * Simple method for encoding all information of the workload definition into one string.
-   *
-   * @return a string that encodes all information of the workload generation in a compact format.
-   *         The format is 'keySpace;keySpace.min;keySpace.max;numberOfWorkers'.
-   */
-  @Override
-  public String toString() {
-    return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";"
-        + this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers();
-  }
-
-  /**
-   * Parse a workload generation from a previously encoded string with the format returned by
-   * {@link WorkloadDefinition#toString()}.
-   *
-   * @param workloadDefinitionString the workload definition string.
-   * @return the parsed workload definition.
-   */
-  public static WorkloadDefinition fromString(final String workloadDefinitionString) {
-    final String[] deserialized = workloadDefinitionString.split(";");
-
-    if (deserialized.length != FOUR) {
-      throw new IllegalArgumentException(
-          "Wrong workload definition string when trying to parse the workload generation.");
-    }
-
-    return new WorkloadDefinition(
-        new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]),
-            Integer.valueOf(deserialized[TWO])),
-        Integer.valueOf(deserialized[THREE]));
-  }
-}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
deleted file mode 100644
index 55d240281..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package theodolite.commons.workloadgeneration.misc;
-
-import theodolite.commons.workloadgeneration.RecordGenerator;
-
-/**
- * Representation of a entity of the workload generation that generates load for one fixed key.
- *
- * @param <T> The type of records the workload generator is dedicated for.
- */
-public class WorkloadEntity<T> {
-  private final String key;
-  private final RecordGenerator<T> generator;
-
-  public WorkloadEntity(final String key, final RecordGenerator<T> generator) {
-    this.key = key;
-    this.generator = generator;
-  }
-
-  public T generateMessage() {
-    return this.generator.generate(this.key);
-  }
-}
diff --git a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java b/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java
deleted file mode 100644
index 56b1438a4..000000000
--- a/benchmarks/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package theodolite.commons.workloadgeneration.misc;
-
-/**
- * Wrapper for connection information for ZooKeeper.
- */
-@Deprecated
-public class ZooKeeper {
-
-  private final String host;
-  private final int port;
-
-  /**
-   * Create a new representation of an ZooKeeper instance.
-   *
-   * @param host of zookeeper.
-   * @param port of zookeeper.
-   */
-  public ZooKeeper(final String host, final int port) {
-    this.host = host;
-    this.port = port;
-  }
-
-  public String getHost() {
-    return this.host;
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-}
-- 
GitLab