diff --git a/execution/README.md b/execution/README.md index 315e20d86831f4c6779e59e258ea74b80ae6c530..2dcc242c7aa2b987f84d3d90a162ebc6a34df127 100644 --- a/execution/README.md +++ b/execution/README.md @@ -120,7 +120,7 @@ can be installed via Helm. We also provide a [default configuration](infrastruct To install it: ```sh -helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.0/kafka-lag-exporter-0.6.0.tgz -f infrastructure/kafka-lag-exporter/values.yaml +helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.3/kafka-lag-exporter-0.6.3.tgz -f infrastructure/kafka-lag-exporter/values.yaml ``` @@ -159,8 +159,8 @@ The `./theodolite.py` is the entrypoint for all benchmark executions. Is has to ``` * `<use-case>`: Stream processing use case to be benchmarked. Has to be one of `1`, `2`, `3` or `4`. -* `<wl-values>`: Values for the workload generator to be tested, separated by commas and sorted in ascending order. For example `100000, 200000, 300000`. -* `<instances>`: Numbers of instances to be benchmarked, separated by commas and sorted in ascending order. For example `1, 2, 3, 4`. +* `<wl-values>`: Values for the workload generator to be tested, separated by commas, quoted, and sorted in ascending order. For example `"100000, 200000, 300000"`. +* `<instances>`: Numbers of instances to be benchmarked, separated by commas, quoted, and sorted in ascending order. For example `"1, 2, 3, 4"`. * `<partitions>`: Number of partitions for Kafka topics. Optional. Default `40`. * `<cpu-limit>`: Kubernetes CPU limit. Optional. Default `1000m`. * `<memory-limit>`: Kubernetes memory limit. Optional. Default `4Gi`. diff --git a/execution/run_uc1.sh b/execution/run_uc1.sh index 04eb86edc9bb5653f3281793bf48655bca643391..e6a3eb05ed7cca167ccbc9ae8c3d5cbc9803e000 100755 --- a/execution/run_uc1.sh +++ b/execution/run_uc1.sh @@ -76,7 +76,7 @@ echo "Finished execution, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0 do - kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*'" + kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*' --if-exists" echo "Wait for topic deletion" sleep 5s #echo "Finished waiting, print topics:" @@ -90,25 +90,10 @@ echo "Finish topic deletion, print topics:" echo "Delete ZooKeeper configurations used for workload generation" kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" echo "Waiting for deletion" - -while [ true ] +while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation" do - IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') - found=0 - for element in "${array[@]}" - do - if [ "$element" == "workload-generation" ]; then - found=1 - break - fi - done - if [ $found -ne 1 ]; then - echo "ZooKeeper reset was successful." - break - else - echo "ZooKeeper reset was not successful. Retrying in 5s." - sleep 5s - fi + echo "Wait for ZooKeeper state deletion." + sleep 5s done echo "Deletion finished" diff --git a/execution/run_uc2.sh b/execution/run_uc2.sh index a2a43a806ab8cc796f45d9c88f4cbf87049b0c3f..76d76cd4dc45b3b5e26ea4033c7afd58268fd3fb 100755 --- a/execution/run_uc2.sh +++ b/execution/run_uc2.sh @@ -75,7 +75,7 @@ echo "Finished execution, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|aggregation-feedback|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0 do - kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|aggregation-feedback|output|configuration|theodolite-.*'" + kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|aggregation-feedback|output|configuration|theodolite-.*' --if-exists" echo "Wait for topic deletion" sleep 5s #echo "Finished waiting, print topics:" @@ -89,25 +89,10 @@ echo "Finish topic deletion, print topics:" echo "Delete ZooKeeper configurations used for workload generation" kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" echo "Waiting for deletion" - -while [ true ] +while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation" do - IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') - found=0 - for element in "${array[@]}" - do - if [ "$element" == "workload-generation" ]; then - found=1 - break - fi - done - if [ $found -ne 1 ]; then - echo "ZooKeeper reset was successful." - break - else - echo "ZooKeeper reset was not successful. Retrying in 5s." - sleep 5s - fi + echo "Wait for ZooKeeper state deletion." + sleep 5s done echo "Deletion finished" diff --git a/execution/run_uc3.sh b/execution/run_uc3.sh index f214e20b3af93b0f89d76d6ea50ce3d7cd428ded..1e34aea99fdc7a927e1943a397f02e1bb56f6a74 100755 --- a/execution/run_uc3.sh +++ b/execution/run_uc3.sh @@ -77,7 +77,7 @@ echo "Finished execution, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0 do - kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*'" + kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*' --if-exists" echo "Wait for topic deletion" sleep 5s #echo "Finished waiting, print topics:" @@ -91,25 +91,10 @@ echo "Finish topic deletion, print topics:" echo "Delete ZooKeeper configurations used for workload generation" kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" echo "Waiting for deletion" - -while [ true ] +while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation" do - IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') - found=0 - for element in "${array[@]}" - do - if [ "$element" == "workload-generation" ]; then - found=1 - break - fi - done - if [ $found -ne 1 ]; then - echo "ZooKeeper reset was successful." - break - else - echo "ZooKeeper reset was not successful. Retrying in 5s." - sleep 5s - fi + echo "Wait for ZooKeeper state deletion." + sleep 5s done echo "Deletion finished" diff --git a/execution/run_uc4.sh b/execution/run_uc4.sh index 04fd130694e96285ca93b7561f1ea58ccdb30ab8..bfd3ed8e2b970b12c5835ba5bcd8ea2dace0d84b 100755 --- a/execution/run_uc4.sh +++ b/execution/run_uc4.sh @@ -76,7 +76,7 @@ echo "Finished execution, print topics:" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0 do - kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*'" + kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*' --if-exists" echo "Wait for topic deletion" sleep 5s #echo "Finished waiting, print topics:" @@ -90,25 +90,10 @@ echo "Finish topic deletion, print topics:" echo "Delete ZooKeeper configurations used for workload generation" kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation" echo "Waiting for deletion" - -while [ true ] +while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation" do - IFS=', ' read -r -a array <<< $(kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 ls /" | tail -n 1 | awk -F[\]\[] '{print $2}') - found=0 - for element in "${array[@]}" - do - if [ "$element" == "workload-generation" ]; then - found=1 - break - fi - done - if [ $found -ne 1 ]; then - echo "ZooKeeper reset was successful." - break - else - echo "ZooKeeper reset was not successful. Retrying in 5s." - sleep 5s - fi + echo "Wait for ZooKeeper state deletion." + sleep 5s done echo "Deletion finished" diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java index 7f372e014371e5407374493b6aced3bf949a1674..104f1cefb34200a2cf34d1578faecdfdae6ccd56 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -98,9 +98,8 @@ public abstract class AbstractWorkloadGenerator<T> LOGGER.info("Experiment is going to be executed for the specified duration..."); entities.forEach(entity -> { - final T message = entity.generateMessage(); final long initialDelay = random.nextInt(periodMs); - final Runnable task = () -> this.transport.transport(message); + final Runnable task = () -> this.transport.transport(entity.generateMessage()); this.executor.scheduleAtFixedRate(task, initialDelay, periodMs, TimeUnit.MILLISECONDS); });