Skip to content
Snippets Groups Projects
Commit e03f4f86 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

FIX wrong package declaration, Dockerfile & gradle

parent ecc66533
No related branches found
No related tags found
1 merge request: !1 "Add Implementations of Use Cases"
FROM openjdk:11-slim FROM openjdk:11-slim
ADD build/distributions/exp-bigdata19-bridge.tar / ADD build/distributions/uc1-workload-generator.tar /
CMD export JAVA_OPTS=-Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL \ CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
&& /exp-bigdata19-bridge/bin/exp-bigdata19-bridge /uc1-workload-generator/bin/uc1-workload-generator
\ No newline at end of file \ No newline at end of file
...@@ -21,7 +21,7 @@ dependencies { ...@@ -21,7 +21,7 @@ dependencies {
testCompile 'junit:junit:4.12' testCompile 'junit:junit:4.12'
} }
mainClassName = "titan.ccp.kiekerbridge.expbigdata19.ExperimentorBigData" mainClassName = "uc1.workloadGenerator.LoadGenerator"
eclipse { eclipse {
classpath { classpath {
......
package uc2.workloadGenerator; package uc1.workloadGenerator;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
...@@ -12,39 +12,37 @@ import titan.ccp.configuration.events.EventSerde; ...@@ -12,39 +12,37 @@ import titan.ccp.configuration.events.EventSerde;
public class ConfigPublisher { public class ConfigPublisher {
private final String topic; private final String topic;
private final Producer<Event, String> producer; private final Producer<Event, String> producer;
public ConfigPublisher(final String bootstrapServers, final String topic) { public ConfigPublisher(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, new Properties()); this(bootstrapServers, topic, new Properties());
} }
public ConfigPublisher(final String bootstrapServers, final String topic, public ConfigPublisher(final String bootstrapServers, final String topic, final Properties defaultProperties) {
final Properties defaultProperties) { this.topic = topic;
this.topic = topic;
final Properties properties = new Properties(); final Properties properties = new Properties();
properties.putAll(defaultProperties); properties.putAll(defaultProperties);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB
this.producer = this.producer = new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer());
new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer()); }
}
public void publish(final Event event, final String value) { public void publish(final Event event, final String value) {
final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value); final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value);
try { try {
this.producer.send(record).get(); this.producer.send(record).get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
} }
public void close() { public void close() {
this.producer.close(); this.producer.close();
} }
} }
package uc2.workloadGenerator; package uc1.workloadGenerator;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
......
package uc2.workloadGenerator; package uc1.workloadGenerator;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
...@@ -19,147 +19,129 @@ import titan.ccp.models.records.ActivePowerRecord; ...@@ -19,147 +19,129 @@ import titan.ccp.models.records.ActivePowerRecord;
public class LoadGeneratorExtrem { public class LoadGeneratorExtrem {
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups = final int numNestedGroups = Integer
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor = final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int value = final boolean sendRegistry = Boolean
Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final boolean sendRegistry = final boolean doNothing = Boolean
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); .parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false"));
final boolean doNothing = final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false")); final int producers = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1"));
final int threads = final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); "localhost:9092");
final int producers = final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1")); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaBootstrapServers = final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); final SensorRegistry sensorRegistry = buildSensorRegistry(hierarchy, numNestedGroups, numSensor);
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); if (sendRegistry) {
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
final SensorRegistry sensorRegistry = configPublisher.close();
buildSensorRegistry(hierarchy, numNestedGroups, numSensor); System.out.println("Configuration sent.");
if (sendRegistry) { System.out.println("Now wait 30 seconds");
final ConfigPublisher configPublisher = Thread.sleep(30_000);
new ConfigPublisher(kafkaBootstrapServers, "configuration"); System.out.println("And woke up again :)");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); }
configPublisher.close();
System.out.println("Configuration sent."); final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
System.out.println("Now wait 30 seconds"); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
Thread.sleep(30_000); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
System.out.println("And woke up again :)"); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
} final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream
.<KafkaRecordSender<ActivePowerRecord>>generate(() -> new KafkaRecordSender<>(kafkaBootstrapServers,
final Properties kafkaProperties = new Properties(); kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties))
// kafkaProperties.put("acks", this.acknowledges); .limit(producers).collect(Collectors.toList());
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); .collect(Collectors.toList());
final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream
.<KafkaRecordSender<ActivePowerRecord>>generate( for (int i = 0; i < threads; i++) {
() -> new KafkaRecordSender<>( final int threadId = i;
kafkaBootstrapServers, new Thread(() -> {
kafkaInputTopic, while (true) {
r -> r.getIdentifier(), for (final String sensor : sensors) {
r -> r.getTimestamp(), if (!doNothing) {
kafkaProperties)) kafkaRecordSenders.get(threadId % producers)
.limit(producers) .write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
.collect(Collectors.toList()); }
}
final List<String> sensors = }
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) }).start();
.collect(Collectors.toList()); }
for (int i = 0; i < threads; i++) { while (true) {
final int threadId = i; printCpuUsagePerThread();
new Thread(() -> { }
while (true) {
for (final String sensor : sensors) { // System.out.println("Wait for termination...");
if (!doNothing) { // Thread.sleep(30 * 24 * 60 * 60 * 1000L);
kafkaRecordSenders.get(threadId % producers).write(new ActivePowerRecord( // System.out.println("Will terminate now");
sensor, }
System.currentTimeMillis(),
value)); private static void printCpuUsagePerThread() throws InterruptedException {
} final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
} final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet());
}
}).start(); final long start = System.nanoTime();
} final long[] startCpuTimes = new long[threads.size()];
for (int i = 0; i < threads.size(); i++) {
while (true) { final Thread thread = threads.get(i);
printCpuUsagePerThread(); startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId());
} }
// System.out.println("Wait for termination..."); Thread.sleep(5000);
// Thread.sleep(30 * 24 * 60 * 60 * 1000L);
// System.out.println("Will terminate now"); for (int i = 0; i < threads.size(); i++) {
} final Thread thread = threads.get(i);
final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i];
private static void printCpuUsagePerThread() throws InterruptedException { final long dur = System.nanoTime() - start;
final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean(); final double util = (double) cpuTime / dur;
final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet()); System.out.println("Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util));
}
final long start = System.nanoTime(); }
final long[] startCpuTimes = new long[threads.size()];
for (int i = 0; i < threads.size(); i++) { private static SensorRegistry buildSensorRegistry(final String hierarchy, final int numNestedGroups,
final Thread thread = threads.get(i); final int numSensor) {
startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId()); final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
} if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
Thread.sleep(5000); for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
for (int i = 0; i < threads.size(); i++) { }
final Thread thread = threads.get(i); for (int s = 0; s < numSensor; s++) {
final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i]; lastSensor.addChildMachineSensor("sensor_" + s);
final long dur = System.nanoTime() - start; }
final double util = (double) cpuTime / dur; } else if (hierarchy.equals("full")) {
System.out.println( addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
"Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util)); } else {
} throw new IllegalStateException();
} }
return sensorRegistry;
private static SensorRegistry buildSensorRegistry(final String hierarchy, }
final int numNestedGroups, final int numSensor) {
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, final int lvl,
if (hierarchy.equals("deep")) { final int maxLvl, int nextId) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); for (int c = 0; c < numChildren; c++) {
for (int lvl = 1; lvl < numNestedGroups; lvl++) { if (lvl == maxLvl) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); parent.addChildMachineSensor("s_" + nextId);
} nextId++;
for (int s = 0; s < numSensor; s++) { } else {
lastSensor.addChildMachineSensor("sensor_" + s); final MutableAggregatedSensor newParent = parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
} nextId++;
} else if (hierarchy.equals("full")) { nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); }
} else { }
throw new IllegalStateException(); return nextId;
} }
return sensorRegistry;
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, int nextId) {
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.