From 47e4354614e609b00be92e3c214da6cf423b9a9b Mon Sep 17 00:00:00 2001
From: Simon Ehrenstein <simon.ehrenstein@gmail.com>
Date: Sat, 23 May 2020 15:51:44 +0200
Subject: [PATCH] Refactor workload generators to use the common workload generator library

---
 execution/run_uc1-new.sh                      |   8 ++
 execution/run_uc2-new.sh                      |   8 ++
 execution/run_uc3-new.sh                      |   8 ++
 execution/run_uc4-new.sh                      |   8 ++
 .../uc2-workload-generator/deployment.yaml    |   4 +
 .../uc3-workload-generator/deployment.yaml    |   4 +
 .../uc4-workload-generator/deployment.yaml    |   4 +
 .../spesb/kafkasender/KafkaRecordSender.java  |  84 -------------
 .../uc1/workloadgenerator/LoadGenerator.java  |  18 +--
 uc2-workload-generator/build.gradle           |   4 +
 .../uc2/workloadgenerator/LoadGenerator.java  | 118 ++++++++++--------
 uc3-workload-generator/build.gradle           |   4 +
 .../spesb/kafkasender/KafkaRecordSender.java  |  84 -------------
 .../uc3/workloadgenerator/LoadGenerator.java  |  70 ++++-------
 uc4-workload-generator/build.gradle           |   4 +
 .../spesb/kafkasender/KafkaRecordSender.java  |  84 -------------
 .../uc4/workloadgenerator/LoadGenerator.java  |  51 ++++----
 .../common/functions/MessageGenerator.java    |   3 +-
 .../main/java/common/functions/Transport.java |   5 +-
 .../common/generators/WorkloadGenerator.java  |   3 +-
 .../java/common/messages/OutputMessage.java   |  32 -----
 .../main/java/common/misc/WorkloadEntity.java |   3 +-
 .../kafka/KafkaRecordSender.java              |   6 +-
 .../zookeeper/WorkloadDistributor.java        |   6 +-
 24 files changed, 200 insertions(+), 423 deletions(-)
 delete mode 100644 uc1-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
 delete mode 100644 uc3-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
 delete mode 100644 uc4-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
 delete mode 100644 workload-generator-common/src/main/java/common/messages/OutputMessage.java

diff --git a/execution/run_uc1-new.sh b/execution/run_uc1-new.sh
index ed5897266..3ac30fe7d 100755
--- a/execution/run_uc1-new.sh
+++ b/execution/run_uc1-new.sh
@@ -79,6 +79,14 @@ do
 done
 echo "Finish topic deletion, print topics:"
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
+
+# delete zookeeper nodes used for workload generation
+echo "Delete ZooKeeper configurations used for workload generation"
+kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
+echo "Waiting for deletion"
+sleep 5s
+echo "Deletion finished"
+
 echo "Exiting script"
 
 KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
diff --git a/execution/run_uc2-new.sh b/execution/run_uc2-new.sh
index 503c4ffa0..bd33b305c 100755
--- a/execution/run_uc2-new.sh
+++ b/execution/run_uc2-new.sh
@@ -71,6 +71,14 @@ do
 done
 echo "Finish topic deletion, print topics:"
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
+
+# delete zookeeper nodes used for workload generation
+echo "Delete ZooKeeper configurations used for workload generation"
+kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
+echo "Waiting for deletion"
+sleep 5s
+echo "Deletion finished"
+
 echo "Exiting script"
 
 KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
diff --git a/execution/run_uc3-new.sh b/execution/run_uc3-new.sh
index b8c7c20a1..4d48eedb9 100755
--- a/execution/run_uc3-new.sh
+++ b/execution/run_uc3-new.sh
@@ -79,6 +79,14 @@ do
 done
 echo "Finish topic deletion, print topics:"
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
+
+# delete zookeeper nodes used for workload generation
+echo "Delete ZooKeeper configurations used for workload generation"
+kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
+echo "Waiting for deletion"
+sleep 5s
+echo "Deletion finished"
+
 echo "Exiting script"
 
 KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
diff --git a/execution/run_uc4-new.sh b/execution/run_uc4-new.sh
index ee3aaae98..b5156796d 100755
--- a/execution/run_uc4-new.sh
+++ b/execution/run_uc4-new.sh
@@ -74,6 +74,14 @@ do
 done
 echo "Finish topic deletion, print topics:"
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
+
+# delete zookeeper nodes used for workload generation
+echo "Delete ZooKeeper configurations used for workload generation"
+kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
+echo "Waiting for deletion"
+sleep 5s
+echo "Deletion finished"
+
 echo "Exiting script"
 
 KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
diff --git a/execution/uc2-workload-generator/deployment.yaml b/execution/uc2-workload-generator/deployment.yaml
index 52592626f..416802004 100644
--- a/execution/uc2-workload-generator/deployment.yaml
+++ b/execution/uc2-workload-generator/deployment.yaml
@@ -17,6 +17,10 @@ spec:
       - name: workload-generator
         image: benediktwetzel/uc2-wg:latest 
         env:
+        - name: ZK_HOST
+          value: "my-confluent-cp-zookeeper"
+        - name: ZK_PORT
+          value: "2181"
         - name: KAFKA_BOOTSTRAP_SERVERS
           value: "my-confluent-cp-kafka:9092"
         - name: HIERARCHY
diff --git a/execution/uc3-workload-generator/deployment.yaml b/execution/uc3-workload-generator/deployment.yaml
index 9ecd2b67e..2171c31a8 100644
--- a/execution/uc3-workload-generator/deployment.yaml
+++ b/execution/uc3-workload-generator/deployment.yaml
@@ -18,6 +18,10 @@ spec:
       - name: workload-generator
         image: soerenhenning/uc3-wg:latest 
         env:
+        - name: ZK_HOST
+          value: "my-confluent-cp-zookeeper"
+        - name: ZK_PORT
+          value: "2181"
         - name: KAFKA_BOOTSTRAP_SERVERS
           value: "my-confluent-cp-kafka:9092"
         - name: NUM_SENSORS
diff --git a/execution/uc4-workload-generator/deployment.yaml b/execution/uc4-workload-generator/deployment.yaml
index 6400abc34..516051b2a 100644
--- a/execution/uc4-workload-generator/deployment.yaml
+++ b/execution/uc4-workload-generator/deployment.yaml
@@ -17,6 +17,10 @@ spec:
       - name: workload-generator
         image: soerenhenning/uc4-wg:latest 
         env:
+        - name: ZK_HOST
+          value: "my-confluent-cp-zookeeper"
+        - name: ZK_PORT
+          value: "2181"
         - name: KAFKA_BOOTSTRAP_SERVERS
           value: "my-confluent-cp-kafka:9092"
         - name: NUM_SENSORS
diff --git a/uc1-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java b/uc1-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
deleted file mode 100644
index 034201411..000000000
--- a/uc1-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package spesb.kafkasender;
-
-import java.util.Properties;
-import java.util.function.Function;
-import kieker.common.record.IMonitoringRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-
-
-/**
- * Sends monitoring records to Kafka.
- *
- * @param <T> {@link IMonitoringRecord} to send
- */
-public class KafkaRecordSender<T extends IMonitoringRecord> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
-
-  private final String topic;
-
-  private final Function<T, String> keyAccessor;
-
-  private final Function<T, Long> timestampAccessor;
-
-  private final Producer<String, T> producer;
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic) {
-    this(bootstrapServers, topic, x -> "", x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor) {
-    this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
-    this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
-  }
-
-  /**
-   * Create a new {@link KafkaRecordSender}.
-   */
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
-      final Properties defaultProperties) {
-    this.topic = topic;
-    this.keyAccessor = keyAccessor;
-    this.timestampAccessor = timestampAccessor;
-
-    final Properties properties = new Properties();
-    properties.putAll(defaultProperties);
-    properties.put("bootstrap.servers", bootstrapServers);
-    // properties.put("acks", this.acknowledges);
-    // properties.put("batch.size", this.batchSize);
-    // properties.put("linger.ms", this.lingerMs);
-    // properties.put("buffer.memory", this.bufferMemory);
-
-    this.producer = new KafkaProducer<>(properties, new StringSerializer(),
-        IMonitoringRecordSerde.serializer());
-  }
-
-  /**
-   * Write the passed monitoring record to Kafka.
-   */
-  public void write(final T monitoringRecord) {
-    final ProducerRecord<String, T> record =
-        new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
-            this.keyAccessor.apply(monitoringRecord), monitoringRecord);
-
-    LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
-    this.producer.send(record);
-  }
-
-  public void terminate() {
-    this.producer.close();
-  }
-
-}
diff --git a/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java
index ef024e716..b0b22ceae 100644
--- a/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java
+++ b/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java
@@ -5,7 +5,6 @@ import common.dimensions.KeySpace;
 import common.dimensions.Period;
 import common.generators.KafkaWorkloadGenerator;
 import common.generators.KafkaWorkloadGeneratorBuilder;
