From d80f620d4f07ad127f5c8477f69700992f582bbc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de>
Date: Tue, 5 May 2020 15:36:18 +0200
Subject: [PATCH] Add distributed load generation for UC3

---
 .../uc1/workloadGenerator/LoadGenerator.java  |  3 --
 .../uc3/workloadGenerator/LoadGenerator.java  | 39 +++++++++++--------
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java
index 9581b167c..305212b04 100644
--- a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java
+++ b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java
@@ -24,14 +24,11 @@ public class LoadGenerator {
   private static final int WL_MAX_RECORDS = 150_000;
 
   public static void main(final String[] args) throws InterruptedException, IOException {
-    // uc1
     LOGGER.info("Start workload generator for use case UC1.");
 
     final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
     final int instanceId = getInstanceId();
-    final int instances =
-        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
     final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
diff --git a/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java
index e3b614538..35b9ffdf2 100644
--- a/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java
+++ b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java
@@ -8,25 +8,27 @@ import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import kafkaSender.KafkaRecordSender;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
-import titan.ccp.model.sensorregistry.MutableSensorRegistry;
 import titan.ccp.models.records.ActivePowerRecord;
 
 public class LoadGenerator {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
 
+  private static final int WL_MAX_RECORDS = 150_000;
+
   public static void main(final String[] args) throws InterruptedException, IOException {
-    // uc3
     LOGGER.info("Start workload generator for use case UC3.");
 
-    final int numSensor =
+    final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
+    final int instanceId = getInstanceId();
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
     final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
@@ -40,14 +42,12 @@ public class LoadGenerator {
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
 
-    // create sensorRegistry
-    // TODO replace as in UC1 or UC4
-    final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
-    addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0);
-
-    final List<String> sensors =
-        sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
-            .collect(Collectors.toList());
+    final int idStart = instanceId * WL_MAX_RECORDS;
+    final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
+    LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
+    final List<String> sensors = IntStream.range(idStart, idEnd)
+        .mapToObj(i -> "s_" + i)
+        .collect(Collectors.toList());
 
     final Properties kafkaProperties = new Properties();
     // kafkaProperties.put("acks", this.acknowledges);
@@ -75,11 +75,16 @@ public class LoadGenerator {
 
   }
 
-  private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren,
-      int nextId) {
-    for (int c = 0; c < numChildren; c++) {
-      parent.addChildMachineSensor("s_" + nextId);
-      nextId++;
+  private static int getInstanceId() {
+    final String podName = System.getenv("POD_NAME");
+    if (podName == null) {
+      return 0;
+    } else {
+      return Pattern.compile("-")
+          .splitAsStream(podName)
+          .reduce((p, x) -> x)
+          .map(Integer::parseInt)
+          .orElse(0);
     }
   }
 
-- 
GitLab