Commit d94ee0a6 authored by Björn Vonheiden

Merge branch 'master' into feature/cliArgumentsTheodolite

parents e7447d21 929f3430
Merge request !44: Enhanced CLI arguments for theodolite
Showing with 41 additions and 22 deletions
@@ -6,7 +6,7 @@ Theodolite is a framework for benchmarking the horizontal and vertical scalability
## Theodolite Benchmarks
-Theodolite contains 4 application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding workload generator is provided. Currently, this repository provides benchmark implementations for Kafka Streams.
+Theodolite contains 4 application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding workload generator is provided. Currently, this repository provides benchmark implementations for Apache Kafka Streams. Benchmark implementations for Apache Flink are currently under development and can be found in the *apache-flink* branch of this repository.
## Theodolite Execution Framework
...
@@ -166,14 +166,14 @@ The `./theodolite.py` is the entrypoint for all benchmark executions. It has to
* `<memory-limit>`: Kubernetes memory limit. Optional. Default `4Gi`.
* `<commit-interval>`: Kafka Streams' commit interval in milliseconds. Optional. Default `100`.
* `<duration>`: Duration in minutes subexperiments should be executed for. Optional. Default `5`.
-* `<domain-restriction>`: The domain restriction: `domain-restriction` to use domain restriction `no-domain-restriction` to not use domain restriction. Default `no-domain-restriction`. For more details see Section _Domain Restriction_.
+* `<domain-restriction>`: The domain restriction: `restrict-domain` to use domain restriction, `no-domain-restriction` to not use domain restriction. Default `no-domain-restriction`. For more details see Section _Domain Restriction_.
* `<search-strategy>`: The benchmarking search strategy. Can be set to `check-all`, `linear-search` or `binary-search`. Default `check-all`. For more details see Section _Benchmarking Search Strategies_.
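For illustration, a hypothetical invocation using the renamed option might look as follows; the argument order is assumed from the `main(...)` signature later in this diff, and all concrete values are invented for the example, not taken from this commit:

```
./theodolite.py 1 100000,200000,400000 1,2,4 40 1000m 4Gi 100 5 restrict-domain binary-search
```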
### Domain Restriction
For each dimension value, we have a domain of the amounts of instances. As a consequence, for each dimension value, the maximum number of lag experiments is equal to the size of the domain. How the domain is determined is defined by the following domain restriction strategies (a sketch of the restricted-domain computation follows this list):
* `no-domain-restriction`: For each dimension value, the domain of instances is equal to the set of all amounts of instances.
-* `domain-restriction`: For each dimension value, the domain is computed as follows:
+* `restrict-domain`: For each dimension value, the domain is computed as follows:
    * If the dimension value is the smallest dimension value, the domain of the amounts of instances is equal to the set of all amounts of instances.
    * If the dimension value is not the smallest dimension value and N is the minimal amount of instances that was suitable for the next smaller dimension value, the domain for this dimension value contains all amounts of instances greater than or equal to N.
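A minimal sketch of the `restrict-domain` computation (a hypothetical helper, not taken from the Theodolite sources; name and signature are assumptions):

```python
def restricted_domain(all_instances, min_suitable_previous=None):
    """Return the domain of instance counts for one dimension value.

    all_instances: sorted list of all configured instance counts.
    min_suitable_previous: minimal suitable instance count N found for the
    next smaller dimension value, or None for the smallest dimension value.
    """
    if min_suitable_previous is None:
        # Smallest dimension value: the domain is the full set of instance counts.
        return list(all_instances)
    # Otherwise: keep only instance counts greater than or equal to N.
    return [i for i in all_instances if i >= min_suitable_previous]
```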
...
@@ -421,25 +421,18 @@ def stop_lag_exporter():
""" """
print('Stop the lag exporter') print('Stop the lag exporter')
find_pod_command = [ try:
'kubectl', # Get lag exporter
'get', pod_list = coreApi.list_namespaced_pod(namespace='default', label_selector='app.kubernetes.io/name=kafka-lag-exporter')
'pod', lag_exporter_pod = pod_list.items[0].metadata.name
'-l',
'app.kubernetes.io/name=kafka-lag-exporter', # Delete lag exporter pod
'-o', res = coreApi.delete_namespaced_pod(name=lag_exporter_pod, namespace='default')
'jsonpath="{.items[0].metadata.name}"' except ApiException as e:
] logging.error('Exception while stopping lag exporter')
output = subprocess.run(find_pod_command, capture_output=True, text=True) logging.error(e)
lag_exporter_pod = output.stdout.replace('"', '')
delete_pod_command = [ print('Deleted lag exporter pod: ' + lag_exporter_pod)
'kubectl',
'delete',
'pod',
lag_exporter_pod
]
output = subprocess.run(delete_pod_command, capture_output=True, text=True)
print(output)
return return
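The replacement code uses the official Kubernetes Python client instead of shelling out to `kubectl`; it assumes that `coreApi` and `ApiException` are initialized elsewhere in the script. A minimal sketch of that setup, assuming the `kubernetes` package (the actual initialization in this repository may differ):

```python
import logging

from kubernetes import client, config
from kubernetes.client.rest import ApiException

# Load the local kubeconfig; inside a cluster, config.load_incluster_config()
# would be used instead.
config.load_kube_config()

# CoreV1Api exposes the pod operations used above:
# list_namespaced_pod and delete_namespaced_pod.
coreApi = client.CoreV1Api()
```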
...
@@ -88,6 +88,7 @@ def test_binary_search_strategy():
    # execute
    experiment_config = ExperimentConfig(
+        exp_id="0",
        use_case=uc,
        dim_values=dim_values,
        replicass=replicass,
...
@@ -103,6 +103,7 @@ def test_linear_search_strategy():
    # execute
    experiment_config = ExperimentConfig(
+        exp_id="0",
        use_case=uc,
        dim_values=dim_values,
        replicass=replicass,
...
@@ -84,6 +84,7 @@ def test_linear_search_strategy():
    # execute
    experiment_config = ExperimentConfig(
+        exp_id="0",
        use_case=uc,
        dim_values=dim_values,
        replicass=replicass,
...
@@ -93,6 +93,7 @@ def test_binary_search_strategy():
    # execute
    experiment_config = ExperimentConfig(
+        exp_id="0",
        use_case=uc,
        dim_values=dim_values,
        replicass=replicass,
...
@@ -120,6 +120,7 @@ def test_linear_search_strategy():
    # execute
    experiment_config = ExperimentConfig(
+        exp_id="0",
        use_case=uc,
        dim_values=dim_values,
        replicass=replicass,
...
@@ -101,6 +101,7 @@ def test_linear_search_strategy():
    # execute
    experiment_config = ExperimentConfig(
+        exp_id="0",
        use_case=uc,
        dim_values=dim_values,
        replicass=replicass,
...
@@ -40,6 +40,23 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, commit_ms, duration, domain_restriction, search_strategy):
    else:
        exp_id = 0
+    # Store metadata
+    separator = ","
+    lines = [
+        f"UC={uc}\n",
+        f"DIM_VALUES={separator.join(loads)}\n",
+        f"REPLICAS={separator.join(instances_list)}\n",
+        f"PARTITIONS={partitions}\n",
+        f"CPU_LIMIT={cpu_limit}\n",
+        f"MEMORY_LIMIT={memory_limit}\n",
+        f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n",
+        f"EXECUTION_MINUTES={duration}\n",
+        f"DOMAIN_RESTRICTION={domain_restriction}\n",
+        f"SEARCH_STRATEGY={search_strategy}"
+    ]
+    with open(f"exp{exp_id}_uc{uc}_meta.txt", "w") as stream:
+        stream.writelines(lines)
+
    with open("exp_counter.txt", mode="w") as write_stream:
        write_stream.write(str(exp_id + 1))
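For illustration, with hypothetical arguments (use case 1, loads 100000,200000, instances 1,2,4, 40 partitions, and the documented defaults; all values assumed for this example, not taken from the commit), the generated `exp0_uc1_meta.txt` would contain:

```
UC=1
DIM_VALUES=100000,200000
REPLICAS=1,2,4
PARTITIONS=40
CPU_LIMIT=1000m
MEMORY_LIMIT=4Gi
KAFKA_STREAMS_COMMIT_INTERVAL_MS=100
EXECUTION_MINUTES=5
DOMAIN_RESTRICTION=no-domain-restriction
SEARCH_STRATEGY=check-all
```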
...
@@ -6,6 +6,9 @@ kafka.input.topic=input
kafka.output.topic=output
aggregation.duration.days=30
aggregation.advance.days=1
+schema.registry.url=http://localhost:8091
num.threads=1
commit.interval.ms=100
cache.max.bytes.buffering=-1
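The added `schema.registry.url` property presumably points the Kafka Streams application at a Confluent Schema Registry instance, here a local one on port 8091; this reading follows the standard meaning of the property and is not stated in the commit itself.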