-import common.messages.OutputMessage;
 import common.misc.ZooKeeper;
 import communication.kafka.KafkaRecordSender;
 import java.io.IOException;
@@ -22,8 +21,13 @@ public class LoadGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
 
   public static void main(final String[] args) throws InterruptedException, IOException {
+    // uc1
     LOGGER.info("Start workload generator for use case UC1.");
 
+    // get environment variables
+    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
+    final int zooKeeperPort =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
     final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
     final int periodMs =
@@ -31,9 +35,6 @@ public class LoadGenerator {
     final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
     final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
         "4"));
-    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
-    final int zooKeeperPort =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
     final String kafkaBootstrapServers =
         Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
     final String kafkaInputTopic =
@@ -42,6 +43,7 @@ public class LoadGenerator {
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
 
+    // create kafka record sender
     final Properties kafkaProperties = new Properties();
     // kafkaProperties.put("acks", this.acknowledges);
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
@@ -54,18 +56,20 @@ public class LoadGenerator {
         r -> r.getTimestamp(),
         kafkaProperties);
 
+    // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
         KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
             .setKeySpace(new KeySpace("s_", numSensors))
             .setThreads(threads)
             .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
-            .setDuration(new Duration(100, TimeUnit.SECONDS))
-            .setGeneratorFunction(sensor -> new OutputMessage<>(sensor,
-                new ActivePowerRecord(sensor, System.currentTimeMillis(), value)))
+            .setDuration(new Duration(30, TimeUnit.DAYS))
+            .setGeneratorFunction(
+                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
             .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
             .setKafkaRecordSender(kafkaRecordSender)
             .build();
 
+    // start
     workloadGenerator.start();
   }
 }
diff --git a/uc2-workload-generator/build.gradle b/uc2-workload-generator/build.gradle
index d165ab24e..660079168 100644
--- a/uc2-workload-generator/build.gradle
+++ b/uc2-workload-generator/build.gradle
@@ -1 +1,5 @@
 mainClassName = "spesb.uc2.workloadgenerator.LoadGenerator"
+
+dependencies {
+    compile project(':workload-generator-common')
+}
\ No newline at end of file
diff --git a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGenerator.java
index c2b05be3f..21e5216a3 100644
--- a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGenerator.java
+++ b/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGenerator.java
@@ -1,18 +1,19 @@
 package spesb.uc2.workloadgenerator;
 
+import common.dimensions.Duration;
+import common.dimensions.KeySpace;
+import common.dimensions.Period;
+import common.generators.KafkaWorkloadGenerator;
+import common.generators.KafkaWorkloadGeneratorBuilder;
+import common.misc.ZooKeeper;
+import communication.kafka.KafkaRecordSender;
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import spesb.kafkasender.KafkaRecordSender;
 import titan.ccp.configuration.events.Event;
 import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
 import titan.ccp.model.sensorregistry.MutableSensorRegistry;
@@ -23,12 +24,17 @@ public class LoadGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
 
   public static void main(final String[] args) throws InterruptedException, IOException {
+    // uc2
     LOGGER.info("Start workload generator for use case UC2.");
 
+    // get environment variables
     final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
     final int numNestedGroups = Integer
         .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
-    final int numSensor =
+    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
+    final int zooKeeperPort =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
+    final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
@@ -45,37 +51,11 @@ public class LoadGenerator {
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
 
-    final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
-    if (hierarchy.equals("deep")) {
-      MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
-      for (int lvl = 1; lvl < numNestedGroups; lvl++) {
-        lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
-      }
-      for (int s = 0; s < numSensor; s++) {
-        lastSensor.addChildMachineSensor("sensor_" + s);
-      }
-    } else if (hierarchy.equals("full")) {
-      addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
-    } else {
-      throw new IllegalStateException();
-    }
-
-    final List<String> sensors =
-        sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
-            .collect(Collectors.toList());
-
-    if (sendRegistry) {
-      final ConfigPublisher configPublisher =
-          new ConfigPublisher(kafkaBootstrapServers, "configuration");
-      configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
-      configPublisher.close();
-      System.out.println("Configuration sent.");
-
-      System.out.println("Now wait 30 seconds");
-      Thread.sleep(30_000);
-      System.out.println("And woke up again :)");
-    }
+    // build sensor registry
+    final MutableSensorRegistry sensorRegistry =
+        buildSensorRegistry(hierarchy, numNestedGroups, numSensors);
 
+    // create kafka record sender
     final Properties kafkaProperties = new Properties();
     // kafkaProperties.put("acks", this.acknowledges);
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
@@ -85,20 +65,60 @@ public class LoadGenerator {
         new KafkaRecordSender<>(kafkaBootstrapServers,
             kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
 
-    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
-    final Random random = new Random();
+    // create workload generator
+    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
+        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
+            .setKeySpace(new KeySpace("s_", numSensors))
+            .setThreads(threads)
+            .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
+            .setDuration(new Duration(30, TimeUnit.DAYS))
+            .setBeforeAction(() -> {
+              if (sendRegistry) {
+                final ConfigPublisher configPublisher =
+                    new ConfigPublisher(kafkaBootstrapServers, "configuration");
+                configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
+                configPublisher.close();
+                System.out.println("Configuration sent.");
+
+                System.out.println("Now wait 30 seconds");
+                try {
+                  Thread.sleep(30_000);
+                } catch (final InterruptedException e) {
+                  // the wait before generation was interrupted; log and continue
+                  LOGGER.error("Interrupted while waiting for the configuration to be applied.", e);
+                }
+                System.out.println("And woke up again :)");
+              }
+            })
+            .setGeneratorFunction(
+                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
+            .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
+            .setKafkaRecordSender(kafkaRecordSender)
+            .build();
+
+    // start
+    workloadGenerator.start();
+  }
 
-    for (final String sensor : sensors) {
-      final int initialDelay = random.nextInt(periodMs);
-      executor.scheduleAtFixedRate(() -> {
-        kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
-      }, initialDelay, periodMs, TimeUnit.MILLISECONDS);
+  private static MutableSensorRegistry buildSensorRegistry(
+      final String hierarchy,
+      final int numNestedGroups,
+      final int numSensors) {
+    final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
+    if (hierarchy.equals("deep")) {
+      MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
+      for (int lvl = 1; lvl < numNestedGroups; lvl++) {
+        lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
+      }
+      for (int s = 0; s < numSensors; s++) {
+        lastSensor.addChildMachineSensor("sensor_" + s);
+      }
+    } else if (hierarchy.equals("full")) {
+      addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0);
+    } else {
+      throw new IllegalStateException();
     }
-
-    System.out.println("Wait for termination...");
-    executor.awaitTermination(30, TimeUnit.DAYS);
-    System.out.println("Will terminate now");
-
+    return sensorRegistry;
   }
 
   private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
diff --git a/uc3-workload-generator/build.gradle b/uc3-workload-generator/build.gradle
index e27cf26d2..a635010f4 100644
--- a/uc3-workload-generator/build.gradle
+++ b/uc3-workload-generator/build.gradle
@@ -1 +1,5 @@
 mainClassName = "spesb.uc3.workloadgenerator.LoadGenerator"
+
+dependencies {
+    compile project(':workload-generator-common')
+}
\ No newline at end of file
diff --git a/uc3-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java b/uc3-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
deleted file mode 100644
index 034201411..000000000
--- a/uc3-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package spesb.kafkasender;
-
-import java.util.Properties;
-import java.util.function.Function;
-import kieker.common.record.IMonitoringRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-
-
-/**
- * Sends monitoring records to Kafka.
- *
- * @param <T> {@link IMonitoringRecord} to send
- */
-public class KafkaRecordSender<T extends IMonitoringRecord> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
-
-  private final String topic;
-
-  private final Function<T, String> keyAccessor;
-
-  private final Function<T, Long> timestampAccessor;
-
-  private final Producer<String, T> producer;
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic) {
-    this(bootstrapServers, topic, x -> "", x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor) {
-    this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
-    this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
-  }
-
-  /**
-   * Create a new {@link KafkaRecordSender}.
-   */
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
-      final Properties defaultProperties) {
-    this.topic = topic;
-    this.keyAccessor = keyAccessor;
-    this.timestampAccessor = timestampAccessor;
-
-    final Properties properties = new Properties();
-    properties.putAll(defaultProperties);
-    properties.put("bootstrap.servers", bootstrapServers);
-    // properties.put("acks", this.acknowledges);
-    // properties.put("batch.size", this.batchSize);
-    // properties.put("linger.ms", this.lingerMs);
-    // properties.put("buffer.memory", this.bufferMemory);
-
-    this.producer = new KafkaProducer<>(properties, new StringSerializer(),
-        IMonitoringRecordSerde.serializer());
-  }
-
-  /**
-   * Write the passed monitoring record to Kafka.
-   */
-  public void write(final T monitoringRecord) {
-    final ProducerRecord<String, T> record =
-        new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
-            this.keyAccessor.apply(monitoringRecord), monitoringRecord);
-
-    LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
-    this.producer.send(record);
-  }
-
-  public void terminate() {
-    this.producer.close();
-  }
-
-}
diff --git a/uc3-workload-generator/src/main/java/spesb/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/spesb/uc3/workloadgenerator/LoadGenerator.java
index 9ab8a5530..1be92e61e 100644
--- a/uc3-workload-generator/src/main/java/spesb/uc3/workloadgenerator/LoadGenerator.java
+++ b/uc3-workload-generator/src/main/java/spesb/uc3/workloadgenerator/LoadGenerator.java
@@ -1,20 +1,19 @@
 package spesb.uc3.workloadgenerator;
 
+import common.dimensions.Duration;
+import common.dimensions.KeySpace;
+import common.dimensions.Period;
+import common.generators.KafkaWorkloadGenerator;
+import common.generators.KafkaWorkloadGeneratorBuilder;
+import common.misc.ZooKeeper;
+import communication.kafka.KafkaRecordSender;
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import spesb.kafkasender.KafkaRecordSender;
 import titan.ccp.models.records.ActivePowerRecord;
 
 public class LoadGenerator {
@@ -26,9 +25,12 @@ public class LoadGenerator {
   public static void main(final String[] args) throws InterruptedException, IOException {
     LOGGER.info("Start workload generator for use case UC3.");
 
+    // get environment variables
+    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
+    final int zooKeeperPort =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
     final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
-    final int instanceId = getInstanceId();
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
     final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
@@ -42,13 +44,7 @@ public class LoadGenerator {
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
 
-    final int idStart = instanceId * WL_MAX_RECORDS;
-    final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
-    LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
-    final List<String> sensors = IntStream.range(idStart, idEnd)
-        .mapToObj(i -> "s_" + i)
-        .collect(Collectors.toList());
-
+    // create kafka record sender
     final Properties kafkaProperties = new Properties();
     // kafkaProperties.put("acks", this.acknowledges);
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
@@ -58,35 +54,21 @@ public class LoadGenerator {
         new KafkaRecordSender<>(kafkaBootstrapServers,
             kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
 
-    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
-    final Random random = new Random();
-
-    LOGGER.info("Start setting up sensors.");
-    for (final String sensor : sensors) {
-      final int initialDelay = random.nextInt(periodMs);
-      executor.scheduleAtFixedRate(() -> {
-        kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
-      }, initialDelay, periodMs, TimeUnit.MILLISECONDS);
-    }
-    LOGGER.info("Finished setting up sensors.");
+    // create workload generator
+    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
+        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
+            .setKeySpace(new KeySpace("s_", numSensors))
+            .setThreads(threads)
+            .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
+            .setDuration(new Duration(30, TimeUnit.DAYS))
+            .setGeneratorFunction(
+                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
+            .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
+            .setKafkaRecordSender(kafkaRecordSender)
+            .build();
 
-    System.out.println("Wait for termination...");
-    executor.awaitTermination(30, TimeUnit.DAYS);
-    System.out.println("Will terminate now");
+    // start
+    workloadGenerator.start();
 
   }
-
-  private static int getInstanceId() {
-    final String podName = System.getenv("POD_NAME");
-    if (podName == null) {
-      return 0;
-    } else {
-      return Pattern.compile("-")
-          .splitAsStream(podName)
-          .reduce((p, x) -> x)
-          .map(Integer::parseInt)
-          .orElse(0);
-    }
-  }
-
 }
diff --git a/uc4-workload-generator/build.gradle b/uc4-workload-generator/build.gradle
index 8bbdedf4f..de0d5028d 100644
--- a/uc4-workload-generator/build.gradle
+++ b/uc4-workload-generator/build.gradle
@@ -1 +1,5 @@
 mainClassName = "spesb.uc4.workloadgenerator.LoadGenerator"
+
+dependencies {
+    compile project(':workload-generator-common')
+}
\ No newline at end of file
diff --git a/uc4-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java b/uc4-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
deleted file mode 100644
index 034201411..000000000
--- a/uc4-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package spesb.kafkasender;
-
-import java.util.Properties;
-import java.util.function.Function;
-import kieker.common.record.IMonitoringRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
-
-
-/**
- * Sends monitoring records to Kafka.
- *
- * @param <T> {@link IMonitoringRecord} to send
- */
-public class KafkaRecordSender<T extends IMonitoringRecord> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
-
-  private final String topic;
-
-  private final Function<T, String> keyAccessor;
-
-  private final Function<T, Long> timestampAccessor;
-
-  private final Producer<String, T> producer;
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic) {
-    this(bootstrapServers, topic, x -> "", x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor) {
-    this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
-  }
-
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
-    this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
-  }
-
-  /**
-   * Create a new {@link KafkaRecordSender}.
-   */
-  public KafkaRecordSender(final String bootstrapServers, final String topic,
-      final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
-      final Properties defaultProperties) {
-    this.topic = topic;
-    this.keyAccessor = keyAccessor;
-    this.timestampAccessor = timestampAccessor;
-
-    final Properties properties = new Properties();
-    properties.putAll(defaultProperties);
-    properties.put("bootstrap.servers", bootstrapServers);
-    // properties.put("acks", this.acknowledges);
-    // properties.put("batch.size", this.batchSize);
-    // properties.put("linger.ms", this.lingerMs);
-    // properties.put("buffer.memory", this.bufferMemory);
-
-    this.producer = new KafkaProducer<>(properties, new StringSerializer(),
-        IMonitoringRecordSerde.serializer());
-  }
-
-  /**
-   * Write the passed monitoring record to Kafka.
-   */
-  public void write(final T monitoringRecord) {
-    final ProducerRecord<String, T> record =
-        new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
-            this.keyAccessor.apply(monitoringRecord), monitoringRecord);
-
-    LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
-    this.producer.send(record);
-  }
-
-  public void terminate() {
-    this.producer.close();
-  }
-
-}
diff --git a/uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java
index bcf4f6d2c..db2b1e8bd 100644
--- a/uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java
+++ b/uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java
@@ -1,19 +1,19 @@
 package spesb.uc4.workloadgenerator;
 
+import common.dimensions.Duration;
+import common.dimensions.KeySpace;
+import common.dimensions.Period;
+import common.generators.KafkaWorkloadGenerator;
+import common.generators.KafkaWorkloadGeneratorBuilder;
+import common.misc.ZooKeeper;
+import communication.kafka.KafkaRecordSender;
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import spesb.kafkasender.KafkaRecordSender;
 import titan.ccp.models.records.ActivePowerRecord;
 
 public class LoadGenerator {
@@ -24,7 +24,11 @@ public class LoadGenerator {
     // uc4
     LOGGER.info("Start workload generator for use case UC4.");
 
-    final int numSensor =
+    // get environment variables
+    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
+    final int zooKeeperPort =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
+    final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
@@ -39,6 +43,7 @@ public class LoadGenerator {
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
 
+    // create kafka record sender
     final Properties kafkaProperties = new Properties();
     // kafkaProperties.put("acks", this.acknowledges);
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
@@ -48,23 +53,21 @@ public class LoadGenerator {
         new KafkaRecordSender<>(kafkaBootstrapServers,
             kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
 
-    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
-    final Random random = new Random();
-
-    final List<String> sensors =
-        IntStream.range(0, numSensor).mapToObj(i -> "s_" + i).collect(Collectors.toList());
-
-    for (final String sensor : sensors) {
-      final int initialDelay = random.nextInt(periodMs);
-      executor.scheduleAtFixedRate(() -> {
-        kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
-      }, initialDelay, periodMs, TimeUnit.MILLISECONDS);
-    }
-
-    System.out.println("Wait for termination...");
-    executor.awaitTermination(30, TimeUnit.DAYS);
-    System.out.println("Will terminate now");
+    // create workload generator
+    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
+        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
+            .setKeySpace(new KeySpace("s_", numSensors))
+            .setThreads(threads)
+            .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
+            .setDuration(new Duration(30, TimeUnit.DAYS))
+            .setGeneratorFunction(
+                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
+            .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
+            .setKafkaRecordSender(kafkaRecordSender)
+            .build();
 
+    // start
+    workloadGenerator.start();
   }
 
 }
diff --git a/workload-generator-common/src/main/java/common/functions/MessageGenerator.java b/workload-generator-common/src/main/java/common/functions/MessageGenerator.java
index 525e8e58a..af874ecf0 100644
--- a/workload-generator-common/src/main/java/common/functions/MessageGenerator.java
+++ b/workload-generator-common/src/main/java/common/functions/MessageGenerator.java
@@ -1,11 +1,10 @@
 package common.functions;
 
-import common.messages.OutputMessage;
 import kieker.common.record.IMonitoringRecord;
 
 @FunctionalInterface
 public interface MessageGenerator<T extends IMonitoringRecord> {
 
-  OutputMessage<T> generateMessage(final String key);
+  T generateMessage(final String key);
 
 }
diff --git a/workload-generator-common/src/main/java/common/functions/Transport.java b/workload-generator-common/src/main/java/common/functions/Transport.java
index e32b055b6..d4e5f1ef1 100644
--- a/workload-generator-common/src/main/java/common/functions/Transport.java
+++ b/workload-generator-common/src/main/java/common/functions/Transport.java
@@ -1,11 +1,10 @@
 package common.functions;
 
-import common.messages.OutputMessage;
 import kieker.common.record.IMonitoringRecord;
 
 @FunctionalInterface
 public interface Transport<T extends IMonitoringRecord> {
-  
-  public void transport(final OutputMessage<T> message);
+
+  void transport(final T message);
 
 }
diff --git a/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java
index 859a19833..832e31d22 100644
--- a/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java
+++ b/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java
@@ -15,7 +15,6 @@ import common.dimensions.Period;
 import common.functions.BeforeAction;
 import common.functions.MessageGenerator;
 import common.functions.Transport;
-import common.messages.OutputMessage;
 import common.misc.Worker;
 import common.misc.WorkloadDefinition;
 import common.misc.WorkloadEntity;
@@ -105,7 +104,7 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
       LOGGER.info("Beginning of Experiment...");
       LOGGER.info("Experiment is going to be executed for the specified duration...");
       entities.forEach(entity -> {
-        final OutputMessage<T> message = entity.generateMessage();
+        final T message = entity.generateMessage();
         final long initialDelay = random.nextInt(periodMs);
         this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay,
             periodMs, period.getTimeUnit());
diff --git a/workload-generator-common/src/main/java/common/messages/OutputMessage.java b/workload-generator-common/src/main/java/common/messages/OutputMessage.java
deleted file mode 100644
index cd61411c0..000000000
--- a/workload-generator-common/src/main/java/common/messages/OutputMessage.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package common.messages;
-
-import kieker.common.record.IMonitoringRecord;
-
-/*
- * Wrapper class for messages within the messaging system.
- */
-public class OutputMessage<T extends IMonitoringRecord> {
-  private final String key;
-  private final T value;
-
-  /***
-   * Create a new Message.
-   * 
-   * @param key the key of the message.
-   * @param value the value of the message.
-   */
-  public OutputMessage(final String key, final T value) {
-    super();
-    this.key = key;
-    this.value = value;
-  }
-
-  public String getKey() {
-    return this.key;
-  }
-
-  public T getValue() {
-    return this.value;
-  }
-
-}
diff --git a/workload-generator-common/src/main/java/common/misc/WorkloadEntity.java b/workload-generator-common/src/main/java/common/misc/WorkloadEntity.java
index ef092c698..c0133022c 100644
--- a/workload-generator-common/src/main/java/common/misc/WorkloadEntity.java
+++ b/workload-generator-common/src/main/java/common/misc/WorkloadEntity.java
@@ -1,7 +1,6 @@
 package common.misc;
 
 import common.functions.MessageGenerator;
-import common.messages.OutputMessage;
 import kieker.common.record.IMonitoringRecord;
 
 public class WorkloadEntity<T extends IMonitoringRecord> {
@@ -13,7 +12,7 @@ public class WorkloadEntity<T extends IMonitoringRecord> {
     this.generator = generator;
   }
 
-  public OutputMessage<T> generateMessage() {
+  public T generateMessage() {
     return this.generator.generateMessage(this.key);
   }
 }
diff --git a/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java b/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java
index 8a0fb0266..02312f9d2 100644
--- a/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java
+++ b/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java
@@ -9,7 +9,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import common.functions.Transport;
-import common.messages.OutputMessage;
 import kieker.common.record.IMonitoringRecord;
 import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
 
@@ -84,9 +83,8 @@ public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport
   }
 
   @Override
-  public void transport(final OutputMessage<T> message) {
-    System.out.println(message.getKey());
-    this.write(message.getValue());
+  public void transport(final T message) {
+    this.write(message);
   }
 
 }
diff --git a/workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java b/workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java
index f677b8f71..8701877f7 100644
--- a/workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java
+++ b/workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java
@@ -143,8 +143,10 @@ public class WorkloadDistributor {
               WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
 
           if (worker.getId() > declaration.getNumberOfWorkers() - 1) {
-            throw new IllegalStateException("Worker with id " + worker.getId()
-                + " was too slow and is therefore not participating in the workload generation.");
+            LOGGER.warn("Worker with id {} was too slow and is therefore in idle state.",
+                worker.getId());
+            WorkloadDistributor.this.workerAction.accept(new WorkloadDefinition(new KeySpace(0), 0),
+                worker); // this worker generates no workload
           } else {
             WorkloadDistributor.this.workerAction.accept(declaration, worker);
           }
-- 
GitLab