diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index a3898a0f2b3abcd27aec9fac2784579002131937..d9bbbe47d309912a3d2eb38489b0789fd625888d 100644 --- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -1,6 +1,8 @@ package theodolite.uc1.workloadgenerator; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.Duration; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; @@ -23,7 +24,7 @@ public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private static final int MAX_DURATION_IN_DAYS = 30; + private static final long MAX_DURATION_IN_DAYS = 30L; private LoadGenerator() {} @@ -69,7 +70,7 @@ public final class LoadGenerator { .setKeySpace(new KeySpace("s_", numSensors)) .setThreads(threads) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) - .setDuration(new Duration(MAX_DURATION_IN_DAYS, TimeUnit.DAYS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) .setGeneratorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index add72c61c2df712d90f9140805d973dfc4d42422..60f20518f6257e85bfe4bf6edd0e68224da0eb5f 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -1,6 +1,8 @@ package theodolite.uc2.workloadgenerator; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.Duration; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; @@ -23,6 +24,8 @@ public class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + private static final long MAX_DURATION_IN_DAYS = 30L; + public static void main(final String[] args) throws InterruptedException, IOException { // uc2 LOGGER.info("Start workload generator for use case UC2."); @@ -71,7 +74,7 @@ public class LoadGenerator { .setKeySpace(new KeySpace("s_", numSensors)) .setThreads(threads) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) - .setDuration(new Duration(30, TimeUnit.DAYS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) .setBeforeAction(() -> { if (sendRegistry) { final ConfigPublisher configPublisher = diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java deleted file mode 100644 index 1e58541758602cd2b1ea84f3ac3360aa3911425d..0000000000000000000000000000000000000000 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java +++ /dev/null @@ -1,165 +0,0 @@ -package theodolite.uc2.workloadgenerator; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.kafka.clients.producer.ProducerConfig; -import theodolite.kafkasender.KafkaRecordSender; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.model.sensorregistry.SensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; - -public class LoadGeneratorExtrem { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); - final int numNestedGroups = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int value = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final boolean doNothing = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false")); - final int threads = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final int producers = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - - final SensorRegistry sensorRegistry = - buildSensorRegistry(hierarchy, numNestedGroups, numSensor); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } - - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream - .<KafkaRecordSender<ActivePowerRecord>>generate( - () -> new KafkaRecordSender<>( - kafkaBootstrapServers, - kafkaInputTopic, - r -> r.getIdentifier(), - r -> r.getTimestamp(), - kafkaProperties)) - .limit(producers) - .collect(Collectors.toList()); - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - for (int i = 0; i < threads; i++) { - final int threadId = i; - new Thread(() -> { - while (true) { - for (final String sensor : sensors) { - if (!doNothing) { - kafkaRecordSenders.get(threadId % producers).write(new ActivePowerRecord( - sensor, - System.currentTimeMillis(), - value)); - } - } - } - }).start(); - } - - while (true) { - printCpuUsagePerThread(); - } - - // System.out.println("Wait for termination..."); - // Thread.sleep(30 * 24 * 60 * 60 * 1000L); - // System.out.println("Will terminate now"); - } - - private static void printCpuUsagePerThread() throws InterruptedException { - final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean(); - final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet()); - - final long start = System.nanoTime(); - final long[] startCpuTimes = new long[threads.size()]; - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId()); - } - - Thread.sleep(5000); - - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i]; - final long dur = System.nanoTime() - start; - final double util = (double) cpuTime / dur; - System.out.println( - "Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util)); - } - } - - private static SensorRegistry buildSensorRegistry(final String hierarchy, - final int numNestedGroups, final int numSensor) { - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - return sensorRegistry; - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, int nextId) { - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } - -} diff --git a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index c1152686aaa01697ae389a5ecc32dd97fe545ef5..6e8e2d897895989c7eb88ebe970a59849911a679 100644 --- a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -1,6 +1,8 @@ package theodolite.uc3.workloadgenerator; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.Duration; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; @@ -20,6 +21,8 @@ public class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + private static final long MAX_DURATION_IN_DAYS = 30L; + public static void main(final String[] args) throws InterruptedException, IOException { // uc2 LOGGER.info("Start workload generator for use case UC3."); @@ -59,7 +62,7 @@ public class LoadGenerator { .setKeySpace(new KeySpace("s_", numSensors)) .setThreads(threads) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) - .setDuration(new Duration(30, TimeUnit.DAYS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) .setGeneratorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) diff --git a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index 6eb99410f68f5b549b7bfe08bc3d0e505c24553a..89a3db55aca7305ed7677f11950dcc056814c5b3 100644 --- a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -1,6 +1,8 @@ package theodolite.uc4.workloadgenerator; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.Duration; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; @@ -20,6 +21,8 @@ public class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + private static final long MAX_DURATION_IN_DAYS = 30L; + public static void main(final String[] args) throws InterruptedException, IOException { // uc4 LOGGER.info("Start workload generator for use case UC4."); @@ -59,7 +62,7 @@ public class LoadGenerator { .setKeySpace(new KeySpace("s_", numSensors)) .setThreads(threads) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) - .setDuration(new Duration(30, TimeUnit.DAYS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) .setGeneratorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Duration.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Duration.java deleted file mode 100644 index d4652813cf9a06488e400fe37ddf16a63fd4bab6..0000000000000000000000000000000000000000 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Duration.java +++ /dev/null @@ -1,34 +0,0 @@ -package theodolite.commons.workloadgeneration.dimensions; - -import java.util.concurrent.TimeUnit; -import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator; - -/** - * Wrapper class for the definition of the duration for the {@link AbstractWorkloadGenerator}. - */ -public class Duration { - - private final int duration; - private final TimeUnit timeUnit; - - /** - * Define a new duration. - * - * @param duration the duration - * @param timeUnit the time unit that applies to the specified {@code duration} - */ - public Duration(final int duration, final TimeUnit timeUnit) { - super(); - this.duration = duration; - this.timeUnit = timeUnit; - } - - public int getDuration() { - return this.duration; - } - - public TimeUnit getTimeUnit() { - return this.timeUnit; - } - -} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java index abb0b7eadd134ab4bbcbc7fda202efb123a24a73..a53717dc2f1e708ccc589c119caab91bc61a81be 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -1,17 +1,18 @@ package theodolite.commons.workloadgeneration.generators; +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; import kieker.common.record.IMonitoringRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; -import theodolite.commons.workloadgeneration.dimensions.Duration; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -112,8 +113,9 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> periodMs, period.getTimeUnit()); }); + try { - this.executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); + this.executor.awaitTermination(duration.getSeconds(), TimeUnit.SECONDS); LOGGER.info("Terminating now..."); this.stop(); } catch (final InterruptedException e) { diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java index 3c9955c6097d870d3069a3568afcfcb6582aa8c2..3e16755746b43921d79d5092047bb996a19564c5 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java @@ -1,8 +1,8 @@ package theodolite.commons.workloadgeneration.generators; +import java.time.Duration; import kieker.common.record.IMonitoringRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.Duration; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.functions.BeforeAction; @@ -12,7 +12,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; /** * Workload generator for generating load for the kafka messaging system. */ -public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends AbstractWorkloadGenerator<T> { +public class KafkaWorkloadGenerator<T extends IMonitoringRecord> + extends AbstractWorkloadGenerator<T> { private final KafkaRecordSender<T> recordSender; diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java index c76c53499fbcbc569d7526f3e6fe617ad25561e9..1cd71b5bb0f38cd14eb5f147daa5c7dae6af4732 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java @@ -1,9 +1,9 @@ package theodolite.commons.workloadgeneration.generators; +import java.time.Duration; import java.util.Objects; import kieker.common.record.IMonitoringRecord; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; -import theodolite.commons.workloadgeneration.dimensions.Duration; import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.functions.BeforeAction;