From e03f4f86aec668a19646ad5270681e8a050fba56 Mon Sep 17 00:00:00 2001 From: ben <stu126940@mail.uni-kiel.de> Date: Wed, 18 Mar 2020 11:38:25 +0100 Subject: [PATCH] FIX wrong package declaration, Dockerfile & gradle --- uc1-workload-generator/Dockerfile | 6 +- uc1-workload-generator/build.gradle | 2 +- .../workloadGenerator/ConfigPublisher.java | 54 ++-- .../uc1/workloadGenerator/LoadGenerator.java | 2 +- .../LoadGeneratorExtrem.java | 268 ++++++++---------- 5 files changed, 156 insertions(+), 176 deletions(-) diff --git a/uc1-workload-generator/Dockerfile b/uc1-workload-generator/Dockerfile index 9b17de3af..91f18d740 100644 --- a/uc1-workload-generator/Dockerfile +++ b/uc1-workload-generator/Dockerfile @@ -1,6 +1,6 @@ FROM openjdk:11-slim -ADD build/distributions/exp-bigdata19-bridge.tar / +ADD build/distributions/uc1-workload-generator.tar / -CMD export JAVA_OPTS=-Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL \ - && /exp-bigdata19-bridge/bin/exp-bigdata19-bridge \ No newline at end of file +CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ + /uc1-workload-generator/bin/uc1-workload-generator \ No newline at end of file diff --git a/uc1-workload-generator/build.gradle b/uc1-workload-generator/build.gradle index 12e597b37..824566a24 100644 --- a/uc1-workload-generator/build.gradle +++ b/uc1-workload-generator/build.gradle @@ -21,7 +21,7 @@ dependencies { testCompile 'junit:junit:4.12' } -mainClassName = "titan.ccp.kiekerbridge.expbigdata19.ExperimentorBigData" +mainClassName = "uc1.workloadGenerator.LoadGenerator" eclipse { classpath { diff --git a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/ConfigPublisher.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/ConfigPublisher.java index 56625e454..d0201b4da 100644 --- a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/ConfigPublisher.java +++ b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/ConfigPublisher.java @@ -1,4 +1,4 @@ -package uc2.workloadGenerator; +package uc1.workloadGenerator; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -12,39 +12,37 @@ import titan.ccp.configuration.events.EventSerde; public class ConfigPublisher { - private final String topic; + private final String topic; - private final Producer<Event, String> producer; + private final Producer<Event, String> producer; - public ConfigPublisher(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, new Properties()); - } + public ConfigPublisher(final String bootstrapServers, final String topic) { + this(bootstrapServers, topic, new Properties()); + } - public ConfigPublisher(final String bootstrapServers, final String topic, - final Properties defaultProperties) { - this.topic = topic; + public ConfigPublisher(final String bootstrapServers, final String topic, final Properties defaultProperties) { + this.topic = topic; - final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB - properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB + final Properties properties = new Properties(); + properties.putAll(defaultProperties); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB + properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB - this.producer = - new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer()); - } + this.producer = new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer()); + } - public void publish(final Event event, final String value) { - final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value); - try { - this.producer.send(record).get(); - } catch (InterruptedException | ExecutionException e) { - throw new IllegalArgumentException(e); - } - } + public void publish(final Event event, final String value) { + final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value); + try { + this.producer.send(record).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalArgumentException(e); + } + } - public void close() { - this.producer.close(); - } + public void close() { + this.producer.close(); + } } diff --git a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java index ccc2d689e..f75f8018b 100644 --- a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGenerator.java @@ -1,4 +1,4 @@ -package uc2.workloadGenerator; +package uc1.workloadGenerator; import java.io.IOException; import java.util.List; diff --git a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGeneratorExtrem.java b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGeneratorExtrem.java index e13030e23..1670778fd 100644 --- a/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGeneratorExtrem.java +++ b/uc1-workload-generator/src/main/java/uc1/workloadGenerator/LoadGeneratorExtrem.java @@ -1,4 +1,4 @@ -package uc2.workloadGenerator; +package uc1.workloadGenerator; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -19,147 +19,129 @@ import titan.ccp.models.records.ActivePowerRecord; public class LoadGeneratorExtrem { - public static void main(final String[] args) throws InterruptedException, IOException { - - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); - final int numNestedGroups = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int value = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final boolean doNothing = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false")); - final int threads = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final int producers = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - - final SensorRegistry sensorRegistry = - buildSensorRegistry(hierarchy, numNestedGroups, numSensor); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } - - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream - .<KafkaRecordSender<ActivePowerRecord>>generate( - () -> new KafkaRecordSender<>( - kafkaBootstrapServers, - kafkaInputTopic, - r -> r.getIdentifier(), - r -> r.getTimestamp(), - kafkaProperties)) - .limit(producers) - .collect(Collectors.toList()); - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - for (int i = 0; i < threads; i++) { - final int threadId = i; - new Thread(() -> { - while (true) { - for (final String sensor : sensors) { - if (!doNothing) { - kafkaRecordSenders.get(threadId % producers).write(new ActivePowerRecord( - sensor, - System.currentTimeMillis(), - value)); - } - } - } - }).start(); - } - - while (true) { - printCpuUsagePerThread(); - } - - // System.out.println("Wait for termination..."); - // Thread.sleep(30 * 24 * 60 * 60 * 1000L); - // System.out.println("Will terminate now"); - } - - private static void printCpuUsagePerThread() throws InterruptedException { - final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean(); - final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet()); - - final long start = System.nanoTime(); - final long[] startCpuTimes = new long[threads.size()]; - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId()); - } - - Thread.sleep(5000); - - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i]; - final long dur = System.nanoTime() - start; - final double util = (double) cpuTime / dur; - System.out.println( - "Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util)); - } - } - - private static SensorRegistry buildSensorRegistry(final String hierarchy, - final int numNestedGroups, final int numSensor) { - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - return sensorRegistry; - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, int nextId) { - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } + public static void main(final String[] args) throws InterruptedException, IOException { + + final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); + final int numNestedGroups = Integer + .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); + final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); + final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final boolean sendRegistry = Boolean + .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); + final boolean doNothing = Boolean + .parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false")); + final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); + final int producers = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1")); + final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); + final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); + final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); + final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + + final SensorRegistry sensorRegistry = buildSensorRegistry(hierarchy, numNestedGroups, numSensor); + + if (sendRegistry) { + final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + System.out.println("Configuration sent."); + + System.out.println("Now wait 30 seconds"); + Thread.sleep(30_000); + System.out.println("And woke up again :)"); + } + + final Properties kafkaProperties = new Properties(); + // kafkaProperties.put("acks", this.acknowledges); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream + .<KafkaRecordSender<ActivePowerRecord>>generate(() -> new KafkaRecordSender<>(kafkaBootstrapServers, + kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties)) + .limit(producers).collect(Collectors.toList()); + + final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) + .collect(Collectors.toList()); + + for (int i = 0; i < threads; i++) { + final int threadId = i; + new Thread(() -> { + while (true) { + for (final String sensor : sensors) { + if (!doNothing) { + kafkaRecordSenders.get(threadId % producers) + .write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); + } + } + } + }).start(); + } + + while (true) { + printCpuUsagePerThread(); + } + + // System.out.println("Wait for termination..."); + // Thread.sleep(30 * 24 * 60 * 60 * 1000L); + // System.out.println("Will terminate now"); + } + + private static void printCpuUsagePerThread() throws InterruptedException { + final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean(); + final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet()); + + final long start = System.nanoTime(); + final long[] startCpuTimes = new long[threads.size()]; + for (int i = 0; i < threads.size(); i++) { + final Thread thread = threads.get(i); + startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId()); + } + + Thread.sleep(5000); + + for (int i = 0; i < threads.size(); i++) { + final Thread thread = threads.get(i); + final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i]; + final long dur = System.nanoTime() - start; + final double util = (double) cpuTime / dur; + System.out.println("Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util)); + } + } + + private static SensorRegistry buildSensorRegistry(final String hierarchy, final int numNestedGroups, + final int numSensor) { + final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); + if (hierarchy.equals("deep")) { + MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); + for (int lvl = 1; lvl < numNestedGroups; lvl++) { + lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); + } + for (int s = 0; s < numSensor; s++) { + lastSensor.addChildMachineSensor("sensor_" + s); + } + } else if (hierarchy.equals("full")) { + addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); + } else { + throw new IllegalStateException(); + } + return sensorRegistry; + } + + private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, final int lvl, + final int maxLvl, int nextId) { + for (int c = 0; c < numChildren; c++) { + if (lvl == maxLvl) { + parent.addChildMachineSensor("s_" + nextId); + nextId++; + } else { + final MutableAggregatedSensor newParent = parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); + nextId++; + nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); + } + } + return nextId; + } } -- GitLab