diff --git a/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGenerator.java index 039687e0211375f206951c41a054c76e661407f8..b7685a0815a9c6fe1d39facedcf06f6887063312 100644 --- a/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGenerator.java +++ b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGenerator.java @@ -9,79 +9,57 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; import kafkaSender.KafkaRecordSender; import org.apache.kafka.clients.producer.ProducerConfig; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator { - public static void main(final String[] args) throws InterruptedException, IOException { - // uc1 + public static void main(final String[] args) throws InterruptedException, IOException { + // uc4 - final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = Boolean - .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + final int numSensor = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); + final int periodMs = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); + final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1")); + final String kafkaBootstrapServers = + Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final String kafkaInputTopic = + Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); + final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); + final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); + final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - // create sensorRegistry - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0); + final Properties kafkaProperties = new Properties(); + // kafkaProperties.put("acks", this.acknowledges); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = + new KafkaRecordSender<>(kafkaBootstrapServers, + kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); - final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); - // TODO Brauchen wir das ? - if (sendRegistry) { - final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); + final List<String> sensors = + IntStream.range(0, numSensor).mapToObj(i -> "s_" + i).collect(Collectors.toList()); - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } + for (final String sensor : sensors) { + final int initialDelay = random.nextInt(periodMs); + executor.scheduleAtFixedRate(() -> { + kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); + }, initialDelay, periodMs, TimeUnit.MILLISECONDS); + } - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + System.out.println("Wait for termination..."); + executor.awaitTermination(30, TimeUnit.DAYS); + System.out.println("Will terminate now"); - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate(() -> { - kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); - }, initialDelay, periodMs, TimeUnit.MILLISECONDS); - } - - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); - - } - - private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, int nextId) { - for (int c = 0; c < numChildren; c++) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } - } + } }