-<<<<<<< HEAD
-**TODO** Add required configuration, installation
-### The Kafka Lag Exporter
-Lightbend's Kafka Lag Exporter can be installed via helm:
-helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.0/kafka-lag-exporter-0.6.0.tgz
-**TODO** Add configuration + ServiceMonitor
 #### Our patched Confluent Helm Charts
 To use our patched Confluent Helm Charts clone the
@@ -118,17 +105,10 @@ To let Prometheus scrape Kafka lag metrics, deploy a ServiceMonitor:
 kubectl apply -f infrastructure/kafka-lag-exporter/service-monitor.yaml
->>>>>>> 624692753eb09684dd3dda3926482e9b56ada0d6
 ## Python 3.7
-<<<<<<< HEAD
-For executing benchmarks and analyzing their results, a Python 3.7 installation
-is required. We suggest to use a virtual environment placed in the `.venv` directory.
-**TODO** Show how to install requirements
 For executing benchmarks and analyzing their results, a **Python 3.7** installation
 is required. We suggest to use a virtual environment placed in the `.venv` directory.
@@ -169,4 +149,3 @@ The `./run_loop.sh` is the entrypoint for all benchmark executions. Is has to be
 * `<memory-limit>`: Kubernetes memory limit. Optional. Default `4Gi`.
 * `<commit-interval>`: Kafka Streams' commit interval in milliseconds. Optional. Default `100`.
 * `<duration>`: Duration in minutes subexperiments should be executed for. Optional. Default `5`.
->>>>>>> 624692753eb09684dd3dda3926482e9b56ada0d6
diff --git a/execution/cluster-setup.sh b/execution/cluster-setup.sh
new file mode 100755
index 0000000000000000000000000000000000000000..a0e271deda23106f3c734f546a6059d09995d08c
--- /dev/null
+++ b/execution/cluster-setup.sh
@@ -0,0 +1,60 @@
+## minikube ##
+# if minikube stop responding after few minutes
+# minikube delete 
+# cd 
+# rm -r .minikube
+# cd Dokumente/Master-2-SoSe-2020/project/spesb/execution/
+#minikube config set memory  4046
+#minikube delete
+#minikube config set cpus 4 
+#minikube delete
+#minikube start --vm-driver=virtualbox
+## kind ## 
+kind delete cluster 
+kind create cluster # --config  infrastructure/cluster/kind-configuration.yaml 
+# K8s dashboard
+# Token: eyJhbGciOiJSUzI1NiIsImtpZCI6ImdGNlU1U3BnN01XcS14RnlWUFRBODlaTzNpeUtxa1hTV3VKNTVmVGVrZ2MifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlcm5ldGVzLWRhc2hib2FyZCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLW5rbnc1Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiI2NDE5NTUzYy0yY2RkLTQ2OGUtYTMwNS03YzZlNWQ5NjhmMzMiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZXJuZXRlcy1kYXNoYm9hcmQ6ZGVmYXVsdCJ9.xkkZJViw4Q7RUbpzWjmTUGIGlgHfIhC94GuJ_bmGId3w8UxCP5PK6eq0yPNTTMJMT-yGr3qH7D1f616UNDJM1SrcoervG1fXyzw0XYmdbXluemW1LIm3WyzukhBs4dF4s93RrxUd9iHFjCrQanssXOSDDCZO-2V4BrpYEZ4TLvgMz9pAy4_k4-1gL4QKu8FBgCydBa2SBVOZ8tFy_5r38KH7j9eX0OJD8kyugcmPz0ARaIZhyZERyTHz3wmxY5E-W_qSe1GY12EeR9c5KlWeGYYIzheyBr-TpcyLuoQQguxmI7Ico917k0zG2YBSEYdfTo1I2LPXCYzbp__MAIV_TQ
+# kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.0.0/aio/deploy/recommended.yaml
+# kubectl proxy & 
+# prometheus
+helm install prometheus-operator stable/prometheus-operator -f infrastructure/prometheus/helm-values.yaml
+kubectl apply -f infrastructure/prometheus/service-account.yaml
+kubectl apply -f infrastructure/prometheus/cluster-role.yaml
+kubectl apply -f infrastructure/prometheus/cluster-role-binding.yaml
+kubectl apply -f infrastructure/prometheus/prometheus.yaml
+# grafana 
+kubectl apply -f infrastructure/grafana/prometheus-datasource-config-map.yaml
+kubectl apply -f infrastructure/grafana/dashboard-config-map.yaml
+helm install grafana stable/grafana -f infrastructure/grafana/values.yaml
+# kafka + lag-exporter
+helm install my-confluent $CP_HELM_PATH  -f infrastructure/kafka/values.yaml
+kubectl apply -f $CP_HELM_PATH/examples/kafka-client.yaml 
+kubectl apply -f infrastructure/kafka/service-monitor.yaml
+helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.0/kafka-lag-exporter-0.6.0.tgz -f infrastructure/kafka-lag-exporter/values.yaml
+kubectl apply -f infrastructure/kafka-lag-exporter/service-monitor.yaml
+# port fowarding kubectl "port-forward --namespace monitoring <pod name> <local port>:<container port>"
+sleep 3m # wait for grafana and prometheus pods
+kubectl port-forward $(kubectl get pods  -o name | grep grafana) 3000:3000 & 
+kubectl port-forward $(kubectl get pods  -o name | grep prometheus-prometheus) 9090:9090 &
+# open web interfaces
+# xdg-open http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/. 
+xdg-open http://localhost:9090
+xdg-open http://localhost:3000
+# grafana token
+# kubectl get secret --namespace default grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo
diff --git a/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java
index 9eb95f0c104ee3a5cd497f735f839cdb474af6a9..ef024e71677910517d6d23ad7286b55e11e2164d 100644
--- a/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java
+++ b/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java
@@ -1,38 +1,39 @@
 package spesb.uc1.workloadgenerator;
+import common.dimensions.Duration;
+import common.dimensions.KeySpace;
+import common.dimensions.Period;
+import common.generators.KafkaWorkloadGenerator;
+import common.generators.KafkaWorkloadGeneratorBuilder;
+import common.messages.OutputMessage;
+import common.misc.ZooKeeper;
+import communication.kafka.KafkaRecordSender;
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import spesb.kafkasender.KafkaRecordSender;
 import titan.ccp.models.records.ActivePowerRecord;
 public class LoadGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
-  private static final int WL_MAX_RECORDS = 150_000;
   public static void main(final String[] args) throws InterruptedException, IOException {
     LOGGER.info("Start workload generator for use case UC1.");
     final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
-    final int instanceId = getInstanceId();
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
     final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
-    final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
+    final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
+        "4"));
+    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
+    final int zooKeeperPort =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
     final String kafkaBootstrapServers =
         Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
     final String kafkaInputTopic =
@@ -41,13 +42,6 @@ public class LoadGenerator {
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
-    final int idStart = instanceId * WL_MAX_RECORDS;
-    final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
-    LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
-    final List<String> sensors = IntStream.range(idStart, idEnd)
-        .mapToObj(i -> "s_" + i)
-        .collect(Collectors.toList());
     final Properties kafkaProperties = new Properties();
     // kafkaProperties.put("acks", this.acknowledges);
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
@@ -60,33 +54,18 @@ public class LoadGenerator {
         r -> r.getTimestamp(),
-    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
-    final Random random = new Random();
-    for (final String sensor : sensors) {
-      final int initialDelay = random.nextInt(periodMs);
-      executor.scheduleAtFixedRate(() -> {
-        kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
-      }, initialDelay, periodMs, TimeUnit.MILLISECONDS);
-    }
-    System.out.println("Wait for termination...");
-    executor.awaitTermination(30, TimeUnit.DAYS);
-    System.out.println("Will terminate now");
+    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
+        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
+            .setKeySpace(new KeySpace("s_", numSensors))
+            .setThreads(threads)
+            .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
+            .setDuration(new Duration(100, TimeUnit.SECONDS))
+            .setGeneratorFunction(sensor -> new OutputMessage<>(sensor,
+                new ActivePowerRecord(sensor, System.currentTimeMillis(), value)))
+            .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
+            .setKafkaRecordSender(kafkaRecordSender)
+            .build();
+    workloadGenerator.start();
-  private static int getInstanceId() {
-    final String podName = System.getenv("POD_NAME");
-    if (podName == null) {
-      return 0;
-    } else {
-      return Pattern.compile("-")
-          .splitAsStream(podName)
-          .reduce((p, x) -> x)
-          .map(Integer::parseInt)
-          .orElse(0);
-    }
-  }
diff --git a/uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java b/uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java
index 02d0953a38b610887ceaa6bd4fa698df718bc597..9c496fba2188758d37adf098e022f479e5129aec 100644
--- a/uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java
+++ b/uc3-application/src/main/java/uc3/streamprocessing/KafkaStreamsBuilder.java
@@ -5,6 +5,7 @@ import java.util.Objects;
 import java.util.Properties;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
+import spesb.uc3.streamprocessing.TopologyBuilder;
 import titan.ccp.common.kafka.streams.PropertiesBuilder;
index 42f26db80de6ea8e3a0ccefa8b6b3252d3fea16a..66553cc1982397d329775b20a42db4336395e9f5 100644
--- a/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGenerator.java
+++ b/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGenerator.java
@@ -5,6 +5,7 @@ import common.dimensions.KeySpace;
 import common.dimensions.Period;
 import common.functions.BeforeAction;
 import common.functions.MessageGenerator;
+import common.misc.ZooKeeper;
 import communication.kafka.KafkaRecordSender;
 import kieker.common.record.IMonitoringRecord;
@@ -19,6 +20,7 @@ public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends Workloa
    * Create a new workload generator.
    * @param keySpace the key space to generate the workload for.
+   * @param threads tha amount of threads to use per instance.
    * @param period the period how often a message is generated for each key specified in the
    *        {@code keySpace}
    * @param duration the duration how long the workload generator will emit messages.
@@ -29,22 +31,23 @@ public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends Workloa
    * @param recordSender the record sender which is used to send the generated messages to kafka.
   public KafkaWorkloadGenerator(
+      final ZooKeeper zooKeeper,
       final KeySpace keySpace,
+      final int threads,
       final Period period,
       final Duration duration,
       final BeforeAction beforeAction,
       final MessageGenerator<T> generatorFunction,
       final KafkaRecordSender<T> recordSender) {
-    super(keySpace, period, duration, beforeAction, generatorFunction, o -> {
-      System.out.println(o.getKey());
-    });
+    super(zooKeeper, keySpace, threads, period, duration, beforeAction, generatorFunction,
+        recordSender);
     this.recordSender = recordSender;
   public void stop() {
-    // this.recordSender.terminate();
+    this.recordSender.terminate();
diff --git a/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java
index b012c812f84bf934bbf274cbab228e35b6e114cf..859a19833af3a289ccf83221dd1c288e97fcd99e 100644
--- a/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java
+++ b/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java
@@ -7,6 +7,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import common.dimensions.Duration;
 import common.dimensions.KeySpace;
 import common.dimensions.Period;
@@ -17,13 +19,20 @@ import common.messages.OutputMessage;
 import common.misc.Worker;
 import common.misc.WorkloadDefinition;
 import common.misc.WorkloadEntity;
+import common.misc.ZooKeeper;
 import communication.zookeeper.WorkloadDistributor;
 import kieker.common.record.IMonitoringRecord;
 public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements IWorkloadGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadGenerator.class);
+  private final ZooKeeper zooKeeper;
   private final KeySpace keySpace;
+  private final int threads;
   private final Period period;
   private final Duration duration;
@@ -42,7 +51,7 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
    * Start the workload generation. The generation terminates automatically after the specified
-   * {@code duration}.s
+   * {@code duration}.
   public void start() {
@@ -55,13 +64,17 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
   public WorkloadGenerator(
+      final ZooKeeper zooKeeper,
       final KeySpace keySpace,
+      final int threads,
       final Period period,
       final Duration duration,
       final BeforeAction beforeAction,
       final MessageGenerator<T> generatorFunction,
       final Transport<T> transport) {
+    this.zooKeeper = zooKeeper;
     this.period = period;
+    this.threads = threads;
     this.keySpace = keySpace;
     this.duration = duration;
     this.beforeAction = beforeAction;
@@ -80,29 +93,27 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
     this.transport = transport;
-    final int threads = 10; // env
     this.executor = Executors.newScheduledThreadPool(threads);
     final Random random = new Random();
-    final int periodMs = period.getDuration();
+    final int periodMs = period.getPeriod();
     final BiConsumer<WorkloadDefinition, Worker> workerAction = (declaration, worker) -> {
       final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, worker);
-      System.out.println("Beginning of Experiment...");
-      System.out.println("Experiment is going to be executed for the specified duration...");
+      LOGGER.info("Beginning of Experiment...");
+      LOGGER.info("Experiment is going to be executed for the specified duration...");
       entities.forEach(entity -> {
         final OutputMessage<T> message = entity.generateMessage();
         final long initialDelay = random.nextInt(periodMs);
         this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay,
             periodMs, period.getTimeUnit());
       try {
         this.executor.awaitTermination(duration.getDuration(), duration.getTimeUnit());
-        System.out.println("Terminating now...");
+        LOGGER.info("Terminating now...");
       } catch (final InterruptedException e) {
         // TODO Auto-generated catch block
@@ -111,6 +122,6 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
     this.workloadDistributor =
-        new WorkloadDistributor(this.keySpace, this.beforeAction, workerAction);
+        new WorkloadDistributor(this.zooKeeper, this.keySpace, this.beforeAction, workerAction);
