Skip to content
Snippets Groups Projects
Commit 76eea338 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

delete unused files, renaming

parent e56bde84
No related branches found
No related tags found
1 merge request!1Add Implementations of Use Cases
package titan.ccp.kiekerbridge;
package kafkaSender;
import java.util.Properties;
import java.util.function.Function;
......
package titan.ccp.kiekerbridge.expbigdata19;
package uc2.workloadGenerator;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
......
package titan.ccp.kiekerbridge.expbigdata19;
import java.io.IOException;
import java.util.Objects;
public class ExperimentorBigData {
public static void main(final String[] args) throws InterruptedException, IOException {
final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter");
if (modus.equals("LoadGenerator")) {
LoadGenerator.main(args);
} else if (modus.equals("LoadGeneratorExtrem")) {
LoadGeneratorExtrem.main(args);
} else if (modus.equals("LoadCounter")) {
LoadCounter.main(args);
}
}
}
package titan.ccp.kiekerbridge.expbigdata19;
import com.google.common.math.StatsAccumulator;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.AggregatedActivePowerRecord;
import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
public class LoadCounter {
public static void main(final String[] args) throws InterruptedException {
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaOutputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output");
final Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaBootstrapServers);
props.setProperty("group.id", "load-counter");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("max.poll.records", "1000000");
props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
final Deserializer<AggregatedActivePowerRecord> deserializer =
IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory());
final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic));
executor.scheduleAtFixedRate(
() -> {
final long time = System.currentTimeMillis();
final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
long inputCount = 0;
for (final ConsumerRecord<String, byte[]> inputRecord : records
.records(kafkaInputTopic)) {
inputCount++;
}
long outputCount = 0;
final StatsAccumulator statsAccumulator = new StatsAccumulator();
for (final ConsumerRecord<String, byte[]> outputRecord : records
.records(kafkaOutputTopic)) {
outputCount++;
final AggregatedActivePowerRecord record =
deserializer.deserialize(kafkaOutputTopic, outputRecord.value());
final long latency = time - record.getTimestamp();
statsAccumulator.add(latency);
}
final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0;
final long elapsedTime = System.currentTimeMillis() - time;
System.out
.println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount);
System.out
.println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount);
},
0,
1,
TimeUnit.SECONDS);
}
}
package titan.ccp.kiekerbridge.expbigdata19;
package uc2.workloadGenerator;
import java.io.IOException;
import java.util.List;
......@@ -9,116 +9,99 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.kiekerbridge.KafkaRecordSender;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry =
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensor; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
}
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(),
kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(
() -> {
kafkaRecordSender.write(new ActivePowerRecord(
sensor,
System.currentTimeMillis(),
value));
},
initialDelay,
periodMs,
TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, int nextId) {
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
public static void main(final String[] args) throws InterruptedException, IOException {
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry = Boolean
.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensor; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
}
final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, final int lvl,
final int maxLvl, int nextId) {
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent = parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
}
package titan.ccp.kiekerbridge.expbigdata19;
package uc2.workloadGenerator;
import java.io.IOException;
import java.lang.management.ManagementFactory;
......@@ -9,9 +9,9 @@ import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.kiekerbridge.KafkaRecordSender;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.model.sensorregistry.SensorRegistry;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment