diff --git a/.settings/org.eclipse.jdt.ui.prefs b/.settings/org.eclipse.jdt.ui.prefs index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..4e04e2891754324a6e1bf55348b6a38f592bb301 100644 --- a/.settings/org.eclipse.jdt.ui.prefs +++ b/.settings/org.eclipse.jdt.ui.prefs @@ -101,7 +101,7 @@ sp_cleanup.qualify_static_member_accesses_with_declaring_class=true sp_cleanup.qualify_static_method_accesses_with_declaring_class=false sp_cleanup.remove_private_constructors=true sp_cleanup.remove_redundant_modifiers=false -sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_semicolons=true sp_cleanup.remove_redundant_type_arguments=true sp_cleanup.remove_trailing_whitespaces=true sp_cleanup.remove_trailing_whitespaces_all=true diff --git a/build.gradle b/build.gradle index 378db78373409b7f532e70f2e5e01cf0085a9f5f..99bd2d2c895065792d99ccd084e8957994e36726 100644 --- a/build.gradle +++ b/build.gradle @@ -66,6 +66,9 @@ configure(useCaseProjects) { implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' implementation project(':application-kafkastreams-commons') + + // These dependencies are used for the workload-generator-commmon + implementation project(':workload-generator-commons') // Use JUnit test framework testImplementation 'junit:junit:4.12' @@ -77,6 +80,8 @@ configure(commonProjects) { dependencies { // These dependencies is exported to consumers, that is to say found on their compile classpath. api 'org.apache.kafka:kafka-clients:2.4.0' + api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } + api 'net.kieker-monitoring:kieker:1.14-SNAPSHOT' // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.slf4j:slf4j-simple:1.6.1' diff --git a/execution/run_uc1-new.sh b/execution/run_uc1-new.sh index 564e03a470723f2b4564ccf96d31b66fa7dd7d2f..04eb86edc9bb5653f3281793bf48655bca643391 100755 --- a/execution/run_uc1-new.sh +++ b/execution/run_uc1-new.sh @@ -85,6 +85,33 @@ do done echo "Finish topic deletion, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' + +# delete zookeeper nodes used for workload generation +echo "Delete ZooKeeper configurations used for workload generation" +kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" +echo "Waiting for deletion" + +while [ true ] +do + IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') + found=0 + for element in "${array[@]}" + do + if [ "$element" == "workload-generation" ]; then + found=1 + break + fi + done + if [ $found -ne 1 ]; then + echo "ZooKeeper reset was successful." + break + else + echo "ZooKeeper reset was not successful. Retrying in 5s." + sleep 5s + fi +done +echo "Deletion finished" + echo "Exiting script" KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}") diff --git a/execution/run_uc2-new.sh b/execution/run_uc2-new.sh index aca65894b5d791eb20fd97b9bc9ab279f693eda7..c0bbf313b1f598ec3b4107fdc4ebb257c4eb7c21 100755 --- a/execution/run_uc2-new.sh +++ b/execution/run_uc2-new.sh @@ -78,6 +78,33 @@ do done echo "Finish topic deletion, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' + +# delete zookeeper nodes used for workload generation +echo "Delete ZooKeeper configurations used for workload generation" +kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" +echo "Waiting for deletion" + +while [ true ] +do + IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') + found=0 + for element in "${array[@]}" + do + if [ "$element" == "workload-generation" ]; then + found=1 + break + fi + done + if [ $found -ne 1 ]; then + echo "ZooKeeper reset was successful." + break + else + echo "ZooKeeper reset was not successful. Retrying in 5s." + sleep 5s + fi +done +echo "Deletion finished" + echo "Exiting script" KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}") diff --git a/execution/run_uc3-new.sh b/execution/run_uc3-new.sh index 79500eb508e39d9460c965494a4b7d0b34b6585a..f214e20b3af93b0f89d76d6ea50ce3d7cd428ded 100755 --- a/execution/run_uc3-new.sh +++ b/execution/run_uc3-new.sh @@ -86,6 +86,33 @@ do done echo "Finish topic deletion, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' + +# delete zookeeper nodes used for workload generation +echo "Delete ZooKeeper configurations used for workload generation" +kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" +echo "Waiting for deletion" + +while [ true ] +do + IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') + found=0 + for element in "${array[@]}" + do + if [ "$element" == "workload-generation" ]; then + found=1 + break + fi + done + if [ $found -ne 1 ]; then + echo "ZooKeeper reset was successful." + break + else + echo "ZooKeeper reset was not successful. Retrying in 5s." + sleep 5s + fi +done +echo "Deletion finished" + echo "Exiting script" KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}") diff --git a/execution/run_uc4-new.sh b/execution/run_uc4-new.sh index 664d866f88d894eda37a30a72875151f1d545e98..2a3936b711ce4115ab9cf0c484a7fc57cecd1800 100755 --- a/execution/run_uc4-new.sh +++ b/execution/run_uc4-new.sh @@ -81,6 +81,33 @@ do done echo "Finish topic deletion, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' + +# delete zookeeper nodes used for workload generation +echo "Delete ZooKeeper configurations used for workload generation" +kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" +echo "Waiting for deletion" + +while [ true ] +do + IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') + found=0 + for element in "${array[@]}" + do + if [ "$element" == "workload-generation" ]; then + found=1 + break + fi + done + if [ $found -ne 1 ]; then + echo "ZooKeeper reset was successful." + break + else + echo "ZooKeeper reset was not successful. Retrying in 5s." + sleep 5s + fi +done +echo "Deletion finished" + echo "Exiting script" KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}") diff --git a/execution/uc1-workload-generator/deployment.yaml b/execution/uc1-workload-generator/deployment.yaml index a0fde4bbf9765b2bb56bd36acde430d97169f34b..f79d763068f3f70401acd11263b598415ed7781d 100644 --- a/execution/uc1-workload-generator/deployment.yaml +++ b/execution/uc1-workload-generator/deployment.yaml @@ -18,6 +18,10 @@ spec: - name: workload-generator image: soerenhenning/uc1-wg:latest env: + - name: ZK_HOST + value: "my-confluent-cp-zookeeper" + - name: ZK_PORT + value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: NUM_SENSORS diff --git a/execution/uc2-workload-generator/deployment.yaml b/execution/uc2-workload-generator/deployment.yaml index 52592626f2a6bf93415c29f5bb4f020b527a5899..416802004dcb69833765e59a19ca8337110e666a 100644 --- a/execution/uc2-workload-generator/deployment.yaml +++ b/execution/uc2-workload-generator/deployment.yaml @@ -17,6 +17,10 @@ spec: - name: workload-generator image: benediktwetzel/uc2-wg:latest env: + - name: ZK_HOST + value: "my-confluent-cp-zookeeper" + - name: ZK_PORT + value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: HIERARCHY diff --git a/execution/uc3-workload-generator/deployment.yaml b/execution/uc3-workload-generator/deployment.yaml index 9ecd2b67e757c94221e36edcfcfd43c22782270a..2171c31a81d64765f65d5b76a2137d1151f63859 100644 --- a/execution/uc3-workload-generator/deployment.yaml +++ b/execution/uc3-workload-generator/deployment.yaml @@ -18,6 +18,10 @@ spec: - name: workload-generator image: soerenhenning/uc3-wg:latest env: + - name: ZK_HOST + value: "my-confluent-cp-zookeeper" + - name: ZK_PORT + value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: NUM_SENSORS diff --git a/execution/uc4-workload-generator/deployment.yaml b/execution/uc4-workload-generator/deployment.yaml index 6400abc345dcfb902364d3225bc6eb174380eb8b..516051b2a574a8307f20f551e2be87e753f21658 100644 --- a/execution/uc4-workload-generator/deployment.yaml +++ b/execution/uc4-workload-generator/deployment.yaml @@ -17,6 +17,10 @@ spec: - name: workload-generator image: soerenhenning/uc4-wg:latest env: + - name: ZK_HOST + value: "my-confluent-cp-zookeeper" + - name: ZK_PORT + value: "2181" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: NUM_SENSORS diff --git a/settings.gradle b/settings.gradle index 51112256b1a124d07ad80caf7ac0ccaf697858d3..9104525ce160a25957f9731f820a723b4f36f7d5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,5 +1,6 @@ rootProject.name = 'scalability-benchmarking' +include 'workload-generator-commons' include 'application-kafkastreams-commons' include 'uc1-workload-generator' diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index bcff74b9a5a4efc72ce1f206f5f10c13557eafd7..381aef34dd42cc3cac9908480719a98fc55f3a27 100644 --- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -1,38 +1,49 @@ package theodolite.uc1.workloadgenerator; import java.io.IOException; -import java.util.List; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.kafkasender.KafkaRecordSender; +import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.models.records.ActivePowerRecord; -public class LoadGenerator { +/** + * Load Generator for UC1. + */ +public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private static final int WL_MAX_RECORDS = 150_000; + private static final long MAX_DURATION_IN_DAYS = 30L; + private LoadGenerator() {} + + /** + * Entry point. + */ public static void main(final String[] args) throws InterruptedException, IOException { + // uc1 LOGGER.info("Start workload generator for use case UC1."); + // get environment variables + final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); + final int zooKeeperPort = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); final int numSensors = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int instanceId = getInstanceId(); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); + final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), + "4")); final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); final String kafkaInputTopic = @@ -40,14 +51,10 @@ public class LoadGenerator { final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + final int instances = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - final int idStart = instanceId * WL_MAX_RECORDS; - final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors); - LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd); - final List<String> sensors = IntStream.range(idStart, idEnd) - .mapToObj(i -> "s_" + i) - .collect(Collectors.toList()); - + // create kafka record sender final Properties kafkaProperties = new Properties(); // kafkaProperties.put("acks", this.acknowledges); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); @@ -60,33 +67,21 @@ public class LoadGenerator { r -> r.getTimestamp(), kafkaProperties); - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate(() -> { - kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); - }, initialDelay, periodMs, TimeUnit.MILLISECONDS); - } - - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); + // create workload generator + final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = + KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() + .setInstances(instances) + .setKeySpace(new KeySpace("s_", numSensors)) + .setThreads(threads) + .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .setGeneratorFunction( + sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) + .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .setKafkaRecordSender(kafkaRecordSender) + .build(); + // start + workloadGenerator.start(); } - - private static int getInstanceId() { - final String podName = System.getenv("POD_NAME"); - if (podName == null) { - return 0; - } else { - return Pattern.compile("-") - .splitAsStream(podName) - .reduce((p, x) -> x) - .map(Integer::parseInt) - .orElse(0); - } - } - } diff --git a/uc2-workload-generator/build.gradle b/uc2-workload-generator/build.gradle index f2c3e5d2e73b655dffd94222ecfbc4fc31b7f722..b92e0c2edc54786ea957338b9981922f0a6a7b32 100644 --- a/uc2-workload-generator/build.gradle +++ b/uc2-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "theodolite.uc2.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc2.workloadgenerator.LoadGenerator" diff --git a/uc2-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/uc2-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java deleted file mode 100644 index bf562d86ac913138f48da79c4542d9583b1c8390..0000000000000000000000000000000000000000 --- a/uc2-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java +++ /dev/null @@ -1,84 +0,0 @@ -package theodolite.kafkasender; - -import java.util.Properties; -import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; - - -/** - * Sends monitoring records to Kafka. - * - * @param <T> {@link IMonitoringRecord} to send - */ -public class KafkaRecordSender<T extends IMonitoringRecord> { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; - - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - - /** - * Create a new {@link KafkaRecordSender}. - */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); - } - - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - this.producer.send(record); - } - - public void terminate() { - this.producer.close(); - } - -} diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 823f4f2761cc3c409451c67b7302e3d2f17adbb9..6cf7d81af21545b288c9bc24177575e6966b95de 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -1,18 +1,18 @@ package theodolite.uc2.workloadgenerator; import java.io.IOException; -import java.util.List; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.kafkasender.KafkaRecordSender; +import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.configuration.events.Event; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; @@ -22,13 +22,20 @@ public class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + private static final long MAX_DURATION_IN_DAYS = 30L; + public static void main(final String[] args) throws InterruptedException, IOException { + // uc2 LOGGER.info("Start workload generator for use case UC2."); + // get environment variables final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); final int numNestedGroups = Integer .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = + final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); + final int zooKeeperPort = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); + final int numSensors = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); @@ -44,38 +51,14 @@ public class LoadGenerator { final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + final int instances = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } + // build sensor registry + final MutableSensorRegistry sensorRegistry = + buildSensorRegistry(hierarchy, numNestedGroups, numSensors); + // create kafka record sender final Properties kafkaProperties = new Properties(); // kafkaProperties.put("acks", this.acknowledges); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); @@ -85,20 +68,61 @@ public class LoadGenerator { new KafkaRecordSender<>(kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); + // create workload generator + final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = + KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() + .setInstances(instances) + .setKeySpace(new KeySpace("s_", numSensors)) + .setThreads(threads) + .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .setBeforeAction(() -> { + if (sendRegistry) { + final ConfigPublisher configPublisher = + new ConfigPublisher(kafkaBootstrapServers, "configuration"); + configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); + configPublisher.close(); + LOGGER.info("Configuration sent."); + + LOGGER.info("Now wait 30 seconds"); + try { + Thread.sleep(30_000); + } catch (final InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + LOGGER.info("And woke up again :)"); + } + }) + .setGeneratorFunction( + sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) + .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .setKafkaRecordSender(kafkaRecordSender) + .build(); + + // start + workloadGenerator.start(); + } - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate(() -> { - kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); - }, initialDelay, periodMs, TimeUnit.MILLISECONDS); + private static MutableSensorRegistry buildSensorRegistry( + final String hierarchy, + final int numNestedGroups, + final int numSensors) { + final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); + if (hierarchy.equals("deep")) { + MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); + for (int lvl = 1; lvl < numNestedGroups; lvl++) { + lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); + } + for (int s = 0; s < numSensors; s++) { + lastSensor.addChildMachineSensor("sensor_" + s); + } + } else if (hierarchy.equals("full")) { + addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0); + } else { + throw new IllegalStateException(); } - - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); - + return sensorRegistry; } private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java deleted file mode 100644 index 1e58541758602cd2b1ea84f3ac3360aa3911425d..0000000000000000000000000000000000000000 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java +++ /dev/null @@ -1,165 +0,0 @@ -package theodolite.uc2.workloadgenerator; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.kafka.clients.producer.ProducerConfig; -import theodolite.kafkasender.KafkaRecordSender; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.model.sensorregistry.SensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; - -public class LoadGeneratorExtrem { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); - final int numNestedGroups = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int value = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final boolean doNothing = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false")); - final int threads = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final int producers = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - - final SensorRegistry sensorRegistry = - buildSensorRegistry(hierarchy, numNestedGroups, numSensor); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } - - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream - .<KafkaRecordSender<ActivePowerRecord>>generate( - () -> new KafkaRecordSender<>( - kafkaBootstrapServers, - kafkaInputTopic, - r -> r.getIdentifier(), - r -> r.getTimestamp(), - kafkaProperties)) - .limit(producers) - .collect(Collectors.toList()); - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - for (int i = 0; i < threads; i++) { - final int threadId = i; - new Thread(() -> { - while (true) { - for (final String sensor : sensors) { - if (!doNothing) { - kafkaRecordSenders.get(threadId % producers).write(new ActivePowerRecord( - sensor, - System.currentTimeMillis(), - value)); - } - } - } - }).start(); - } - - while (true) { - printCpuUsagePerThread(); - } - - // System.out.println("Wait for termination..."); - // Thread.sleep(30 * 24 * 60 * 60 * 1000L); - // System.out.println("Will terminate now"); - } - - private static void printCpuUsagePerThread() throws InterruptedException { - final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean(); - final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet()); - - final long start = System.nanoTime(); - final long[] startCpuTimes = new long[threads.size()]; - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId()); - } - - Thread.sleep(5000); - - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i]; - final long dur = System.nanoTime() - start; - final double util = (double) cpuTime / dur; - System.out.println( - "Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util)); - } - } - - private static SensorRegistry buildSensorRegistry(final String hierarchy, - final int numNestedGroups, final int numSensor) { - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - return sensorRegistry; - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, int nextId) { - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } - -} diff --git a/uc3-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/uc3-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java deleted file mode 100644 index bf562d86ac913138f48da79c4542d9583b1c8390..0000000000000000000000000000000000000000 --- a/uc3-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java +++ /dev/null @@ -1,84 +0,0 @@ -package theodolite.kafkasender; - -import java.util.Properties; -import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; - - -/** - * Sends monitoring records to Kafka. - * - * @param <T> {@link IMonitoringRecord} to send - */ -public class KafkaRecordSender<T extends IMonitoringRecord> { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; - - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - - /** - * Create a new {@link KafkaRecordSender}. - */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); - } - - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - this.producer.send(record); - } - - public void terminate() { - this.producer.close(); - } - -} diff --git a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index a063ea359571d67fe118ec2f0951664e62624d98..80e3810f9d9a8e872d44f794e7a3f29ce8a3b2e0 100644 --- a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -1,34 +1,36 @@ package theodolite.uc3.workloadgenerator; import java.io.IOException; -import java.util.List; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.kafkasender.KafkaRecordSender; +import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); - private static final int WL_MAX_RECORDS = 150_000; + private static final long MAX_DURATION_IN_DAYS = 30L; public static void main(final String[] args) throws InterruptedException, IOException { + // uc2 LOGGER.info("Start workload generator for use case UC3."); + // get environment variables + final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); + final int zooKeeperPort = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); final int numSensors = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); - final int instanceId = getInstanceId(); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); @@ -41,14 +43,10 @@ public class LoadGenerator { final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + final int instances = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); - final int idStart = instanceId * WL_MAX_RECORDS; - final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors); - LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd); - final List<String> sensors = IntStream.range(idStart, idEnd) - .mapToObj(i -> "s_" + i) - .collect(Collectors.toList()); - + // create kafka record sender final Properties kafkaProperties = new Properties(); // kafkaProperties.put("acks", this.acknowledges); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); @@ -58,35 +56,22 @@ public class LoadGenerator { new KafkaRecordSender<>(kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - LOGGER.info("Start setting up sensors."); - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate(() -> { - kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); - }, initialDelay, periodMs, TimeUnit.MILLISECONDS); - } - LOGGER.info("Finished setting up sensors."); + // create workload generator + final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = + KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() + .setInstances(instances) + .setKeySpace(new KeySpace("s_", numSensors)) + .setThreads(threads) + .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .setGeneratorFunction( + sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) + .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .setKafkaRecordSender(kafkaRecordSender) + .build(); - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); + // start + workloadGenerator.start(); } - - private static int getInstanceId() { - final String podName = System.getenv("POD_NAME"); - if (podName == null) { - return 0; - } else { - return Pattern.compile("-") - .splitAsStream(podName) - .reduce((p, x) -> x) - .map(Integer::parseInt) - .orElse(0); - } - } - } diff --git a/uc4-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/uc4-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java deleted file mode 100644 index bf562d86ac913138f48da79c4542d9583b1c8390..0000000000000000000000000000000000000000 --- a/uc4-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java +++ /dev/null @@ -1,84 +0,0 @@ -package theodolite.kafkasender; - -import java.util.Properties; -import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; - - -/** - * Sends monitoring records to Kafka. - * - * @param <T> {@link IMonitoringRecord} to send - */ -public class KafkaRecordSender<T extends IMonitoringRecord> { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; - - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - - /** - * Create a new {@link KafkaRecordSender}. - */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); - } - - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - this.producer.send(record); - } - - public void terminate() { - this.producer.close(); - } - -} diff --git a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index 90d28aafb86b2b5da050d0110d425b5ec1ffe5e6..84df87a6a7a55b3b001db8037ca156d9b28fd39c 100644 --- a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -1,30 +1,35 @@ package theodolite.uc4.workloadgenerator; import java.io.IOException; -import java.util.List; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.kafkasender.KafkaRecordSender; +import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; +import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + private static final long MAX_DURATION_IN_DAYS = 30L; + public static void main(final String[] args) throws InterruptedException, IOException { // uc4 LOGGER.info("Start workload generator for use case UC4."); - final int numSensor = + // get environment variables + final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); + final int zooKeeperPort = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181")); + final int numSensors = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); @@ -38,7 +43,10 @@ public class LoadGenerator { final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + final int instances = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); + // create kafka record sender final Properties kafkaProperties = new Properties(); // kafkaProperties.put("acks", this.acknowledges); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); @@ -48,23 +56,22 @@ public class LoadGenerator { new KafkaRecordSender<>(kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties); - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - final List<String> sensors = - IntStream.range(0, numSensor).mapToObj(i -> "s_" + i).collect(Collectors.toList()); - - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate(() -> { - kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); - }, initialDelay, periodMs, TimeUnit.MILLISECONDS); - } - - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); + // create workload generator + final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = + KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() + .setInstances(instances) + .setKeySpace(new KeySpace("s_", numSensors)) + .setThreads(threads) + .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) + .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .setGeneratorFunction( + sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) + .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .setKafkaRecordSender(kafkaRecordSender) + .build(); + // start + workloadGenerator.start(); } } diff --git a/workload-generator-commons/.settings/org.eclipse.jdt.ui.prefs b/workload-generator-commons/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 0000000000000000000000000000000000000000..4e04e2891754324a6e1bf55348b6a38f592bb301 --- /dev/null +++ b/workload-generator-commons/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,127 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=15 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=true +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false diff --git a/workload-generator-commons/.settings/qa.eclipse.plugin.checkstyle.prefs b/workload-generator-commons/.settings/qa.eclipse.plugin.checkstyle.prefs new file mode 100644 index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280 --- /dev/null +++ b/workload-generator-commons/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -0,0 +1,4 @@ +configFilePath=../config/checkstyle.xml +customModulesJarPaths= +eclipse.preferences.version=1 +enabled=true diff --git a/workload-generator-commons/.settings/qa.eclipse.plugin.pmd.prefs b/workload-generator-commons/.settings/qa.eclipse.plugin.pmd.prefs new file mode 100644 index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9 --- /dev/null +++ b/workload-generator-commons/.settings/qa.eclipse.plugin.pmd.prefs @@ -0,0 +1,4 @@ +customRulesJars= +eclipse.preferences.version=1 +enabled=true +ruleSetFilePath=../config/pmd.xml diff --git a/workload-generator-commons/build.gradle b/workload-generator-commons/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..eef987cd444c3b6c3d8a532c8d192e94311176db --- /dev/null +++ b/workload-generator-commons/build.gradle @@ -0,0 +1,3 @@ +dependencies { + implementation 'org.apache.curator:curator-recipes:4.3.0' +} \ No newline at end of file diff --git a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java similarity index 90% rename from uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java rename to workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java index bf562d86ac913138f48da79c4542d9583b1c8390..c420658b7131ca5cc5d24e7d1b5d5a8069414cca 100644 --- a/uc1-workload-generator/src/main/java/theodolite/kafkasender/KafkaRecordSender.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/kafka/KafkaRecordSender.java @@ -1,4 +1,4 @@ -package theodolite.kafkasender; +package theodolite.commons.workloadgeneration.communication.kafka; import java.util.Properties; import java.util.function.Function; @@ -9,15 +9,15 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import theodolite.commons.workloadgeneration.functions.Transport; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; - /** * Sends monitoring records to Kafka. * * @param <T> {@link IMonitoringRecord} to send */ -public class KafkaRecordSender<T extends IMonitoringRecord> { +public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -81,4 +81,9 @@ public class KafkaRecordSender<T extends IMonitoringRecord> { this.producer.close(); } + @Override + public void transport(final T message) { + this.write(message); + } + } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java new file mode 100644 index 0000000000000000000000000000000000000000..6ad61ae9ced4eda35b6828677efc267cb56aaf19 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java @@ -0,0 +1,200 @@ +package theodolite.commons.workloadgeneration.communication.zookeeper; + +import java.nio.charset.StandardCharsets; +import java.util.function.BiConsumer; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.functions.BeforeAction; +import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; + +/** + * The central class responsible for distributing the workload through all workload generators. + */ +public class WorkloadDistributor { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadDistributor.class); + + private static final String NAMESPACE = "workload-generation"; + private static final String COUNTER_PATH = "/counter"; + private static final String WORKLOAD_PATH = "/workload"; + private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition"; + + // Curator retry strategy + private static final int BASE_SLEEP_TIME_MS = 2000; + private static final int MAX_RETRIES = 5; + + // Wait time + private static final int MAX_WAIT_TIME = 20_000; + + private final DistributedAtomicInteger counter; + private final KeySpace keySpace; + private final BeforeAction beforeAction; + private final BiConsumer<WorkloadDefinition, Integer> workerAction; + + private final int instances; + private final ZooKeeper zooKeeper; + private final CuratorFramework client; + + private boolean workloadGenerationStarted = false; + + /** + * Create a new workload distributor. + * + * @param keySpace the keyspace for the workload generation. + * @param beforeAction the before action for the workload generation. + * @param workerAction the action to perform by the workers. + */ + public WorkloadDistributor( + final int instances, + final ZooKeeper zooKeeper, + final KeySpace keySpace, + final BeforeAction beforeAction, + final BiConsumer<WorkloadDefinition, Integer> workerAction) { + this.instances = instances; + this.zooKeeper = zooKeeper; + this.keySpace = keySpace; + this.beforeAction = beforeAction; + this.workerAction = workerAction; + + this.client = CuratorFrameworkFactory.builder() + .namespace(NAMESPACE) + .connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort()) + .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)) + .build(); + + this.client.start(); + + try { + this.client.blockUntilConnected(); + } catch (final InterruptedException e) { + LOGGER.error("", e); + throw new IllegalStateException(); + } + + this.counter = + new DistributedAtomicInteger(this.client, COUNTER_PATH, + new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)); + } + + /** + * Start the workload distribution. + */ + public void start() { + try { + AtomicValue<Integer> result = this.counter.increment(); + while (!result.succeeded()) { + result = this.counter.increment(); + } + + final int workerId = result.preValue(); + + final CuratorWatcher watcher = this.buildWatcher(workerId); + + final Stat nodeExists = + this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH); + if (nodeExists == null) { + this.client.create().forPath(WORKLOAD_PATH); + } + + if (workerId == 0) { + LOGGER.info("This instance is master with id {}", workerId); + + this.beforeAction.run(); + + // register worker action, as master acts also as worker + this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); + + LOGGER.info("Number of Workers: {}", this.instances); + + final WorkloadDefinition definition = + new WorkloadDefinition(this.keySpace, this.instances); + + this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH, + definition.toString().getBytes(StandardCharsets.UTF_8)); + + } else { + LOGGER.info("This instance is worker with id {}", workerId); + + this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); + + final Stat definitionExists = + this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); + + if (definitionExists != null) { + this.startWorkloadGeneration(workerId); + } + } + + Thread.sleep(MAX_WAIT_TIME); + + if (!this.workloadGenerationStarted) { + LOGGER.warn("No workload definition retrieved for 20 s. Terminating now.."); + } + } catch (final Exception e) { + LOGGER.error("", e); + throw new IllegalStateException("Error when starting the distribution of the workload."); + } + } + + /** + * Start the workload generation. This methods body does only get executed once. + * + * @param workerId the ID of this worker + * @throws Exception when an error occurs + */ + private synchronized void startWorkloadGeneration(final int workerId) throws Exception { + if (!this.workloadGenerationStarted) { + this.workloadGenerationStarted = true; + + final byte[] bytes = + this.client.getData().forPath(WORKLOAD_DEFINITION_PATH); + final WorkloadDefinition definition = + WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8)); + + this.workerAction.accept(definition, workerId); + } + } + + /** + * Build a curator watcher which performs the worker action. + * + * @param worker the worker to create the watcher for. + * @return the curator watcher. + */ + private CuratorWatcher buildWatcher(final int workerId) { + return new CuratorWatcher() { + + @Override + public void process(final WatchedEvent event) { + if (event.getType() == EventType.NodeChildrenChanged) { + try { + WorkloadDistributor.this.startWorkloadGeneration(workerId); + } catch (final Exception e) { + LOGGER.error("", e); + throw new IllegalStateException("Error starting workload generation."); + } + } + } + }; + } + + /** + * Stop the workload distributor. + */ + public void stop() { + this.client.close(); + } + +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java new file mode 100644 index 0000000000000000000000000000000000000000..2eaa1d487f67ae8325a3622a7ae6c4529fbb1cd6 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java @@ -0,0 +1,56 @@ +package theodolite.commons.workloadgeneration.dimensions; + +import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator; + +/** + * Wrapper class for the definition of the Keys that should be used by the + * {@link AbstractWorkloadGenerator}. + */ +public class KeySpace { + + private final String prefix; + private final int min; + private final int max; + + + /** + * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of + * each key will be determined by a number of the interval ({@code min}, {@code max}-1). + * + * @param prefix the prefix to use for all keys + * @param min the lower bound (inclusive) to start counting from + * @param max the upper bound (exclusive) to count to + */ + public KeySpace(final String prefix, final int min, final int max) { + if (prefix == null || prefix.contains(";")) { + throw new IllegalArgumentException( + "The prefix must not be null and must not contain the ';' character."); + } + this.prefix = prefix; + this.min = min; + this.max = max; + + } + + public KeySpace(final String prefix, final int numberOfKeys) { + this(prefix, 0, numberOfKeys - 1); + } + + public KeySpace(final int numberOfKeys) { + this("sensor_", 0, numberOfKeys - 1); + } + + public String getPrefix() { + return this.prefix; + } + + + public int getMin() { + return this.min; + } + + + public int getMax() { + return this.max; + } +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java new file mode 100644 index 0000000000000000000000000000000000000000..7914a4985b6df40f7146c1fd681d1fba063f8b98 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/BeforeAction.java @@ -0,0 +1,11 @@ +package theodolite.commons.workloadgeneration.functions; + +/** + * Describes the before action which is executed before every sub experiment. + */ +@FunctionalInterface +public interface BeforeAction { + + public void run(); + +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..8c59079ddabafa4fb1de398b7d58503362fa721e --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/MessageGenerator.java @@ -0,0 +1,16 @@ +package theodolite.commons.workloadgeneration.functions; + +import kieker.common.record.IMonitoringRecord; + +/** + * This interface describes a function that takes meta information from a string (e.g. an ID) and + * produces an {@link IMonitoringRecord}. + * + * @param <T> the type of the objects that will be generated by the function. + */ +@FunctionalInterface +public interface MessageGenerator<T extends IMonitoringRecord> { + + T generateMessage(final String key); + +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java new file mode 100644 index 0000000000000000000000000000000000000000..7c95e24f2b97d6259ec8c1bb4c75a356ef477287 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/functions/Transport.java @@ -0,0 +1,16 @@ +package theodolite.commons.workloadgeneration.functions; + +import kieker.common.record.IMonitoringRecord; + +/** + * This interface describes a function that consumes a {@link IMonitoringRecord}. This function is + * dedicated to be used to transport individual messages to the messaging system. + * + * @param <T> the type of records to send as messages. + */ +@FunctionalInterface +public interface Transport<T extends IMonitoringRecord> { + + void transport(final T message); + +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..889075bf81df22847f93bfcfaeb00d762fe62dad --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -0,0 +1,157 @@ +package theodolite.commons.workloadgeneration.generators; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import kieker.common.record.IMonitoringRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.functions.BeforeAction; +import theodolite.commons.workloadgeneration.functions.MessageGenerator; +import theodolite.commons.workloadgeneration.functions.Transport; +import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; +import theodolite.commons.workloadgeneration.misc.WorkloadEntity; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; + +/** + * Base for workload generators. + * + * @param <T> The type of records the workload generator is dedicated for. + */ +public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> + implements WorkloadGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); + + private final int instances; + + private final ZooKeeper zooKeeper; + + private final KeySpace keySpace; + + private final int threads; + + private final Duration period; + + private final Duration duration; + + private final BeforeAction beforeAction; + + private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector; + + private final MessageGenerator<T> generatorFunction; + + private final Transport<T> transport; + + private WorkloadDistributor workloadDistributor; + + private final ScheduledExecutorService executor; + + /** + * Create a new workload generator. + * + * @param instances the number of workload-generator instances. + * @param zooKeeper the zookeeper connection. + * @param keySpace the keyspace. + * @param threads the number of threads that is used to generate the load. + * @param period the period, how often a new record is emitted. + * @param duration the maximum runtime. + * @param beforeAction the action to perform before the workload generation starts. + * @param generatorFunction the function that is used to generate the individual records. + * @param transport the function that is used to send generated messages to the messaging system. + */ + public AbstractWorkloadGenerator( + final int instances, + final ZooKeeper zooKeeper, + final KeySpace keySpace, + final int threads, + final Duration period, + final Duration duration, + final BeforeAction beforeAction, + final MessageGenerator<T> generatorFunction, + final Transport<T> transport) { + this.instances = instances; + this.zooKeeper = zooKeeper; + this.period = period; + this.threads = threads; + this.keySpace = keySpace; + this.duration = duration; + this.beforeAction = beforeAction; + this.generatorFunction = generatorFunction; + this.workloadSelector = (workloadDefinition, workerId) -> { + final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>(); + + for (int i = + workloadDefinition.getKeySpace().getMin() + workerId; i <= workloadDefinition + .getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) { + final String id = workloadDefinition.getKeySpace().getPrefix() + i; + workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction)); + } + + return workloadEntities; + }; + this.transport = transport; + + this.executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); + + final int periodMs = (int) period.toMillis(); + + LOGGER.info("Period: " + periodMs); + + final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> { + + final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, workerId); + + LOGGER.info("Beginning of Experiment..."); + LOGGER.info("Generating records for {} keys.", entities.size()); + LOGGER.info("Experiment is going to be executed for the specified duration..."); + + entities.forEach(entity -> { + final T message = entity.generateMessage(); + final long initialDelay = random.nextInt(periodMs); + final Runnable task = () -> { + this.transport.transport(message); + }; + this.executor.scheduleAtFixedRate(task, initialDelay, + periodMs, TimeUnit.MILLISECONDS); + }); + + + try { + this.executor.awaitTermination(duration.getSeconds(), TimeUnit.SECONDS); + LOGGER.info("Terminating now..."); + this.stop(); + } catch (final InterruptedException e) { + LOGGER.error("", e); + throw new IllegalStateException("Error when terminating the workload generation."); + } + }; + + this.workloadDistributor = + new WorkloadDistributor(this.instances, this.zooKeeper, this.keySpace, this.beforeAction, + workerAction); + } + + /** + * Start the workload generation. The generation terminates automatically after the specified + * {@code duration}. + */ + @Override + public void start() { + this.workloadDistributor.start(); + } + + @Override + public void stop() { + this.workloadDistributor.stop(); + } +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..9cce7e56390a2f5076e4030b25b1697db2179dae --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java @@ -0,0 +1,59 @@ +package theodolite.commons.workloadgeneration.generators; + +import java.time.Duration; +import kieker.common.record.IMonitoringRecord; +import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.functions.BeforeAction; +import theodolite.commons.workloadgeneration.functions.MessageGenerator; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; + +/** + * Workload generator for generating load for the kafka messaging system. + * + * @param <T> The type of records the workload generator is dedicated for. + */ +public class KafkaWorkloadGenerator<T extends IMonitoringRecord> + extends AbstractWorkloadGenerator<T> { + + private final KafkaRecordSender<T> recordSender; + + /** + * Create a new workload generator. + * + * @param zooKeeper a reference to the ZooKeeper instance. + * @param keySpace the key space to generate the workload for. + * @param threads tha amount of threads to use per instance. + * @param period the period how often a message is generated for each key specified in the + * {@code keySpace} + * @param duration the duration how long the workload generator will emit messages. + * @param beforeAction the action which will be performed before the workload generator starts + * generating messages. If {@code null}, no before action will be performed. + * @param generatorFunction the generator function. This function is executed, each time a message + * is generated. + * @param recordSender the record sender which is used to send the generated messages to kafka. + */ + public KafkaWorkloadGenerator( + final int instances, + final ZooKeeper zooKeeper, + final KeySpace keySpace, + final int threads, + final Duration period, + final Duration duration, + final BeforeAction beforeAction, + final MessageGenerator<T> generatorFunction, + final KafkaRecordSender<T> recordSender) { + super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction, + generatorFunction, + recordSender); + this.recordSender = recordSender; + } + + + @Override + public void stop() { + this.recordSender.terminate(); + + super.stop(); + } +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..4ece341bcb1f8e7c3394f8d30e19dae9e166e01f --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java @@ -0,0 +1,188 @@ +package theodolite.commons.workloadgeneration.generators; + +import java.time.Duration; +import java.util.Objects; +import kieker.common.record.IMonitoringRecord; +import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.functions.BeforeAction; +import theodolite.commons.workloadgeneration.functions.MessageGenerator; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; + +/** + * Builder for {@link workload generators}. + * + * @param <T> the record for which the builder is dedicated for. + */ +public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { + + private int instances; + + private ZooKeeper zooKeeper; + + private KeySpace keySpace; + + private int threads; + + private Duration period; + + private Duration duration; + + private BeforeAction beforeAction; + + private MessageGenerator<T> generatorFunction; + + private KafkaRecordSender<T> kafkaRecordSender; + + private KafkaWorkloadGeneratorBuilder() { + + } + + /** + * Get a builder for the {@link KafkaWorkloadGenerator}. + * + * @return the builder. + */ + public static <T extends IMonitoringRecord> KafkaWorkloadGeneratorBuilder<T> builder() { + return new KafkaWorkloadGeneratorBuilder<>(); + } + + /** + * Set the number of instances. + * + * @param instances the number of instances. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) { + this.instances = instances; + return this; + } + + /** + * Set the ZooKeeper reference. + * + * @param zooKeeper a reference to the ZooKeeper instance. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setZooKeeper(final ZooKeeper zooKeeper) { + this.zooKeeper = zooKeeper; + return this; + } + + /** + * Set the before action for the {@link KafkaWorkloadGenerator}. + * + * @param beforeAction the {@link BeforeAction}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setBeforeAction(final BeforeAction beforeAction) { + this.beforeAction = beforeAction; + return this; + } + + /** + * Set the key space for the {@link KafkaWorkloadGenerator}. + * + * @param keySpace the {@link KeySpace}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setKeySpace(final KeySpace keySpace) { + this.keySpace = keySpace; + return this; + } + + /** + * Set the key space for the {@link KafkaWorkloadGenerator}. + * + * @param threads the number of threads. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) { + this.threads = threads; + return this; + } + + /** + * Set the period for the {@link KafkaWorkloadGenerator}. + * + * @param period the {@link Period} + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Duration period) { + this.period = period; + return this; + } + + /** + * Set the durtion for the {@link KafkaWorkloadGenerator}. + * + * @param duration the {@link Duration}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setDuration(final Duration duration) { + this.duration = duration; + return this; + } + + /** + * Set the generator function for the {@link KafkaWorkloadGenerator}. + * + * @param generatorFunction the generator function. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setGeneratorFunction( + final MessageGenerator<T> generatorFunction) { + this.generatorFunction = generatorFunction; + return this; + } + + /** + * Set the {@link KafkaRecordSender} for the {@link KafkaWorkloadGenerator}. + * + * @param kafkaRecordSender the record sender to use. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setKafkaRecordSender( + final KafkaRecordSender<T> kafkaRecordSender) { + this.kafkaRecordSender = kafkaRecordSender; + return this; + } + + /** + * Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be + * specicified before this method is called: + * <ul> + * <li>zookeeper</li> + * <li>key space</li> + * <li>period</li> + * <li>duration</li> + * <li>generator function</li> + * <li>kafka record sender</li> + * </ul> + * + * @return the built instance of the {@link KafkaWorkloadGenerator}. + */ + public KafkaWorkloadGenerator<T> build() { + Objects.requireNonNull(this.instances, "Please specify the number of instances."); + Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance."); + this.threads = Objects.requireNonNullElse(this.threads, 1); + Objects.requireNonNull(this.keySpace, "Please specify the key space."); + Objects.requireNonNull(this.period, "Please specify the period."); + Objects.requireNonNull(this.duration, "Please specify the duration."); + this.beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> { + }); + Objects.requireNonNull(this.generatorFunction, "Please specify the generator function."); + Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender."); + + return new KafkaWorkloadGenerator<>( + this.instances, + this.zooKeeper, + this.keySpace, + this.threads, + this.period, + this.duration, + this.beforeAction, + this.generatorFunction, + this.kafkaRecordSender); + } +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..b121ac157b84d64818d9fdfc90589d49fd933752 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java @@ -0,0 +1,18 @@ +package theodolite.commons.workloadgeneration.generators; + +/** + * Base methods for workload generators. + */ +public interface WorkloadGenerator { + + /** + * Start the workload generation. + */ + void start(); + + /** + * Stop the workload generation. + */ + void stop(); + +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java new file mode 100644 index 0000000000000000000000000000000000000000..86369d6c883954b792b2ee0fd6a988377ecb8965 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadDefinition.java @@ -0,0 +1,71 @@ +package theodolite.commons.workloadgeneration.misc; + +import theodolite.commons.workloadgeneration.dimensions.KeySpace; + +/** + * The central class that contains all information that needs to be exchanged between the nodes for + * distributed workload generation. + */ +public class WorkloadDefinition { + private static final int ZERO = 0; + private static final int ONE = 1; + private static final int TWO = 2; + private static final int THREE = 3; + private static final int FOUR = 4; + + private final KeySpace keySpace; + private final int numberOfWorkers; + + /** + * Create a new workload definition. + * + * @param keySpace the key space to use. + * @param numberOfWorkers the number of workers participating in the workload generation. + */ + public WorkloadDefinition(final KeySpace keySpace, final int numberOfWorkers) { + + this.keySpace = keySpace; + this.numberOfWorkers = numberOfWorkers; + } + + public KeySpace getKeySpace() { + return this.keySpace; + } + + public int getNumberOfWorkers() { + return this.numberOfWorkers; + } + + /** + * Simple method for encoding all information of the workload definition into one string. + * + * @return a string that encodes all information of the workload generation in a compact format. + * The format is 'keySpace;keySpace.min;keySpace.max;numberOfWorkers'. + */ + @Override + public String toString() { + return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";" + + this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers(); + } + + /** + * Parse a workload generation from a previously encoded string with the format returned by + * {@link WorkloadDefinition#toString()}. + * + * @param workloadDefinitionString the workload definition string. + * @return the parsed workload definition. + */ + public static WorkloadDefinition fromString(final String workloadDefinitionString) { + final String[] deserialized = workloadDefinitionString.split(";"); + + if (deserialized.length != FOUR) { + throw new IllegalArgumentException( + "Wrong workload definition string when trying to parse the workload generation."); + } + + return new WorkloadDefinition( + new KeySpace(deserialized[ZERO], Integer.valueOf(deserialized[ONE]), + Integer.valueOf(deserialized[TWO])), + Integer.valueOf(deserialized[THREE])); + } +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java new file mode 100644 index 0000000000000000000000000000000000000000..1e3c55257f7454a836e774f76019a261868c5a0a --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/WorkloadEntity.java @@ -0,0 +1,23 @@ +package theodolite.commons.workloadgeneration.misc; + +import kieker.common.record.IMonitoringRecord; +import theodolite.commons.workloadgeneration.functions.MessageGenerator; + +/** + * Representation of a entity of the workload generation that generates load for one fixed key. + * + * @param <T> The type of records the workload generator is dedicated for. + */ +public class WorkloadEntity<T extends IMonitoringRecord> { + private final String key; + private final MessageGenerator<T> generator; + + public WorkloadEntity(final String key, final MessageGenerator<T> generator) { + this.key = key; + this.generator = generator; + } + + public T generateMessage() { + return this.generator.generateMessage(this.key); + } +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java new file mode 100644 index 0000000000000000000000000000000000000000..a80490600ad9c9c22c198fc76b6d9f73bdc30584 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/misc/ZooKeeper.java @@ -0,0 +1,29 @@ +package theodolite.commons.workloadgeneration.misc; + +/** + * Wrapper for connection information for ZooKeeper. + */ +public class ZooKeeper { + + private final String host; + private final int port; + + /** + * Create a new representation of an ZooKeeper instance. + * + * @param host of zookeeper. + * @param port of zookeeper. + */ + public ZooKeeper(final String host, final int port) { + this.host = host; + this.port = port; + } + + public String getHost() { + return this.host; + } + + public int getPort() { + return this.port; + } +}