diff --git a/.gitignore b/.gitignore index 36f08fd69890d38fafe2b8bbf40d830773e737e0..71305e60a1056e58f281da4c2ab397539b63ba52 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,6 @@ tmp/ # Python Venv .venv + +# Python cache files +*.pyc diff --git a/execution/README.md b/execution/README.md index a56f249a3988273d0b3aafc0a023077328249a87..7a71cf0d73d6dd737c181dc138a1dd4a7fb0dc30 100644 --- a/execution/README.md +++ b/execution/README.md @@ -150,17 +150,37 @@ Depending on your setup, some additional adjustments may be necessary: ## Execution -The `./theodolite.sh` is the entrypoint for all benchmark executions. Is has to be called as follows: +Please note that a **Python 3.7** installation is required for executing Theodolite. + +The `./theodolite.py` is the entrypoint for all benchmark executions. Is has to be called as follows: ```sh -./theodolite.sh <use-case> <wl-values> <instances> <partitions> <cpu-limit> <memory-limit> <commit-interval> <duration> +./theodolite.sh <use-case> <wl-values> <instances> <partitions> <cpu-limit> <memory-limit> <commit-interval> <duration> <domain-restriction> <search-strategy> ``` * `<use-case>`: Stream processing use case to be benchmarked. Has to be one of `1`, `2`, `3` or `4`. -* `<wl-values>`: Values for the workload generator to be tested, separated by commas and quoted. For example `"100000, 200000, 300000"`. -* `<instances>`: Numbers of instances to be benchmarked, separated by commas and quoted. For example `"1, 2, 3, 4"`. +* `<wl-values>`: Values for the workload generator to be tested, separated by commas and sorted in ascending order. For example `100000,200000,300000`. +* `<instances>`: Numbers of instances to be benchmarked, separated by commas and sorted in ascending order. For example `1,2,3,4`. * `<partitions>`: Number of partitions for Kafka topics. Optional. Default `40`. * `<cpu-limit>`: Kubernetes CPU limit. Optional. Default `1000m`. * `<memory-limit>`: Kubernetes memory limit. Optional. Default `4Gi`. * `<commit-interval>`: Kafka Streams' commit interval in milliseconds. Optional. Default `100`. * `<duration>`: Duration in minutes subexperiments should be executed for. Optional. Default `5`. +* `<domain-restriction>`: The domain restriction: `domain-restriction` to use domain restriction `no-domain-restriction` to not use domain restriction. Default `no-domain-restriction`. For more details see Section _Domain Restriction_. +* `<search-strategy>`: The benchmarking search strategy. Can be set to `check-all`, `linear-search` or `binary-search`. Default `check-all`. For more details see Section _Benchmarking Search Strategies_. + +### Domain Restriction +For dimension value, we have a domain of the amounts of instances. As a consequence, for each dimension value the maximum number of lag experiments is equal to the size of the domain. How the domain is determined is defined by the following domain restriction strategies. + +* `no-domain-restriction`: For each dimension value, the domain of instances is equal to the set of all amounts of instances. +* `domain-restriction`: For each dimension value, the domain is computed as follows: + * If the dimension value is the smallest dimension value the domain of the amounts of instances is equal to the set of all amounts of instances. + * If the dimension value is not the smallest dimension value and N is the amount of minimal amount of instances that was suitable for the last smaller dimension value the domain for this dimension value contains all amounts of instances greater than, or equal to N. + +### Benchmarking Search Strategies +There are the following benchmarking strategies: + +* `check-all`: For each dimension value, execute one lag experiment for all amounts of instances within the current domain. +* `linear-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is from the lowest number of instances to the highest amount of instances and the execution for each dimension value is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful. +* `binary-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is in a binary-search-like manner. The execution is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful. + diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py index 23e3d5f6c9552814f5301cd81e517f49d044cd33..2ae3c593bdebfbcd41393154d07faec171d9243a 100644 --- a/execution/lag_analysis.py +++ b/execution/lag_analysis.py @@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone import pandas as pd import matplotlib.pyplot as plt import csv - # exp_id = sys.argv[1] benchmark = sys.argv[2] @@ -14,6 +13,8 @@ instances = sys.argv[4] execution_minutes = int(sys.argv[5]) time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) +prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range' + #http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0) @@ -27,7 +28,7 @@ start = now - timedelta(minutes=execution_minutes) #print(start.isoformat().replace('+00:00', 'Z')) #print(end.isoformat().replace('+00:00', 'Z')) -response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ +response = requests.get(prometheus_query_path, params={ #'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)", 'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)", 'start': start.isoformat(), @@ -87,7 +88,7 @@ df.to_csv(f"{filename}_values.csv") # Load total lag count -response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ +response = requests.get(prometheus_query_path, params={ 'query': "sum by(group)(kafka_consumergroup_group_lag > 0)", 'start': start.isoformat(), 'end': end.isoformat(), @@ -108,10 +109,9 @@ df = pd.DataFrame(d) df.to_csv(f"{filename}_totallag.csv") - # Load partition count -response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ +response = requests.get(prometheus_query_path, params={ 'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)", 'start': start.isoformat(), 'end': end.isoformat(), @@ -135,7 +135,7 @@ df.to_csv(f"{filename}_partitions.csv") # Load instances count -response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ +response = requests.get(prometheus_query_path, params={ 'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", 'start': start.isoformat(), 'end': end.isoformat(), diff --git a/execution/lib/__init__.py b/execution/lib/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/execution/lib/trend_slope_computer.py b/execution/lib/trend_slope_computer.py new file mode 100644 index 0000000000000000000000000000000000000000..294226c35c0038a01804f7f5e8eb3a1e53c79b79 --- /dev/null +++ b/execution/lib/trend_slope_computer.py @@ -0,0 +1,19 @@ +from sklearn.linear_model import LinearRegression +import pandas as pd +import os + +def compute(directory, filename, warmup_sec, threshold): + df = pd.read_csv(os.path.join(directory, filename)) + input = df + input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp'] + regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up + + X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array + Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column + linear_regressor = LinearRegression() # create object for the class + linear_regressor.fit(X, Y) # perform linear regression + Y_pred = linear_regressor.predict(X) # make predictions + + trend_slope = linear_regressor.coef_[0][0] + + return trend_slope \ No newline at end of file diff --git a/execution/strategies/__init__.py b/execution/strategies/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/execution/strategies/config.py b/execution/strategies/config.py new file mode 100644 index 0000000000000000000000000000000000000000..f9a67897286f79b06e5af06d9fb9b067228be33c --- /dev/null +++ b/execution/strategies/config.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass + +@dataclass +class ExperimentConfig: + """ Wrapper for the configuration of an experiment. """ + use_case: str + exp_id: int + dim_values: list + replicass: list + partitions: int + cpu_limit: str + memory_limit: str + kafka_streams_commit_interval_ms: int + execution_minutes: int + domain_restriction_strategy: object + search_strategy: object + subexperiment_executor: object + subexperiment_evaluator: object \ No newline at end of file diff --git a/execution/strategies/experiment_execution.py b/execution/strategies/experiment_execution.py new file mode 100644 index 0000000000000000000000000000000000000000..c2ee18f9b79a6e880dbcb69b47061cc5ecc6b9ba --- /dev/null +++ b/execution/strategies/experiment_execution.py @@ -0,0 +1,6 @@ +class ExperimentExecutor: + def __init__(self, config): + self.config=config + + def execute(self): + self.config.domain_restriction_strategy.execute(self.config) diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py new file mode 100644 index 0000000000000000000000000000000000000000..9d92831cd6ba03ad5b4ceeaf1b9741937396a4c2 --- /dev/null +++ b/execution/strategies/strategies/config.py @@ -0,0 +1,15 @@ +from dataclasses import dataclass + +@dataclass +class SubexperimentConfig: + """ Wrapper for the configuration of a subexperiment """ + use_case: str + exp_id: int + counter: int + dim_value: int + replicas: int + partitions: int + cpu_limit: str + memory_limit: str + kafka_streams_commit_interval_ms: int + execution_minutes: int \ No newline at end of file diff --git a/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py b/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..b218731fc76d83347b4dbf10448f01615d378c0b --- /dev/null +++ b/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py @@ -0,0 +1,12 @@ +# The lower bound strategy +def execute(config): + dim_value_index = 0 + lower_bound_replicas_index = 0 + subexperiment_counter = 0 + while dim_value_index < len(config.dim_values) and lower_bound_replicas_index >= 0 and lower_bound_replicas_index < len(config.replicass): + lower_bound_replicas_index, subexperiment_counter = config.search_strategy.execute( + config=config, + dim_value_index=dim_value_index, + lower_replicas_bound_index=lower_bound_replicas_index, + subexperiment_counter=subexperiment_counter) + dim_value_index+=1 \ No newline at end of file diff --git a/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py b/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..e5dea56118460b0dfdc6b1c36ce2587b6752512b --- /dev/null +++ b/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py @@ -0,0 +1,11 @@ +# The strategy where the domain contains all amounts of instances +def execute(config): + dim_value_index = 0 + subexperiment_counter = 0 + while dim_value_index < len(config.dim_values): + _, subexperiment_counter = config.search_strategy.execute( + config=config, + dim_value_index=dim_value_index, + lower_replicas_bound_index=0, + subexperiment_counter=subexperiment_counter) + dim_value_index+=1 \ No newline at end of file diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..92eea2e7df4805b82b1b04ded909d68caa8c8b39 --- /dev/null +++ b/execution/strategies/strategies/search/binary_search_strategy.py @@ -0,0 +1,47 @@ +# The binary search strategy +import os +from strategies.strategies.config import SubexperimentConfig + +def binary_search(config, dim_value, lower, upper, subexperiment_counter): + if lower == upper: + print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + config.subexperiment_executor.execute(subexperiment_config) + result = config.subexperiment_evaluator.execute(subexperiment_config) + if result==1: # successful, the upper neighbor is assumed to also has been successful + return (lower, subexperiment_counter+1) + else: # not successful + return (lower+1, subexperiment_counter) + elif lower+1==upper: + print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + config.subexperiment_executor.execute(subexperiment_config) + result = config.subexperiment_evaluator.execute(subexperiment_config) + if result==1: # minimal instances found + return (lower, subexperiment_counter) + else: # not successful, check if lower+1 instances are sufficient + print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}") + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + config.subexperiment_executor.execute(subexperiment_config) + result = config.subexperiment_evaluator.execute(subexperiment_config) + if result == 1: # minimal instances found + return (upper, subexperiment_counter) + else: + return (upper+1, subexperiment_counter) + else: + # test mid + mid=(upper+lower)//2 + print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}") + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + config.subexperiment_executor.execute(subexperiment_config) + result = config.subexperiment_evaluator.execute(subexperiment_config) + if result == 1: # success -> search in (lower, mid-1) + return binary_search(config, dim_value, lower, mid-1, subexperiment_counter+1) + else: # not success -> search in (mid+1, upper) + return binary_search(config, dim_value, mid+1, upper, subexperiment_counter+1) + +def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter): + upper = len(config.replicass)-1 + dim_value=config.dim_values[dim_value_index] + return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter) + diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..cd1a548d2142951a38ab04eba04ec6b0fb32e2a6 --- /dev/null +++ b/execution/strategies/strategies/search/check_all_strategy.py @@ -0,0 +1,24 @@ +# The check_all strategy +import os +from strategies.strategies.config import SubexperimentConfig + +def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter): + new_lower_replicas_bound_index=lower_replicas_bound_index + new_lower_replicas_bound_found=False + subexperiments_total=len(config.dim_values)*len(config.replicass) + while lower_replicas_bound_index < len(config.replicass): + subexperiment_counter+=1 + dim_value=config.dim_values[dim_value_index] + replicas=config.replicass[lower_replicas_bound_index] + print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") + + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + + config.subexperiment_executor.execute(subexperiment_config) + + result = config.subexperiment_evaluator.execute(subexperiment_config) == 1 + if result == 1 and not new_lower_replicas_bound_found: + new_lower_replicas_bound_found = True + new_lower_replicas_bound_index = lower_replicas_bound_index + lower_replicas_bound_index+=1 + return (new_lower_replicas_bound_index, subexperiment_counter) diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..eeda5ad32b22174ed3552180ee6307911e18b657 --- /dev/null +++ b/execution/strategies/strategies/search/linear_search_strategy.py @@ -0,0 +1,22 @@ +# The linear-search strategy + +import os +from strategies.strategies.config import SubexperimentConfig + +def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter): + subexperiments_total=len(config.dim_values)+len(config.replicass)-1 + dim_value=config.dim_values[dim_value_index] + while lower_replicas_bound_index < len(config.replicass): + subexperiment_counter+=1 + replicas=config.replicass[lower_replicas_bound_index] + print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") + + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + + config.subexperiment_executor.execute(subexperiment_config) + result = config.subexperiment_evaluator.execute(subexperiment_config) + if result == 1: + return (lower_replicas_bound_index, subexperiment_counter) + else: + lower_replicas_bound_index+=1 + return (lower_replicas_bound_index, subexperiment_counter) \ No newline at end of file diff --git a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py new file mode 100644 index 0000000000000000000000000000000000000000..e7f0c91564057ac2bbdc64493e750c4c967476dd --- /dev/null +++ b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py @@ -0,0 +1,17 @@ +import os +import sys +import os +import lib.trend_slope_computer as trend_slope_computer + +THRESHOLD = 2000 +WARMUP_SEC = 60 + +def execute(config): + cwd = os.getcwd() + file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv" + + trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD) + + print(f"Trend Slope: {trend_slope}") + success = 0 if trend_slope > THRESHOLD else 1 + return success diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py new file mode 100644 index 0000000000000000000000000000000000000000..e31e3d22ea18ffc4d5c130c56a7a2f99bf704965 --- /dev/null +++ b/execution/strategies/subexperiment_execution/subexperiment_executor.py @@ -0,0 +1,8 @@ +# Wrapper that makes the execution method of a subexperiment interchangable. + +import os + +dirname = os.path.dirname(__file__) +os.chdir(dirname+"/../../") +def execute(subexperiment_config): + os.system(f"./run_uc{subexperiment_config.use_case}.sh {subexperiment_config.exp_id} {subexperiment_config.dim_value} {subexperiment_config.replicas} {subexperiment_config.partitions} {subexperiment_config.cpu_limit} {subexperiment_config.memory_limit} {subexperiment_config.kafka_streams_commit_interval_ms} {subexperiment_config.execution_minutes}") \ No newline at end of file diff --git a/execution/strategies/tests/.gitignore b/execution/strategies/tests/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..1998c294f84ec0ff4b32396e4cd8e74e352672e6 --- /dev/null +++ b/execution/strategies/tests/.gitignore @@ -0,0 +1 @@ +.cache \ No newline at end of file diff --git a/execution/strategies/tests/__init__.py b/execution/strategies/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py b/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..ed727ad607c99def55f429e97a0e66cf0eb72397 --- /dev/null +++ b/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py @@ -0,0 +1,104 @@ +import pprint + +from strategies.config import ExperimentConfig +import strategies.strategies.search.binary_search_strategy as binary_search_strategy +import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy +from strategies.experiment_execution import ExperimentExecutor +import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor + +class Object(object): + pass + +pp = pprint.PrettyPrinter(indent=4) + +dim_values = [0, 1, 2, 3, 4, 5, 6] +replicass = [0, 1, 2, 3, 4, 5, 6] + +# True means the experiment was successful +# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as common known arrays from 0 - 6 respectively. +# this means the first row starts with (0,0), the second row with (1, 0) etc. +successful = [ + [ True , True , True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, False, True , True , True , True ], + [ False, False, False, False, True , True , True ], + [ False, False, False, False, False, False, True ], + [ False, False, False, False, False, False, False ] + ] + +expected_order = [ + (0,3), # workload dim 0 + (0,1), + (0,0), + (1,3), # workload dim 1 + (1,1), + (1,2), + (2,4), # workload dim 2 + (2,2), + (3,4), # workload dim 3 + (3,2), + (3,3), + (4,4), # workload dim 4 + (4,3), + (5,5), # workload dim 5 + (5,6), + (6,6) # workload dim 6 + ] + +last_experiment = (0, 0) +experiment_counter = -1 +subexperiment_executor = Object() + +def subexperiment_executor_executor(config): + global experiment_counter, last_experiment, pp + print("Simulate subexperiment with config:") + pp.pprint(config) + last_experiment = (config.dim_value, config.replicas) + experiment_counter += 1 + print("Simulation complete") + +subexperiment_executor.execute = subexperiment_executor_executor + + +# returns True if the experiment was successful + +subexperiment_evaluator = Object() + +def subexperiment_evaluator_execute(i): + print("Evaluating last experiment. Index was:") + global expected_order, experiment_counter, last_experiment, successful + pp.pprint(last_experiment) + print("Index was expected to be:") + pp.pprint(expected_order[experiment_counter]) + assert expected_order[experiment_counter] == last_experiment + print("Index was as expected. Evaluation finished.") + return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 + +subexperiment_evaluator.execute = subexperiment_evaluator_execute + +def test_binary_search_strategy(): + # declare parameters + uc="test-uc" + partitions=40 + cpu_limit="1000m" + memory_limit="4Gi" + kafka_streams_commit_interval_ms=100 + execution_minutes=5 + + # execute + experiment_config = ExperimentConfig( + use_case=uc, + dim_values=dim_values, + replicass=replicass, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=binary_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + executor = ExperimentExecutor(experiment_config) + executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_domain_restriction_check_all_strategy.py b/execution/strategies/tests/test_domain_restriction_check_all_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..33c32944b82d095e1b247a7bb7e84fe702c3f147 --- /dev/null +++ b/execution/strategies/tests/test_domain_restriction_check_all_strategy.py @@ -0,0 +1,119 @@ +import pprint + +from strategies.config import ExperimentConfig +import strategies.strategies.search.check_all_strategy as check_all_strategy +import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy +from strategies.experiment_execution import ExperimentExecutor +import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor + +class Object(object): + pass + +pp = pprint.PrettyPrinter(indent=4) + +dim_values = [0, 1, 2, 3, 4, 5, 6] +replicass = [0, 1, 2, 3, 4, 5, 6] + +# True means the experiment was successful +# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. +# this means the first row starts with (0,0), the second row with (1, 0) etc. +successful = [ + [ True , True , True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, False, True , True , True , True ], + [ False, False, False, False, True , True , True ], + [ False, False, False, False, False, False, True ], + [ False, False, False, False, False, False, False ] + ] + +# the expected order of executed experiments +expected_order = [ + (0,0), # workload dim 0 + (0,1), + (0,2), + (0,3), + (0,4), + (0,5), + (0,6), + (1,0), # workload dim 1 + (1,1), + (1,2), + (1,3), + (1,4), + (1,5), + (1,6), + (2,2), # workload dim 2 + (2,3), + (2,4), + (2,5), + (2,6), + (3,2), # workload dim 3 + (3,3), + (3,4), + (3,5), + (3,6), + (4,3), # workload dim 4 + (4,4), + (4,5), + (4,6), + (5,4), # workload dim 3 + (5,5), + (5,6), + (6,6) # workload dim 6 + ] + +last_experiment = (0, 0) +experiment_counter = -1 +subexperiment_executor = Object() + +def subexperiment_executor_executor(config): + global experiment_counter, last_experiment, pp + print("Simulate subexperiment with config:") + pp.pprint(config) + last_experiment = (config.dim_value, config.replicas) + experiment_counter += 1 + print("Simulation complete") + +subexperiment_executor.execute = subexperiment_executor_executor + + +# returns True if the experiment was successful + +subexperiment_evaluator = Object() + +def subexperiment_evaluator_execute(i): + print("Evaluating last experiment. Index was:") + global expected_order, experiment_counter, last_experiment, successful + pp.pprint(expected_order[experiment_counter]) + assert expected_order[experiment_counter] == last_experiment + print("Index was as expected. Evaluation finished.") + return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 + +subexperiment_evaluator.execute = subexperiment_evaluator_execute + +def test_linear_search_strategy(): + # declare parameters + uc="test-uc" + partitions=40 + cpu_limit="1000m" + memory_limit="4Gi" + kafka_streams_commit_interval_ms=100 + execution_minutes=5 + + # execute + experiment_config = ExperimentConfig( + use_case=uc, + dim_values=dim_values, + replicass=replicass, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=check_all_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + executor = ExperimentExecutor(experiment_config) + executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py b/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..9188b471949b2d0a505337a0b401df3f30da2763 --- /dev/null +++ b/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py @@ -0,0 +1,100 @@ +import pprint + +from strategies.config import ExperimentConfig +import strategies.strategies.search.linear_search_strategy as linear_search_strategy +import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy +from strategies.experiment_execution import ExperimentExecutor +import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor + +class Object(object): + pass + +pp = pprint.PrettyPrinter(indent=4) + +dim_values = [0, 1, 2, 3, 4, 5, 6] +replicass = [0, 1, 2, 3, 4, 5, 6] + +# True means the experiment was successful +# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. +# this means the first row starts with (0,0), the second row with (1, 0) etc. +successful = [ + [ True , True , True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, False, True , True , True , True ], + [ False, False, False, False, True , True , True ], + [ False, False, False, False, False, False, True ], + [ False, False, False, False, False, False, False ] + ] + +# the expected order of executed experiments +expected_order = [ + (0,0), + (1,0), + (1,1), + (1,2), + (2,2), + (3,2), + (3,3), + (4,3), + (4,4), + (5,4), + (5,5), + (5,6), + (6,6) + ] + +last_experiment = (0, 0) +experiment_counter = -1 +subexperiment_executor = Object() + +def subexperiment_executor_executor(config): + global experiment_counter, last_experiment, pp + print("Simulate subexperiment with config:") + pp.pprint(config) + last_experiment = (config.dim_value, config.replicas) + experiment_counter += 1 + print("Simulation complete") + +subexperiment_executor.execute = subexperiment_executor_executor + + +# returns True if the experiment was successful + +subexperiment_evaluator = Object() + +def subexperiment_evaluator_execute(i): + print("Evaluating last experiment. Index was:") + global expected_order, experiment_counter, last_experiment, successful + pp.pprint(expected_order[experiment_counter]) + assert expected_order[experiment_counter] == last_experiment + print("Index was as expected. Evaluation finished.") + return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 + +subexperiment_evaluator.execute = subexperiment_evaluator_execute + +def test_linear_search_strategy(): + # declare parameters + uc="test-uc" + partitions=40 + cpu_limit="1000m" + memory_limit="4Gi" + kafka_streams_commit_interval_ms=100 + execution_minutes=5 + + # execute + experiment_config = ExperimentConfig( + use_case=uc, + dim_values=dim_values, + replicass=replicass, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=linear_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + executor = ExperimentExecutor(experiment_config) + executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_no_restriction_binary_search_strategy.py b/execution/strategies/tests/test_no_restriction_binary_search_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..52ad2e0e7fc88038a2aa276ac967ccdd482d2e85 --- /dev/null +++ b/execution/strategies/tests/test_no_restriction_binary_search_strategy.py @@ -0,0 +1,109 @@ +import pprint + +from strategies.config import ExperimentConfig +import strategies.strategies.search.binary_search_strategy as binary_search_strategy +import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy +from strategies.experiment_execution import ExperimentExecutor +import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor + +class Object(object): + pass + +pp = pprint.PrettyPrinter(indent=4) + +dim_values = [0, 1, 2, 3, 4, 5, 6] +replicass = [0, 1, 2, 3, 4, 5, 6] + +# True means the experiment was successful +# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as common known arrays from 0 - 6 respectively. +# this means the first row starts with (0,0), the second row with (1, 0) etc. +successful = [ + [ True , True , True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, False, True , True , True , True ], + [ False, False, False, False, True , True , True ], + [ False, False, False, False, False, False, True ], + [ False, False, False, False, False, False, False ] + ] + +expected_order = [ + (0,3), # workload dim 0 + (0,1), + (0,0), + (1,3), # workload dim 1 + (1,1), + (1,2), + (2,3), # workload dim 2 + (2,1), + (2,2), + (3,3), # workload dim 3 + (3,1), + (3,2), + (4,3), # workload dim 4 + (4,5), + (4,4), + (5,3), # workload dim 5 + (5,5), + (5,6), + (6,3), # workload dim 6 + (6,5), + (6,6) + ] + +last_experiment = (0, 0) +experiment_counter = -1 +subexperiment_executor = Object() + +def subexperiment_executor_executor(config): + global experiment_counter, last_experiment, pp + print("Simulate subexperiment with config:") + pp.pprint(config) + last_experiment = (config.dim_value, config.replicas) + experiment_counter += 1 + print("Simulation complete") + +subexperiment_executor.execute = subexperiment_executor_executor + + +# returns True if the experiment was successful + +subexperiment_evaluator = Object() + +def subexperiment_evaluator_execute(i): + print("Evaluating last experiment. Index was:") + global expected_order, experiment_counter, last_experiment, successful + pp.pprint(last_experiment) + print("Index was expected to be:") + pp.pprint(expected_order[experiment_counter]) + assert expected_order[experiment_counter] == last_experiment + print("Index was as expected. Evaluation finished.") + return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 + +subexperiment_evaluator.execute = subexperiment_evaluator_execute + +def test_binary_search_strategy(): + # declare parameters + uc="test-uc" + partitions=40 + cpu_limit="1000m" + memory_limit="4Gi" + kafka_streams_commit_interval_ms=100 + execution_minutes=5 + + # execute + experiment_config = ExperimentConfig( + use_case=uc, + dim_values=dim_values, + replicass=replicass, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=binary_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + executor = ExperimentExecutor(experiment_config) + executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_no_restriction_check_all_strategy.py b/execution/strategies/tests/test_no_restriction_check_all_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..6f0a0eed500bd925166846ccdbfcda0a7d1c1095 --- /dev/null +++ b/execution/strategies/tests/test_no_restriction_check_all_strategy.py @@ -0,0 +1,136 @@ +import pprint + +from strategies.config import ExperimentConfig +import strategies.strategies.search.check_all_strategy as check_all_strategy +import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy +from strategies.experiment_execution import ExperimentExecutor +import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor + +class Object(object): + pass + +pp = pprint.PrettyPrinter(indent=4) + +dim_values = [0, 1, 2, 3, 4, 5, 6] +replicass = [0, 1, 2, 3, 4, 5, 6] + +# True means the experiment was successful +# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. +# this means the first row starts with (0,0), the second row with (1, 0) etc. +successful = [ + [ True , True , True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, False, True , True , True , True ], + [ False, False, False, False, True , True , True ], + [ False, False, False, False, False, False, True ], + [ False, False, False, False, False, False, False ] + ] + +# the expected order of executed experiments +expected_order = [ + (0,0), # workload dim 0 + (0,1), + (0,2), + (0,3), + (0,4), + (0,5), + (0,6), + (1,0), # workload dim 1 + (1,1), + (1,2), + (1,3), + (1,4), + (1,5), + (1,6), + (2,0), # workload dim 2 + (2,1), + (2,2), + (2,3), + (2,4), + (2,5), + (2,6), + (3,0), # workload dim 4 + (3,1), + (3,2), + (3,3), + (3,4), + (3,5), + (3,6), + (4,0), # workload dim 4 + (4,1), + (4,2), + (4,3), + (4,4), + (4,5), + (4,6), + (5,0), # workload dim 5 + (5,1), + (5,2), + (5,3), + (5,4), + (5,5), + (5,6), + (6,0), # workload dim 6 + (6,1), + (6,2), + (6,3), + (6,4), + (6,5), + (6,6), + ] + +last_experiment = (0, 0) +experiment_counter = -1 +subexperiment_executor = Object() + +def subexperiment_executor_executor(config): + global experiment_counter, last_experiment, pp + print("Simulate subexperiment with config:") + pp.pprint(config) + last_experiment = (config.dim_value, config.replicas) + experiment_counter += 1 + print("Simulation complete") + +subexperiment_executor.execute = subexperiment_executor_executor + + +# returns True if the experiment was successful + +subexperiment_evaluator = Object() + +def subexperiment_evaluator_execute(i): + print("Evaluating last experiment. Index was:") + global expected_order, experiment_counter, last_experiment, successful + pp.pprint(expected_order[experiment_counter]) + assert expected_order[experiment_counter] == last_experiment + print("Index was as expected. Evaluation finished.") + return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 + +subexperiment_evaluator.execute = subexperiment_evaluator_execute + +def test_linear_search_strategy(): + # declare parameters + uc="test-uc" + partitions=40 + cpu_limit="1000m" + memory_limit="4Gi" + kafka_streams_commit_interval_ms=100 + execution_minutes=5 + + # execute + experiment_config = ExperimentConfig( + use_case=uc, + dim_values=dim_values, + replicass=replicass, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=check_all_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + executor = ExperimentExecutor(experiment_config) + executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_no_restriction_linear_search_strategy.py b/execution/strategies/tests/test_no_restriction_linear_search_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..a7fd68b9b5b0c99ea6cd443c889d925d357b22cf --- /dev/null +++ b/execution/strategies/tests/test_no_restriction_linear_search_strategy.py @@ -0,0 +1,117 @@ +import pprint + +from strategies.config import ExperimentConfig +import strategies.strategies.search.linear_search_strategy as linear_search_strategy +import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy +from strategies.experiment_execution import ExperimentExecutor +import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor + +class Object(object): + pass + +pp = pprint.PrettyPrinter(indent=4) + +dim_values = [0, 1, 2, 3, 4, 5, 6] +replicass = [0, 1, 2, 3, 4, 5, 6] + +# True means the experiment was successful +# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. +# this means the first row starts with (0,0), the second row with (1, 0) etc. +successful = [ + [ True , True , True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, True , True , True , True , True ], + [ False, False, False, True , True , True , True ], + [ False, False, False, False, True , True , True ], + [ False, False, False, False, False, False, True ], + [ False, False, False, False, False, False, False ] + ] + +# the expected order of executed experiments +expected_order = [ + (0,0), # workload dim 0 + (1,0), # workload dim 1 + (1,1), + (1,2), + (2,0), # workload dim 2 + (2,1), + (2,2), + (3,0), # workload dim 3 + (3,1), + (3,2), + (3,3), + (4,0), # workload dim 4 + (4,1), + (4,2), + (4,3), + (4,4), + (5,0), # workload dim 5 + (5,1), + (5,2), + (5,3), + (5,4), + (5,5), + (5,6), + (6,0), # workload dim 6 + (6,1), + (6,2), + (6,3), + (6,4), + (6,5), + (6,6) + ] + +last_experiment = (0, 0) +experiment_counter = -1 +subexperiment_executor = Object() + +def subexperiment_executor_executor(config): + global experiment_counter, last_experiment, pp + print("Simulate subexperiment with config:") + pp.pprint(config) + last_experiment = (config.dim_value, config.replicas) + experiment_counter += 1 + print("Simulation complete") + +subexperiment_executor.execute = subexperiment_executor_executor + + +# returns True if the experiment was successful + +subexperiment_evaluator = Object() + +def subexperiment_evaluator_execute(i): + print("Evaluating last experiment. Index was:") + global expected_order, experiment_counter, last_experiment, successful + pp.pprint(expected_order[experiment_counter]) + assert expected_order[experiment_counter] == last_experiment + print("Index was as expected. Evaluation finished.") + return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 + +subexperiment_evaluator.execute = subexperiment_evaluator_execute + +def test_linear_search_strategy(): + # declare parameters + uc="test-uc" + partitions=40 + cpu_limit="1000m" + memory_limit="4Gi" + kafka_streams_commit_interval_ms=100 + execution_minutes=5 + + # execute + experiment_config = ExperimentConfig( + use_case=uc, + dim_values=dim_values, + replicass=replicass, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=linear_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + executor = ExperimentExecutor(experiment_config) + executor.execute() \ No newline at end of file diff --git a/execution/theodolite.py b/execution/theodolite.py new file mode 100755 index 0000000000000000000000000000000000000000..6049f3ffac61af46212f521a91870e57342be478 --- /dev/null +++ b/execution/theodolite.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python + +import sys +import os +from strategies.config import ExperimentConfig +import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy +import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy +import strategies.strategies.search.check_all_strategy as check_all_strategy +import strategies.strategies.search.linear_search_strategy as linear_search_strategy +import strategies.strategies.search.binary_search_strategy as binary_search_strategy +from strategies.experiment_execution import ExperimentExecutor +import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor +import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator + +uc=sys.argv[1] +dim_values=sys.argv[2].split(',') +replicas=sys.argv[3].split(',') +partitions=sys.argv[4] if len(sys.argv) >= 5 and sys.argv[4] else 40 +cpu_limit=sys.argv[5] if len(sys.argv) >= 6 and sys.argv[5] else "1000m" +memory_limit=sys.argv[6] if len(sys.argv) >= 7 and sys.argv[6] else "4Gi" +kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[7] else 100 +execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5 +domain_restriction=bool(sys.argv[9]) if len(sys.argv) >= 10 and sys.argv[9] == "restrict-domain" else False +search_strategy=sys.argv[10] if len(sys.argv) >= 11 and (sys.argv[10] == "linear-search" or sys.argv[10] == "binary-search") else "default" + +print(f"Domain restriction of search space activated: {domain_restriction}") +print(f"Chosen search strategy: {search_strategy}") + +if os.path.exists("exp_counter.txt"): + with open("exp_counter.txt", mode="r") as read_stream: + exp_id = int(read_stream.read()) +else: + exp_id = 0 + +with open("exp_counter.txt", mode="w") as write_stream: + write_stream.write(str(exp_id+1)) + +# domain restriction +if domain_restriction: + # domain restriction + linear-search + if search_strategy == "linear-search": + print(f"Going to execute at most {len(dim_values)+len(replicas)-1} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=dim_values, + replicass=replicas, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=linear_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # domain restriction + binary-search + elif search_strategy == "binary-search": + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=dim_values, + replicass=replicas, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=binary_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # domain restriction + check-all + else: + print(f"Going to execute {len(dim_values)*len(replicas)} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=dim_values, + replicass=replicas, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=check_all_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) +# no domain restriction +else: + # no domain restriction + linear-search + if search_strategy == "linear-search": + print(f"Going to execute at most {len(dim_values)*len(replicas)} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=dim_values, + replicass=replicas, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=linear_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # no domain restriction + binary-search + elif search_strategy == "binary-search": + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=dim_values, + replicass=replicas, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=binary_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # no domain restriction + check-all + else: + print(f"Going to execute {len(dim_values)*len(replicas)} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=dim_values, + replicass=replicas, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, + execution_minutes=execution_minutes, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=check_all_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + +executor = ExperimentExecutor(experiment_config) +executor.execute() \ No newline at end of file diff --git a/execution/theodolite.sh b/execution/theodolite.sh deleted file mode 100755 index 18a6b67a9c321cd1c0ecebca405169ec5b8ade46..0000000000000000000000000000000000000000 --- a/execution/theodolite.sh +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/bash - -UC=$1 -IFS=', ' read -r -a DIM_VALUES <<< "$2" -IFS=', ' read -r -a REPLICAS <<< "$3" -PARTITIONS=${4:-40} -CPU_LIMIT=${5:-1000m} -MEMORY_LIMIT=${6:-4Gi} -KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100} -EXECUTION_MINUTES=${8:-5} - -# Get and increment counter -EXP_ID=$(cat exp_counter.txt 2>/dev/null || echo "0") -echo $((EXP_ID+1)) > exp_counter.txt - -# Store meta information -IFS=$', '; echo \ -"UC=$UC -DIM_VALUES=${DIM_VALUES[*]} -REPLICAS=${REPLICAS[*]} -PARTITIONS=$PARTITIONS -CPU_LIMIT=$CPU_LIMIT -MEMORY_LIMIT=$MEMORY_LIMIT -KAFKA_STREAMS_COMMIT_INTERVAL_MS=$KAFKA_STREAMS_COMMIT_INTERVAL_MS -EXECUTION_MINUTES=$EXECUTION_MINUTES -" >> "exp${EXP_ID}_uc${UC}_meta.txt" - -SUBEXPERIMENTS=$((${#DIM_VALUES[@]} * ${#REPLICAS[@]})) -SUBEXPERIMENT_COUNTER=0 - -echo "Going to execute $SUBEXPERIMENTS subexperiments in total..." -for DIM_VALUE in "${DIM_VALUES[@]}" -do - for REPLICA in "${REPLICAS[@]}" - do - SUBEXPERIMENT_COUNTER=$((SUBEXPERIMENT_COUNTER+1)) - echo "Run subexperiment $SUBEXPERIMENT_COUNTER/$SUBEXPERIMENTS with config: $DIM_VALUE $REPLICA" - ./run_uc$UC.sh $EXP_ID $DIM_VALUE $REPLICA $PARTITIONS $CPU_LIMIT $MEMORY_LIMIT $KAFKA_STREAMS_COMMIT_INTERVAL_MS $EXECUTION_MINUTES - sleep 10s - done -done