From b86ba38125625d7ef8cab99dbb71d0b16ee20a96 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de>
Date: Fri, 1 May 2020 16:11:43 +0200
Subject: [PATCH] Add basic support for multiple load generators in UC1

---
 .../uc1/workloadGenerator/LoadGenerator.java  | 42 ++++++++++++++++---
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java
index 68bf0faf6..7597a595d 100644
--- a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java
+++ b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java
@@ -8,19 +8,29 @@ import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import kafkaSender.KafkaRecordSender;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import titan.ccp.models.records.ActivePowerRecord;
 
 public class LoadGenerator {
 
+  private static Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
+
+  private static final int WL_MAX_RECORDS = 150_000;
+
   public static void main(final String[] args) throws InterruptedException, IOException {
     // uc1
 
-    final int numSensor =
+    final int numSensors =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
+    final int instanceId = getInstanceId();
+    final int instances =
+        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
     final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
@@ -33,17 +43,24 @@ public class LoadGenerator {
     final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
     final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
 
-    final List<String> sensors =
-        IntStream.range(0, numSensor).mapToObj(i -> "s_" + i).collect(Collectors.toList());
+    final int idStart = instanceId * WL_MAX_RECORDS;
+    final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
+    LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
+    final List<String> sensors = IntStream.range(idStart, idEnd)
+        .mapToObj(i -> "s_" + i)
+        .collect(Collectors.toList());
 
     final Properties kafkaProperties = new Properties();
     // kafkaProperties.put("acks", this.acknowledges);
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
     kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
     kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        new KafkaRecordSender<>(kafkaBootstrapServers,
-            kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
+    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
+        kafkaBootstrapServers,
+        kafkaInputTopic,
+        r -> r.getIdentifier(),
+        r -> r.getTimestamp(),
+        kafkaProperties);
 
     final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
     final Random random = new Random();
@@ -61,4 +78,17 @@ public class LoadGenerator {
 
   }
 
+  private static int getInstanceId() {
+    final String podName = System.getenv("POD_NAME");
+    if (podName == null) {
+      return 0;
+    } else {
+      return Pattern.compile("-")
+          .splitAsStream(podName)
+          .reduce((p, x) -> x)
+          .map(Integer::parseInt)
+          .orElse(0);
+    }
+  }
+
 }
-- 
GitLab