Skip to content
Snippets Groups Projects
Commit 6ea94977 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

copy WG of uc1 to uc4

parent 7e779666
Branches
Tags
1 merge request!1Add Implementations of Use Cases
package titan.ccp.kiekerbridge; package kafkaSender;
import java.util.Properties; import java.util.Properties;
import java.util.function.Function; import java.util.function.Function;
......
package titan.ccp.kiekerbridge.expbigdata19;
import java.io.IOException;
import java.util.Objects;
public class ExperimentorBigData {
public static void main(final String[] args) throws InterruptedException, IOException {
final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter");
if (modus.equals("LoadGenerator")) {
LoadGenerator.main(args);
} else if (modus.equals("LoadGeneratorExtrem")) {
LoadGeneratorExtrem.main(args);
} else if (modus.equals("LoadCounter")) {
LoadCounter.main(args);
}
}
}
package titan.ccp.kiekerbridge.expbigdata19;
import com.google.common.math.StatsAccumulator;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.AggregatedActivePowerRecord;
import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
public class LoadCounter {
public static void main(final String[] args) throws InterruptedException {
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaOutputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output");
final Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaBootstrapServers);
props.setProperty("group.id", "load-counter");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("max.poll.records", "1000000");
props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
final Deserializer<AggregatedActivePowerRecord> deserializer =
IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory());
final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic));
executor.scheduleAtFixedRate(
() -> {
final long time = System.currentTimeMillis();
final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
long inputCount = 0;
for (final ConsumerRecord<String, byte[]> inputRecord : records
.records(kafkaInputTopic)) {
inputCount++;
}
long outputCount = 0;
final StatsAccumulator statsAccumulator = new StatsAccumulator();
for (final ConsumerRecord<String, byte[]> outputRecord : records
.records(kafkaOutputTopic)) {
outputCount++;
final AggregatedActivePowerRecord record =
deserializer.deserialize(kafkaOutputTopic, outputRecord.value());
final long latency = time - record.getTimestamp();
statsAccumulator.add(latency);
}
final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0;
final long elapsedTime = System.currentTimeMillis() - time;
System.out
.println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount);
System.out
.println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount);
},
0,
1,
TimeUnit.SECONDS);
}
}
package titan.ccp.kiekerbridge.expbigdata19;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.kiekerbridge.KafkaRecordSender;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry =
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensor; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
}
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(),
kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(
() -> {
kafkaRecordSender.write(new ActivePowerRecord(
sensor,
System.currentTimeMillis(),
value));
},
initialDelay,
periodMs,
TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, int nextId) {
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
}
package titan.ccp.kiekerbridge.expbigdata19; package uc4.workloadGenerator;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
......
package uc4.workloadGenerator;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
// uc1
final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry = Boolean
.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
// create sensorRegistry
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0);
final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
// TODO Brauchen wir das ?
if (sendRegistry) {
final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, int nextId) {
for (int c = 0; c < numChildren; c++) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
}
}
}
package titan.ccp.kiekerbridge.expbigdata19; package uc4.workloadGenerator;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
...@@ -9,9 +9,9 @@ import java.util.Objects; ...@@ -9,9 +9,9 @@ import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.Event;
import titan.ccp.kiekerbridge.KafkaRecordSender;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry; import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.model.sensorregistry.SensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment