diff --git a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java index 9581b167c16484e7b7699434c0750ed7abba6e4b..305212b0416c644c9c23af461637b6fd0f549cf8 100644 --- a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java @@ -24,14 +24,11 @@ public class LoadGenerator { private static final int WL_MAX_RECORDS = 150_000; public static void main(final String[] args) throws InterruptedException, IOException { - // uc1 LOGGER.info("Start workload generator for use case UC1."); final int numSensors = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int instanceId = getInstanceId(); - final int instances = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); diff --git a/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java index e3b61453819b0c7de256318e50e0d7f858e3baba..35b9ffdf261276b4b409d093c5e02c7ada48d662 100644 --- a/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java @@ -8,25 +8,27 @@ import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.IntStream; import kafkaSender.KafkaRecordSender; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + private static final int WL_MAX_RECORDS = 150_000; + public static void main(final String[] args) throws InterruptedException, IOException { - // uc3 LOGGER.info("Start workload generator for use case UC3."); - final int numSensor = + final int numSensors = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); + final int instanceId = getInstanceId(); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); @@ -40,14 +42,12 @@ public class LoadGenerator { final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - // create sensorRegistry - // TODO replace as in UC1 or UC4 - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0); - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); + final int idStart = instanceId * WL_MAX_RECORDS; + final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors); + LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd); + final List<String> sensors = IntStream.range(idStart, idEnd) + .mapToObj(i -> "s_" + i) + .collect(Collectors.toList()); final Properties kafkaProperties = new Properties(); // kafkaProperties.put("acks", this.acknowledges); @@ -75,11 +75,16 @@ public class LoadGenerator { } - private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, - int nextId) { - for (int c = 0; c < numChildren; c++) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; + private static int getInstanceId() { + final String podName = System.getenv("POD_NAME"); + if (podName == null) { + return 0; + } else { + return Pattern.compile("-") + .splitAsStream(podName) + .reduce((p, x) -> x) + .map(Integer::parseInt) + .orElse(0); } }