diff --git a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java b/uc4-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java similarity index 98% rename from uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java rename to uc4-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java index b46128c8ebfd52aeeb7127777bb6530761f35181..6c67cf722b4dce87f0bc197ba80f8f117f82198e 100644 --- a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java +++ b/uc4-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java @@ -1,4 +1,4 @@ -package titan.ccp.kiekerbridge; +package kafkaSender; import java.util.Properties; import java.util.function.Function; diff --git a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java b/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java deleted file mode 100644 index a50bbd942fccf5f8899414fe8cb7b82ad6953f87..0000000000000000000000000000000000000000 --- a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java +++ /dev/null @@ -1,21 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.io.IOException; -import java.util.Objects; - -public class ExperimentorBigData { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter"); - - if (modus.equals("LoadGenerator")) { - LoadGenerator.main(args); - } else if (modus.equals("LoadGeneratorExtrem")) { - LoadGeneratorExtrem.main(args); - } else if (modus.equals("LoadCounter")) { - LoadCounter.main(args); - } - - } -} diff --git a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java b/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java deleted file mode 100644 index 798f3014446605afab2cf20f3232896baab02802..0000000000000000000000000000000000000000 --- a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java +++ /dev/null @@ -1,84 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import com.google.common.math.StatsAccumulator; -import java.time.Duration; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.Deserializer; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.AggregatedActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecordFactory; - -public class LoadCounter { - - public static void main(final String[] args) throws InterruptedException { - - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaOutputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output"); - - final Properties props = new Properties(); - props.setProperty("bootstrap.servers", kafkaBootstrapServers); - props.setProperty("group.id", "load-counter"); - props.setProperty("enable.auto.commit", "true"); - props.setProperty("auto.commit.interval.ms", "1000"); - props.setProperty("max.poll.records", "1000000"); - props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB - props.setProperty("key.deserializer", - "org.apache.kafka.common.serialization.StringDeserializer"); - props.setProperty("value.deserializer", - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - final Deserializer<AggregatedActivePowerRecord> deserializer = - IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory()); - - final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); - consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic)); - - executor.scheduleAtFixedRate( - () -> { - final long time = System.currentTimeMillis(); - final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500)); - - long inputCount = 0; - for (final ConsumerRecord<String, byte[]> inputRecord : records - .records(kafkaInputTopic)) { - inputCount++; - } - - long outputCount = 0; - final StatsAccumulator statsAccumulator = new StatsAccumulator(); - for (final ConsumerRecord<String, byte[]> outputRecord : records - .records(kafkaOutputTopic)) { - outputCount++; - final AggregatedActivePowerRecord record = - deserializer.deserialize(kafkaOutputTopic, outputRecord.value()); - final long latency = time - record.getTimestamp(); - statsAccumulator.add(latency); - } - - final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0; - - final long elapsedTime = System.currentTimeMillis() - time; - System.out - .println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount); - System.out - .println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount); - }, - 0, - 1, - TimeUnit.SECONDS); - } - -} diff --git a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java b/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java deleted file mode 100644 index 97a7c84f872f3ab676128d903ae121c376bf7608..0000000000000000000000000000000000000000 --- a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java +++ /dev/null @@ -1,124 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.kafka.clients.producer.ProducerConfig; -import titan.ccp.configuration.events.Event; -import titan.ccp.kiekerbridge.KafkaRecordSender; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; - -public class LoadGenerator { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); - final int numNestedGroups = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final int threads = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } - - - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>( - kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), - kafkaProperties); - - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate( - () -> { - kafkaRecordSender.write(new ActivePowerRecord( - sensor, - System.currentTimeMillis(), - value)); - }, - initialDelay, - periodMs, - TimeUnit.MILLISECONDS); - } - - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); - - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, int nextId) { - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } - -} diff --git a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/ConfigPublisher.java similarity index 97% rename from uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java rename to uc4-workload-generator/src/main/java/uc4/workloadGenerator/ConfigPublisher.java index d5f55a4ab7ca265b241e880363975070e9952c45..b126668818780caca1ea7c3c63b2203813130e9b 100644 --- a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java +++ b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/ConfigPublisher.java @@ -1,4 +1,4 @@ -package titan.ccp.kiekerbridge.expbigdata19; +package uc4.workloadGenerator; import java.util.Properties; import java.util.concurrent.ExecutionException; diff --git a/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..039687e0211375f206951c41a054c76e661407f8 --- /dev/null +++ b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGenerator.java @@ -0,0 +1,87 @@ +package uc4.workloadGenerator; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import kafkaSender.KafkaRecordSender; +import org.apache.kafka.clients.producer.ProducerConfig; +import titan.ccp.configuration.events.Event; +import titan.ccp.model.sensorregistry.MutableAggregatedSensor; +import titan.ccp.model.sensorregistry.MutableSensorRegistry; +import titan.ccp.models.records.ActivePowerRecord; + +public class LoadGenerator { + + public static void main(final String[] args) throws InterruptedException, IOException { + // uc1 + + final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); + final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); + final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); + final boolean sendRegistry = Boolean + .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); + final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); + final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), + "localhost:9092"); + final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); + final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); + final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); + final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + + // create sensorRegistry + final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); + addChildrens(sensorRegistry.getTopLevelSensor(), numSensor, 0); + + final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) + .collect(Collectors.toList()); + + // TODO Brauchen wir das ? + if (sendRegistry) { + final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + System.out.println("Configuration sent."); + + System.out.println("Now wait 30 seconds"); + Thread.sleep(30_000); + System.out.println("And woke up again :)"); + } + + final Properties kafkaProperties = new Properties(); + // kafkaProperties.put("acks", this.acknowledges); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); + final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers, + kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); + + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); + + for (final String sensor : sensors) { + final int initialDelay = random.nextInt(periodMs); + executor.scheduleAtFixedRate(() -> { + kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); + }, initialDelay, periodMs, TimeUnit.MILLISECONDS); + } + + System.out.println("Wait for termination..."); + executor.awaitTermination(30, TimeUnit.DAYS); + System.out.println("Will terminate now"); + + } + + private static void addChildrens(final MutableAggregatedSensor parent, final int numChildren, int nextId) { + for (int c = 0; c < numChildren; c++) { + parent.addChildMachineSensor("s_" + nextId); + nextId++; + } + } + +} diff --git a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGeneratorExtrem.java similarity index 98% rename from uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java rename to uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGeneratorExtrem.java index 5bfb6ad488e90f39ded2b9e4cb57d10099f1c538..a864a0f333d9097eece8f4e93440e377500cef84 100644 --- a/uc4-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java +++ b/uc4-workload-generator/src/main/java/uc4/workloadGenerator/LoadGeneratorExtrem.java @@ -1,4 +1,4 @@ -package titan.ccp.kiekerbridge.expbigdata19; +package uc4.workloadGenerator; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -9,9 +9,9 @@ import java.util.Objects; import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; +import kafkaSender.KafkaRecordSender; import org.apache.kafka.clients.producer.ProducerConfig; import titan.ccp.configuration.events.Event; -import titan.ccp.kiekerbridge.KafkaRecordSender; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry;