diff --git a/README.md b/README.md index 6ad1dd576bc165630fb378234102f324f9b66d8a..a9de5f63d46019d8c4a0c0c0a880658e0f321a48 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Theodolite is a framework for benchmarking the horizontal and vertical scalabili ## Theodolite Benchmarks -Theodolite contains 4 application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding workload generator is provided. Currently, this repository provides benchmark implementations for Kafka Streams. +Theodolite contains 4 application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding workload generator is provided. Currently, this repository provides benchmark implementations for Apache Kafka Streams. Benchmark implementation for Apache Flink are currently under development and can be found in the *apache-flink* branch of this repository. ## Theodolite Execution Framework diff --git a/execution/README.md b/execution/README.md index 7a71cf0d73d6dd737c181dc138a1dd4a7fb0dc30..68ab9b244ecf8fe0e75580bc4ce21d9efc3b0639 100644 --- a/execution/README.md +++ b/execution/README.md @@ -166,14 +166,14 @@ The `./theodolite.py` is the entrypoint for all benchmark executions. Is has to * `<memory-limit>`: Kubernetes memory limit. Optional. Default `4Gi`. * `<commit-interval>`: Kafka Streams' commit interval in milliseconds. Optional. Default `100`. * `<duration>`: Duration in minutes subexperiments should be executed for. Optional. Default `5`. -* `<domain-restriction>`: The domain restriction: `domain-restriction` to use domain restriction `no-domain-restriction` to not use domain restriction. Default `no-domain-restriction`. For more details see Section _Domain Restriction_. +* `<domain-restriction>`: The domain restriction: `restrict-domain` to use domain restriction, `no-domain-restriction` to not use domain restriction. Default `no-domain-restriction`. For more details see Section _Domain Restriction_. * `<search-strategy>`: The benchmarking search strategy. Can be set to `check-all`, `linear-search` or `binary-search`. Default `check-all`. For more details see Section _Benchmarking Search Strategies_. ### Domain Restriction For dimension value, we have a domain of the amounts of instances. As a consequence, for each dimension value the maximum number of lag experiments is equal to the size of the domain. How the domain is determined is defined by the following domain restriction strategies. * `no-domain-restriction`: For each dimension value, the domain of instances is equal to the set of all amounts of instances. -* `domain-restriction`: For each dimension value, the domain is computed as follows: +* `restrict-domain`: For each dimension value, the domain is computed as follows: * If the dimension value is the smallest dimension value the domain of the amounts of instances is equal to the set of all amounts of instances. * If the dimension value is not the smallest dimension value and N is the amount of minimal amount of instances that was suitable for the last smaller dimension value the domain for this dimension value contains all amounts of instances greater than, or equal to N. diff --git a/execution/run_uc.py b/execution/run_uc.py index 7c294007b155b3fe40c2fd4082a1e8e7ba31856b..3a21e23e6971fd0df6a19dd2a9d32cbf83e9af9b 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -421,25 +421,18 @@ def stop_lag_exporter(): """ print('Stop the lag exporter') - find_pod_command = [ - 'kubectl', - 'get', - 'pod', - '-l', - 'app.kubernetes.io/name=kafka-lag-exporter', - '-o', - 'jsonpath="{.items[0].metadata.name}"' - ] - output = subprocess.run(find_pod_command, capture_output=True, text=True) - lag_exporter_pod = output.stdout.replace('"', '') - delete_pod_command = [ - 'kubectl', - 'delete', - 'pod', - lag_exporter_pod - ] - output = subprocess.run(delete_pod_command, capture_output=True, text=True) - print(output) + try: + # Get lag exporter + pod_list = coreApi.list_namespaced_pod(namespace='default', label_selector='app.kubernetes.io/name=kafka-lag-exporter') + lag_exporter_pod = pod_list.items[0].metadata.name + + # Delete lag exporter pod + res = coreApi.delete_namespaced_pod(name=lag_exporter_pod, namespace='default') + except ApiException as e: + logging.error('Exception while stopping lag exporter') + logging.error(e) + + print('Deleted lag exporter pod: ' + lag_exporter_pod) return diff --git a/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py b/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py index ed727ad607c99def55f429e97a0e66cf0eb72397..d93d4924cf09015c714604f2fc995e1db971e69d 100644 --- a/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py +++ b/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py @@ -88,6 +88,7 @@ def test_binary_search_strategy(): # execute experiment_config = ExperimentConfig( + exp_id="0", use_case=uc, dim_values=dim_values, replicass=replicass, diff --git a/execution/strategies/tests/test_domain_restriction_check_all_strategy.py b/execution/strategies/tests/test_domain_restriction_check_all_strategy.py index 33c32944b82d095e1b247a7bb7e84fe702c3f147..c15daca6ebab3171f0995c048afe56c0185efe56 100644 --- a/execution/strategies/tests/test_domain_restriction_check_all_strategy.py +++ b/execution/strategies/tests/test_domain_restriction_check_all_strategy.py @@ -103,6 +103,7 @@ def test_linear_search_strategy(): # execute experiment_config = ExperimentConfig( + exp_id="0", use_case=uc, dim_values=dim_values, replicass=replicass, diff --git a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py b/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py index 9188b471949b2d0a505337a0b401df3f30da2763..86e2cd29d187cb83166102c503ee79e5e1424573 100644 --- a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py +++ b/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py @@ -84,6 +84,7 @@ def test_linear_search_strategy(): # execute experiment_config = ExperimentConfig( + exp_id="0", use_case=uc, dim_values=dim_values, replicass=replicass, diff --git a/execution/strategies/tests/test_no_restriction_binary_search_strategy.py b/execution/strategies/tests/test_no_restriction_binary_search_strategy.py index 52ad2e0e7fc88038a2aa276ac967ccdd482d2e85..4f5da89cc72edd792015763539c9af4677772a79 100644 --- a/execution/strategies/tests/test_no_restriction_binary_search_strategy.py +++ b/execution/strategies/tests/test_no_restriction_binary_search_strategy.py @@ -93,6 +93,7 @@ def test_binary_search_strategy(): # execute experiment_config = ExperimentConfig( + exp_id="0", use_case=uc, dim_values=dim_values, replicass=replicass, diff --git a/execution/strategies/tests/test_no_restriction_check_all_strategy.py b/execution/strategies/tests/test_no_restriction_check_all_strategy.py index 6f0a0eed500bd925166846ccdbfcda0a7d1c1095..f173a3d168704cc7a499933984b6510ebda2751e 100644 --- a/execution/strategies/tests/test_no_restriction_check_all_strategy.py +++ b/execution/strategies/tests/test_no_restriction_check_all_strategy.py @@ -120,6 +120,7 @@ def test_linear_search_strategy(): # execute experiment_config = ExperimentConfig( + exp_id="0", use_case=uc, dim_values=dim_values, replicass=replicass, diff --git a/execution/strategies/tests/test_no_restriction_linear_search_strategy.py b/execution/strategies/tests/test_no_restriction_linear_search_strategy.py index a7fd68b9b5b0c99ea6cd443c889d925d357b22cf..0e47c2e95b75ae682e82a02ad3d0a91c5a62f253 100644 --- a/execution/strategies/tests/test_no_restriction_linear_search_strategy.py +++ b/execution/strategies/tests/test_no_restriction_linear_search_strategy.py @@ -101,6 +101,7 @@ def test_linear_search_strategy(): # execute experiment_config = ExperimentConfig( + exp_id="0", use_case=uc, dim_values=dim_values, replicass=replicass, diff --git a/execution/theodolite.py b/execution/theodolite.py index ae7a9244f0880421c7572fbeda8caab49822a2b2..b76e0b2da01aef318fe8c5a5a1265276051f301d 100755 --- a/execution/theodolite.py +++ b/execution/theodolite.py @@ -40,6 +40,23 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, commit_ else: exp_id = 0 + # Store metadata + separator = "," + lines = [ + f"UC={uc}\n", + f"DIM_VALUES={separator.join(loads)}\n", + f"REPLICAS={separator.join(instances_list)}\n", + f"PARTITIONS={partitions}\n", + f"CPU_LIMIT={cpu_limit}\n", + f"MEMORY_LIMIT={memory_limit}\n", + f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n", + f"EXECUTION_MINUTES={duration}\n", + f"DOMAIN_RESTRICTION={domain_restriction}\n", + f"SEARCH_STRATEGY={search_strategy}" + ] + with open(f"exp{exp_id}_uc{uc}_meta.txt", "w") as stream: + stream.writelines(lines) + with open("exp_counter.txt", mode="w") as write_stream: write_stream.write(str(exp_id + 1)) diff --git a/uc4-application/src/main/resources/META-INF/application.properties b/uc4-application/src/main/resources/META-INF/application.properties index 4d4bc7b5a31d811e856f04561c51fc7ac5a970a8..e577c880a8ff8169699acb8598e323b8671e8d5e 100644 --- a/uc4-application/src/main/resources/META-INF/application.properties +++ b/uc4-application/src/main/resources/META-INF/application.properties @@ -6,6 +6,9 @@ kafka.input.topic=input kafka.output.topic=output aggregation.duration.days=30 aggregation.advance.days=1 + +schema.registry.url=http://localhost:8091 + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1