From 7d5fef400c6f3c47a433b33585ebc20b2be62eab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de> Date: Fri, 1 May 2020 16:25:29 +0200 Subject: [PATCH] Clean up --- .../uc3/workloadGenerator/LoadGenerator.java | 108 +++++++++--------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java index 35defc90a..659b958c8 100644 --- a/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/uc3/workloadGenerator/LoadGenerator.java @@ -11,77 +11,71 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import kafkaSender.KafkaRecordSender; import org.apache.kafka.clients.producer.ProducerConfig; -import titan.ccp.configuration.events.Event; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator { - public static void main(final String[] args) throws InterruptedException, IOException { - // uc1 + public static void main(final String[] args) throws InterruptedException, IOException { + // uc3 - final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = Boolean - .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), - "localhost:9092"); - final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + final int numSensor = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); + final int periodMs = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); + final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); + final String kafkaBootstrapServers = + Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final String kafkaInputTopic = + Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); + final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); + final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); + final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - // create sensorRegistry - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0); + // create sensorRegistry + // TODO replace as in UC1 or UC4 + final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); + addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0); - final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); + final List<String> sensors = + sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) + .collect(Collectors.toList()); - if (sendRegistry) { - final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); + final Properties kafkaProperties = new Properties(); + // kafkaProperties.put("acks", this.acknowledges); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = + new KafkaRecordSender<>(kafkaBootstrapServers, + kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers, - kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + for (final String sensor : sensors) { + System.out.println("working"); + final int initialDelay = random.nextInt(periodMs); + executor.scheduleAtFixedRate(() -> { + kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); + }, initialDelay, periodMs, TimeUnit.MILLISECONDS); + } - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); + System.out.println("Wait for termination..."); + executor.awaitTermination(30, TimeUnit.DAYS); + System.out.println("Will terminate now"); - for (final String sensor : sensors) { - System.out.println("working"); - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate(() -> { - kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); - }, initialDelay, periodMs, TimeUnit.MILLISECONDS); - } + } - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); - - } - - private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, int nextId) { - for (int c = 0; c < numChildren; c++) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } - } + private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, + int nextId) { + for (int c = 0; c < numChildren; c++) { + parent.addChildMachineSensor("s_" + nextId); + nextId++; + } + } } -- GitLab