From 086892faa897ca018d1263e88243d92f37b7c0d4 Mon Sep 17 00:00:00 2001
From: Simon Ehrenstein <simon.ehrenstein@gmail.com>
Date: Sun, 28 Jun 2020 22:12:07 +0200
Subject: [PATCH] Cleanup and restructuring code

---
 execution/infrastructure/grafana/values.yaml  |  2 +-
 .../aggregation-deployment.yaml               |  2 +-
 .../uc2-workload-generator/deployment.yaml    |  2 +-
 .../uc1/workloadgenerator/LoadGenerator.java  |  3 +
 .../kafkasender/KafkaRecordSender.java        | 84 -------------------
 .../uc2/workloadgenerator/LoadGenerator.java  |  3 +
 .../uc3/workloadgenerator/LoadGenerator.java  |  3 +
 .../uc4/workloadgenerator/LoadGenerator.java  |  3 +
 .../kafka/KafkaRecordSender.java              |  2 +-
 .../zookeeper/WorkloadDistributor.java        | 33 ++++++--
 .../generators/AbstractWorkloadGenerator.java | 52 ++++++++----
 .../generators/KafkaWorkloadGenerator.java    |  2 +
 .../KafkaWorkloadGeneratorBuilder.java        |  9 +-
 .../misc/WorkloadDefinition.java              | 16 +++-
 .../misc/WorkloadEntity.java                  |  4 +-
 .../workloadgeneration/misc/ZooKeeper.java    |  2 +-
 16 files changed, 103 insertions(+), 119 deletions(-)
 delete mode 100644 uc2-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java

diff --git a/execution/infrastructure/grafana/values.yaml b/execution/infrastructure/grafana/values.yaml
index 16f075745..29d64a2d7 100644
--- a/execution/infrastructure/grafana/values.yaml
+++ b/execution/infrastructure/grafana/values.yaml
@@ -47,7 +47,7 @@ sidecar:
     # If specified, the sidecar will search for datasource config-maps inside this namespace.
     # Otherwise the namespace in which the sidecar is running will be used.
     # It's also possible to specify ALL to search in all namespaces
-    searchNamespace: default
+    searchNamespace: null
 
 
 service:
diff --git a/execution/uc2-application/aggregation-deployment.yaml b/execution/uc2-application/aggregation-deployment.yaml
index ce5242173..4dfad0338 100644
--- a/execution/uc2-application/aggregation-deployment.yaml
+++ b/execution/uc2-application/aggregation-deployment.yaml
@@ -15,7 +15,7 @@ spec:
       terminationGracePeriodSeconds: 0
       containers:
       - name: uc2-application
-        image: "benediktwetzel/uc2-app:latest"
+        image: "soerenhenning/uc2-app:latest"
         ports:
         - containerPort: 5555
           name: jmx
diff --git a/execution/uc2-workload-generator/deployment.yaml b/execution/uc2-workload-generator/deployment.yaml
index 416802004..bcf7efafa 100644
--- a/execution/uc2-workload-generator/deployment.yaml
+++ b/execution/uc2-workload-generator/deployment.yaml
@@ -15,7 +15,7 @@ spec:
       terminationGracePeriodSeconds: 0
       containers:
       - name: workload-generator
-        image: benediktwetzel/uc2-wg:latest 
+        image: soerenhenning/uc2-wg:latest 
         env:
         - name: ZK_HOST
           value: "my-confluent-cp-zookeeper"
diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
index e9de4c42e..381aef34d 100644
--- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
+++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
@@ -26,6 +26,9 @@ public final class LoadGenerator {
 
   private LoadGenerator() {}
 
+  /**
+   * Entry point.
+   */
   public static void main(final String[] args) throws InterruptedException, IOException {
     // uc1
     LOGGER.info("Start workload generator for use case UC1.");
diff --git a/uc2-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/uc2-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java
deleted file mode 100644
index bf562d86a..000000000
--- a/uc2-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package theodolite.kafkasender;
-
-import java.util.Properties;
-import java.util.function.Function;
-import kieker.common.record.IMonitoringRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-
-
-/**
- * Sends monitoring records to Kafka.
- *
- * @param <T> {@link IMonitoringRecord} to send
- */
-public class KafkaRecordSender<T extends IMonitoringRecord> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
-
-  private final String topic;
-
-  private final Function<T, String> keyAccessor;
-
-  private final Function<T, Long> timestampAccessor;
-
-  private final Producer<String, T> producer;
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic) {
-    this(bootstrapServers, topic, x -> "", x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor) {
-    this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
-    this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
-  }
-
-  /**
-   * Create a new {@link KafkaRecordSender}.
-   */
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
-      final Properties defaultProperties) {
-    this.topic = topic;
-    this.keyAccessor = keyAccessor;
-    this.timestampAccessor = timestampAccessor;
-
-    final Properties properties = new Properties();
-    properties.putAll(defaultProperties);
-    properties.put("bootstrap.servers", bootstrapServers);
-    // properties.put("acks", this.acknowledges);
-    // properties.put("batch.size", this.batchSize);
-    // properties.put("linger.ms", this.lingerMs);
-    // properties.put("buffer.memory", this.bufferMemory);
-
-    this.producer = new KafkaProducer<>(properties, new StringSerializer(),
-        IMonitoringRecordSerde.serializer());
-  }
-
-  /**
-   * Write the passed monitoring record to Kafka.
-   */
-  public void write(final T monitoringRecord) {
-    final ProducerRecord<String, T> record =
-        new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
-            this.keyAccessor.apply(monitoringRecord), monitoringRecord);
-
-    LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
-    this.producer.send(record);
-  }
-
-  public void terminate() {
-    this.producer.close();
-  }
-
-}
diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
index cdcc01f21..6cf7d81af 100644
--- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
+++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
@@ -51,6 +51,8 @@ public class LoadGenerator {
     final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
+    final int instances =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
 
     // build sensor registry
     final MutableSensorRegistry sensorRegistry =
@@ -69,6 +71,7 @@ public class LoadGenerator {
     // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
         KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
+            .setInstances(instances)
             .setKeySpace(new KeySpace("s_", numSensors))
             .setThreads(threads)
             .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
diff --git a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
index 3a64e55db..80e3810f9 100644
--- a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
+++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
@@ -43,6 +43,8 @@ public class LoadGenerator {
     final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
+    final int instances =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
 
     // create kafka record sender
     final Properties kafkaProperties = new Properties();
@@ -57,6 +59,7 @@ public class LoadGenerator {
     // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
         KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
+            .setInstances(instances)
             .setKeySpace(new KeySpace("s_", numSensors))
             .setThreads(threads)
             .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
diff --git a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
index 3ad36e21a..84df87a6a 100644
--- a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
+++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
@@ -43,6 +43,8 @@ public class LoadGenerator {
     final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
+    final int instances =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
 
     // create kafka record sender
     final Properties kafkaProperties = new Properties();
@@ -57,6 +59,7 @@ public class LoadGenerator {
     // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
         KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
+            .setInstances(instances)
             .setKeySpace(new KeySpace("s_", numSensors))
             .setThreads(threads)
             .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java
index d78171b7a..c420658b7 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java
@@ -2,13 +2,13 @@ package theodolite.commons.workloadgeneration.communication.kafka;
 
 import java.util.Properties;
 import java.util.function.Function;
+import kieker.common.record.IMonitoringRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import kieker.common.record.IMonitoringRecord;
 import theodolite.commons.workloadgeneration.functions.Transport;
 import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
 
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java
index 454073850..6ad61ae9c 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java
@@ -19,7 +19,7 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction;
 import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 
-/*
+/**
  * The central class responsible for distributing the workload through all workload generators.
  */
 public class WorkloadDistributor {
@@ -30,6 +30,14 @@ public class WorkloadDistributor {
   private static final String COUNTER_PATH = "/counter";
   private static final String WORKLOAD_PATH = "/workload";
   private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
+
+  // Curator retry strategy
+  private static final int BASE_SLEEP_TIME_MS = 2000;
+  private static final int MAX_RETRIES = 5;
+
+  // Wait time
+  private static final int MAX_WAIT_TIME = 20_000;
+
   private final DistributedAtomicInteger counter;
   private final KeySpace keySpace;
   private final BeforeAction beforeAction;
@@ -39,7 +47,7 @@ public class WorkloadDistributor {
   private final ZooKeeper zooKeeper;
   private final CuratorFramework client;
 
-  private boolean workloadGenerationStarted = true;
+  private boolean workloadGenerationStarted = false;
 
   /**
    * Create a new workload distributor.
@@ -63,7 +71,7 @@ public class WorkloadDistributor {
     this.client = CuratorFrameworkFactory.builder()
         .namespace(NAMESPACE)
         .connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort())
-        .retryPolicy(new ExponentialBackoffRetry(2000, 5))
+        .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES))
         .build();
 
     this.client.start();
@@ -77,7 +85,7 @@ public class WorkloadDistributor {
 
     this.counter =
         new DistributedAtomicInteger(this.client, COUNTER_PATH,
-            new ExponentialBackoffRetry(2000, 5));
+            new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
   }
 
   /**
@@ -94,7 +102,11 @@ public class WorkloadDistributor {
 
       final CuratorWatcher watcher = this.buildWatcher(workerId);
 
-      this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
+      final Stat nodeExists =
+          this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH);
+      if (nodeExists == null) {
+        this.client.create().forPath(WORKLOAD_PATH);
+      }
 
       if (workerId == 0) {
         LOGGER.info("This instance is master with id {}", workerId);
@@ -117,16 +129,19 @@ public class WorkloadDistributor {
 
         this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
 
-        final Stat exists =
+        final Stat definitionExists =
             this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
 
-        if (exists != null) {
+        if (definitionExists != null) {
           this.startWorkloadGeneration(workerId);
         }
       }
 
-      Thread.sleep(20_000);
+      Thread.sleep(MAX_WAIT_TIME);
 
+      if (!this.workloadGenerationStarted) {
+        LOGGER.warn("No workload definition retrieved for 20 s. Terminating now..");
+      }
     } catch (final Exception e) {
       LOGGER.error("", e);
       throw new IllegalStateException("Error when starting the distribution of the workload.");
@@ -137,7 +152,7 @@ public class WorkloadDistributor {
    * Start the workload generation. This methods body does only get executed once.
    *
    * @param workerId the ID of this worker
-   * @throws Exception
+   * @throws Exception when an error occurs
    */
   private synchronized void startWorkloadGeneration(final int workerId) throws Exception {
     if (!this.workloadGenerationStarted) {
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
index fc58b778a..889075bf8 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java
@@ -21,6 +21,11 @@ import theodolite.commons.workloadgeneration.misc.WorkloadDefinition;
 import theodolite.commons.workloadgeneration.misc.WorkloadEntity;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 
+/**
+ * Base for workload generators.
+ *
+ * @param <T> The type of records the workload generator is dedicated for.
+ */
 public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
     implements WorkloadGenerator {
 
@@ -51,19 +56,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
   private final ScheduledExecutorService executor;
 
   /**
-   * Start the workload generation. The generation terminates automatically after the specified
-   * {@code duration}.
+   * Create a new workload generator.
+   *
+   * @param instances the number of workload-generator instances.
+   * @param zooKeeper the zookeeper connection.
+   * @param keySpace the keyspace.
+   * @param threads the number of threads that is used to generate the load.
+   * @param period the period, how often a new record is emitted.
+   * @param duration the maximum runtime.
+   * @param beforeAction the action to perform before the workload generation starts.
+   * @param generatorFunction the function that is used to generate the individual records.
+   * @param transport the function that is used to send generated messages to the messaging system.
    */
-  @Override
-  public void start() {
-    this.workloadDistributor.start();
-  }
-
-  @Override
-  public void stop() {
-    this.workloadDistributor.stop();
-  }
-
   public AbstractWorkloadGenerator(
       final int instances,
       final ZooKeeper zooKeeper,
@@ -99,19 +103,25 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
     this.executor = Executors.newScheduledThreadPool(threads);
     final Random random = new Random();
 
-    final int periodMs = period.getNano() / 1_000_000;
+    final int periodMs = (int) period.toMillis();
+
+    LOGGER.info("Period: " + periodMs);
 
     final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
 
       final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId);
 
       LOGGER.info("Beginning of Experiment...");
+      LOGGER.info("Generating records for {} keys.", entities.size());
       LOGGER.info("Experiment is going to be executed for the specified duration...");
 
       entities.forEach(entity -> {
         final T message = entity.generateMessage();
         final long initialDelay = random.nextInt(periodMs);
-        this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay,
+        final Runnable task = () -> {
+          this.transport.transport(message);
+        };
+        this.executor.scheduleAtFixedRate(task, initialDelay,
             periodMs, TimeUnit.MILLISECONDS);
       });
 
@@ -130,4 +140,18 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
         new WorkloadDistributor(this.instances, this.zooKeeper, this.keySpace, this.beforeAction,
             workerAction);
   }
+
+  /**
+   * Start the workload generation. The generation terminates automatically after the specified
+   * {@code duration}.
+   */
+  @Override
+  public void start() {
+    this.workloadDistributor.start();
+  }
+
+  @Override
+  public void stop() {
+    this.workloadDistributor.stop();
+  }
 }
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
index 90fecf01c..9cce7e563 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java
@@ -10,6 +10,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 
 /**
  * Workload generator for generating load for the kafka messaging system.
+ *
+ * @param <T> The type of records the workload generator is dedicated for.
  */
 public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
     extends AbstractWorkloadGenerator<T> {
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
index afb232f71..4ece341bc 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java
@@ -9,6 +9,11 @@ import theodolite.commons.workloadgeneration.functions.BeforeAction;
 import theodolite.commons.workloadgeneration.functions.MessageGenerator;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 
+/**
+ * Builder for {@link workload generators}.
+ *
+ * @param <T> the record for which the builder is dedicated for.
+ */
 public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
 
   private int instances;
@@ -45,7 +50,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
   /**
    * Set the number of instances.
    *
-   * @param zooKeeper a reference to the ZooKeeper instance.
+   * @param instances the number of instances.
    * @return the builder.
    */
   public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) {
@@ -89,7 +94,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
   /**
    * Set the key space for the {@link KafkaWorkloadGenerator}.
    *
-   * @param keySpace the {@link KeySpace}.
+   * @param threads the number of threads.
    * @return the builder.
    */
   public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) {
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
index 79e3cb605..86369d6c8 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java
@@ -2,11 +2,17 @@ package theodolite.commons.workloadgeneration.misc;
 
 import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 
-/*
+/**
  * The central class that contains all information that needs to be exchanged between the nodes for
  * distributed workload generation.
  */
 public class WorkloadDefinition {
+  private static final int ZERO = 0;
+  private static final int ONE = 1;
+  private static final int TWO = 2;
+  private static final int THREE = 3;
+  private static final int FOUR = 4;
+
   private final KeySpace keySpace;
   private final int numberOfWorkers;
 
@@ -52,12 +58,14 @@ public class WorkloadDefinition {
   public static WorkloadDefinition fromString(final String workloadDefinitionString) {
     final String[] deserialized = workloadDefinitionString.split(";");
 
-    if (deserialized.length != 4) {
+    if (deserialized.length != FOUR) {
       throw new IllegalArgumentException(
           "Wrong workload definition string when trying to parse the workload generation.");
     }
 
-    return new WorkloadDefinition(new KeySpace(deserialized[0], Integer.valueOf(deserialized[1]),
-        Integer.valueOf(deserialized[2])), Integer.valueOf(deserialized[3]));
+    return new WorkloadDefinition(
+        new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]),
+            Integer.valueOf(deserialized[TWO])),
+        Integer.valueOf(deserialized[THREE]));
   }
 }
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
index 66c347eaf..1e3c55257 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java
@@ -3,8 +3,10 @@ package theodolite.commons.workloadgeneration.misc;
 import kieker.common.record.IMonitoringRecord;
 import theodolite.commons.workloadgeneration.functions.MessageGenerator;
 
-/*
+/**
  * Representation of a entity of the workload generation that generates load for one fixed key.
+ *
+ * @param <T> The type of records the workload generator is dedicated for.
  */
 public class WorkloadEntity<T extends IMonitoringRecord> {
   private final String key;
diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java
index 9f8ac9228..a80490600 100644
--- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java
+++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java
@@ -1,6 +1,6 @@
 package theodolite.commons.workloadgeneration.misc;
 
-/*
+/**
  * Wrapper for connection information for ZooKeeper.
  */
 public class ZooKeeper {
-- 
GitLab