Skip to content
Snippets Groups Projects
Commit 00d69d4f authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Replace custom Duration with java.time.Duration

parent 12542308
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
Showing
with 27 additions and 213 deletions
package theodolite.uc1.workloadgenerator; package theodolite.uc1.workloadgenerator;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; ...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.Duration;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
...@@ -23,7 +24,7 @@ public final class LoadGenerator { ...@@ -23,7 +24,7 @@ public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final int MAX_DURATION_IN_DAYS = 30; private static final long MAX_DURATION_IN_DAYS = 30L;
private LoadGenerator() {} private LoadGenerator() {}
...@@ -69,7 +70,7 @@ public final class LoadGenerator { ...@@ -69,7 +70,7 @@ public final class LoadGenerator {
.setKeySpace(new KeySpace("s_", numSensors)) .setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(MAX_DURATION_IN_DAYS, TimeUnit.DAYS)) .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction( .setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
......
package theodolite.uc2.workloadgenerator; package theodolite.uc2.workloadgenerator;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; ...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.Duration;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
...@@ -23,6 +24,8 @@ public class LoadGenerator { ...@@ -23,6 +24,8 @@ public class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final long MAX_DURATION_IN_DAYS = 30L;
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc2 // uc2
LOGGER.info("Start workload generator for use case UC2."); LOGGER.info("Start workload generator for use case UC2.");
...@@ -71,7 +74,7 @@ public class LoadGenerator { ...@@ -71,7 +74,7 @@ public class LoadGenerator {
.setKeySpace(new KeySpace("s_", numSensors)) .setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(30, TimeUnit.DAYS)) .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setBeforeAction(() -> { .setBeforeAction(() -> {
if (sendRegistry) { if (sendRegistry) {
final ConfigPublisher configPublisher = final ConfigPublisher configPublisher =
......
package theodolite.uc2.workloadgenerator;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerConfig;
import theodolite.kafkasender.KafkaRecordSender;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.model.sensorregistry.SensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGeneratorExtrem {
public static void main(final String[] args) throws InterruptedException, IOException {
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
final int value =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry =
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final boolean doNothing =
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false"));
final int threads =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final int producers =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final SensorRegistry sensorRegistry =
buildSensorRegistry(hierarchy, numNestedGroups, numSensor);
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream
.<KafkaRecordSender<ActivePowerRecord>>generate(
() -> new KafkaRecordSender<>(
kafkaBootstrapServers,
kafkaInputTopic,
r -> r.getIdentifier(),
r -> r.getTimestamp(),
kafkaProperties))
.limit(producers)
.collect(Collectors.toList());
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
for (int i = 0; i < threads; i++) {
final int threadId = i;
new Thread(() -> {
while (true) {
for (final String sensor : sensors) {
if (!doNothing) {
kafkaRecordSenders.get(threadId % producers).write(new ActivePowerRecord(
sensor,
System.currentTimeMillis(),
value));
}
}
}
}).start();
}
while (true) {
printCpuUsagePerThread();
}
// System.out.println("Wait for termination...");
// Thread.sleep(30 * 24 * 60 * 60 * 1000L);
// System.out.println("Will terminate now");
}
private static void printCpuUsagePerThread() throws InterruptedException {
final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet());
final long start = System.nanoTime();
final long[] startCpuTimes = new long[threads.size()];
for (int i = 0; i < threads.size(); i++) {
final Thread thread = threads.get(i);
startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId());
}
Thread.sleep(5000);
for (int i = 0; i < threads.size(); i++) {
final Thread thread = threads.get(i);
final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i];
final long dur = System.nanoTime() - start;
final double util = (double) cpuTime / dur;
System.out.println(
"Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util));
}
}
private static SensorRegistry buildSensorRegistry(final String hierarchy,
final int numNestedGroups, final int numSensor) {
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensor; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
}
return sensorRegistry;
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, int nextId) {
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
}
package theodolite.uc3.workloadgenerator; package theodolite.uc3.workloadgenerator;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; ...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.Duration;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
...@@ -20,6 +21,8 @@ public class LoadGenerator { ...@@ -20,6 +21,8 @@ public class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final long MAX_DURATION_IN_DAYS = 30L;
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc2 // uc2
LOGGER.info("Start workload generator for use case UC3."); LOGGER.info("Start workload generator for use case UC3.");
...@@ -59,7 +62,7 @@ public class LoadGenerator { ...@@ -59,7 +62,7 @@ public class LoadGenerator {
.setKeySpace(new KeySpace("s_", numSensors)) .setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(30, TimeUnit.DAYS)) .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction( .setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
......
package theodolite.uc4.workloadgenerator; package theodolite.uc4.workloadgenerator;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; ...@@ -8,7 +10,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.Duration;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
...@@ -20,6 +21,8 @@ public class LoadGenerator { ...@@ -20,6 +21,8 @@ public class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final long MAX_DURATION_IN_DAYS = 30L;
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc4 // uc4
LOGGER.info("Start workload generator for use case UC4."); LOGGER.info("Start workload generator for use case UC4.");
...@@ -59,7 +62,7 @@ public class LoadGenerator { ...@@ -59,7 +62,7 @@ public class LoadGenerator {
.setKeySpace(new KeySpace("s_", numSensors)) .setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS)) .setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(30, TimeUnit.DAYS)) .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction( .setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
......
package theodolite.commons.workloadgeneration.dimensions;
import java.util.concurrent.TimeUnit;
import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator;
/**
* Wrapper class for the definition of the duration for the {@link AbstractWorkloadGenerator}.
*/
public class Duration {
private final int duration;
private final TimeUnit timeUnit;
/**
* Define a new duration.
*
* @param duration the duration
* @param timeUnit the time unit that applies to the specified {@code duration}
*/
public Duration(final int duration, final TimeUnit timeUnit) {
super();
this.duration = duration;
this.timeUnit = timeUnit;
}
public int getDuration() {
return this.duration;
}
public TimeUnit getTimeUnit() {
return this.timeUnit;
}
}
package theodolite.commons.workloadgeneration.generators; package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor;
import theodolite.commons.workloadgeneration.dimensions.Duration;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.functions.BeforeAction; import theodolite.commons.workloadgeneration.functions.BeforeAction;
...@@ -112,8 +113,9 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> ...@@ -112,8 +113,9 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
periodMs, period.getTimeUnit()); periodMs, period.getTimeUnit());
}); });
try { try {
this.executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); this.executor.awaitTermination(duration.getSeconds(), TimeUnit.SECONDS);
LOGGER.info("Terminating now..."); LOGGER.info("Terminating now...");
this.stop(); this.stop();
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
......
package theodolite.commons.workloadgeneration.generators; package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.Duration;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.functions.BeforeAction; import theodolite.commons.workloadgeneration.functions.BeforeAction;
...@@ -12,7 +12,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; ...@@ -12,7 +12,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
/** /**
* Workload generator for generating load for the kafka messaging system. * Workload generator for generating load for the kafka messaging system.
*/ */
public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends AbstractWorkloadGenerator<T> { public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
extends AbstractWorkloadGenerator<T> {
private final KafkaRecordSender<T> recordSender; private final KafkaRecordSender<T> recordSender;
......
package theodolite.commons.workloadgeneration.generators; package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.Duration;
import theodolite.commons.workloadgeneration.dimensions.KeySpace; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period; import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.functions.BeforeAction; import theodolite.commons.workloadgeneration.functions.BeforeAction;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment