diff --git a/execution/.gitignore b/execution/.gitignore index d4dceff0274cd6ab3296e85e995f7e5d504f114d..bac9a5d1eeb12d9e40d38376904e8fb69c0e5231 100644 --- a/execution/.gitignore +++ b/execution/.gitignore @@ -1 +1,2 @@ -exp_counter.txt \ No newline at end of file +exp_counter.txt +results diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py index 3950f12413745a6f3802a2e223123830f7d03649..5b78ef3653753a2b95ac9b74bf8de156a71fb14c 100644 --- a/execution/lag_analysis.py +++ b/execution/lag_analysis.py @@ -8,7 +8,7 @@ import csv import logging -def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url = 'http://kube1.se.internal:32529'): +def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path): print("Main") time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) @@ -70,11 +70,11 @@ def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_ fields = [exp_id, datetime.now(), benchmark, dim_value, instances, linear_regressor.coef_] print(fields) - with open(r'results.csv', 'a') as f: + with open(f'{result_path}/results.csv', 'a') as f: writer = csv.writer(f) writer.writerow(fields) - filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}" + filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}" plt.plot(X, Y) plt.plot(X, Y_pred, color='red') @@ -163,4 +163,5 @@ if __name__ == '__main__': instances = sys.argv[4] execution_minutes = int(sys.argv[5]) - main(exp_id, benchmark, dim_value, instances, execution_minutes) + main(exp_id, benchmark, dim_value, instances, execution_minutes, + 'http://localhost:9090', 'results') diff --git a/execution/lib/cli_parser.py b/execution/lib/cli_parser.py index 0b0a7438910560f5b5871b0023c92d6743dd6cc9..f785bce4f933622a99b4daaadeb483276d4956cd 100644 --- a/execution/lib/cli_parser.py +++ b/execution/lib/cli_parser.py @@ -56,8 +56,12 @@ def default_parser(description): help='Only resets the environment. 
Ignores all other parameters') parser.add_argument('--prometheus', metavar='<URL>', - default=os.environ.get('PROMETHEUS_BASE_URL'), + default=os.environ.get('PROMETHEUS_BASE_URL', 'http://localhost:9090'), help='Defines where to find the prometheus instance') + parser.add_argument('--path', + metavar='<path>', + default=os.environ.get('RESULT_PATH', 'results'), + help='A directory path for the results') return parser def benchmark_parser(description): diff --git a/execution/run_uc.py b/execution/run_uc.py index 0d7ca59ad23fac3c343cf2c6411716d7185cfcb5..6ebf797241b45a342214ea4dbd003e371f5bd828 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -248,7 +248,7 @@ def wait_execution(execution_minutes): return -def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url=None): +def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url, result_path): """ Runs the evaluation function :param string exp_id: ID of the experiment. @@ -258,12 +258,7 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome :param int execution_minutes: How long the use case were executed. 
""" print('Run evaluation function') - if prometheus_base_url is None and environ.get('PROMETHEUS_BASE_URL') is None: - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes) - elif prometheus_base_url is not None: - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url) - else: - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, environ.get('PROMETHEUS_BASE_URL')) + lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url, result_path) return @@ -460,7 +455,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): stop_lag_exporter() -def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, reset_only, ns): +def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False): """ Main method to execute one time the benchmark for a given use case. 
Start workload generator/application -> execute -> analyse -> stop all @@ -528,7 +523,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi print('---------------------') run_evaluation(exp_id, uc_id, dim_value, instances, - execution_minutes, prometheus_base_url) + execution_minutes, prometheus_base_url, result_path) print('---------------------') # Reset cluster regular, therefore abort exit not needed anymore @@ -543,4 +538,4 @@ if __name__ == '__main__': main(args.exp_id, args.uc, args.load, args.instances, args.partitions, args.cpu_limit, args.memory_limit, args.commit_ms, args.duration, args.prometheus, args.reset, - args.reset_only, args.namespace) + args.namespace, args.path, args.reset_only) diff --git a/execution/strategies/config.py b/execution/strategies/config.py index f9a67897286f79b06e5af06d9fb9b067228be33c..3741bcd5a8f025b0efc8bfb6ab53fdf08381ce9f 100644 --- a/execution/strategies/config.py +++ b/execution/strategies/config.py @@ -12,7 +12,11 @@ class ExperimentConfig: memory_limit: str kafka_streams_commit_interval_ms: int execution_minutes: int + prometheus_base_url: str + reset: bool + namespace: str + result_path: str domain_restriction_strategy: object search_strategy: object subexperiment_executor: object - subexperiment_evaluator: object \ No newline at end of file + subexperiment_evaluator: object diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py index 9d92831cd6ba03ad5b4ceeaf1b9741937396a4c2..3c6a15918ec8cf923b79e6f4f98564f983deac63 100644 --- a/execution/strategies/strategies/config.py +++ b/execution/strategies/strategies/config.py @@ -12,4 +12,8 @@ class SubexperimentConfig: cpu_limit: str memory_limit: str kafka_streams_commit_interval_ms: int - execution_minutes: int \ No newline at end of file + execution_minutes: int + prometheus_base_url: str + reset: bool + namespace: str + result_path: str diff --git 
a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py index 92eea2e7df4805b82b1b04ded909d68caa8c8b39..be7da54025c2f9fda1750d8197d3afd4055da790 100644 --- a/execution/strategies/strategies/search/binary_search_strategy.py +++ b/execution/strategies/strategies/search/binary_search_strategy.py @@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig def binary_search(config, dim_value, lower, upper, subexperiment_counter): if lower == upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # successful, the upper neighbor is assumed to also have been successful @@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): return (lower+1, subexperiment_counter) elif lower+1==upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + 
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # minimal instances found return (lower, subexperiment_counter) else: # not successful, check if lower+1 instances are sufficient print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # minimal instances found @@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): # test mid mid=(upper+lower)//2 print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = 
SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # success -> search in (lower, mid-1) @@ -44,4 +44,3 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c upper = len(config.replicass)-1 dim_value=config.dim_values[dim_value_index] return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter) - diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py index cd1a548d2142951a38ab04eba04ec6b0fb32e2a6..7d8ea605707131d19a023671a77b8f22647d6f51 100644 --- a/execution/strategies/strategies/search/check_all_strategy.py +++ b/execution/strategies/strategies/search/check_all_strategy.py @@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, 
config.result_path) config.subexperiment_executor.execute(subexperiment_config) diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py index eeda5ad32b22174ed3552180ee6307911e18b657..c4f57c0d9bd82467a5917bbf95fe330c7bd81a58 100644 --- a/execution/strategies/strategies/search/linear_search_strategy.py +++ b/execution/strategies/strategies/search/linear_search_strategy.py @@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) @@ -19,4 +19,4 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c return (lower_replicas_bound_index, subexperiment_counter) else: lower_replicas_bound_index+=1 - return (lower_replicas_bound_index, subexperiment_counter) \ No newline at end of file + return (lower_replicas_bound_index, subexperiment_counter) diff --git a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py index 
e7f0c91564057ac2bbdc64493e750c4c967476dd..4e46d2d6ccabb601d9df373a540d23e73d60be28 100644 --- a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py +++ b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py @@ -4,10 +4,10 @@ import os import lib.trend_slope_computer as trend_slope_computer THRESHOLD = 2000 -WARMUP_SEC = 60 +WARMUP_SEC = 60 def execute(config): - cwd = os.getcwd() + cwd = f'{os.getcwd()}/{config.result_path}' file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv" trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD) diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py index 7bcf182f5bee1fcc99d2a8c0040df208ae77bdb3..3f7af08b7a52d70609f000a34a47c088574ddfd6 100644 --- a/execution/strategies/subexperiment_execution/subexperiment_executor.py +++ b/execution/strategies/subexperiment_execution/subexperiment_executor.py @@ -14,7 +14,7 @@ def execute(subexperiment_config): memory_limit=subexperiment_config.memory_limit, commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms, execution_minutes=int(subexperiment_config.execution_minutes), - prometheus_base_url=None, - reset=False, - reset_only=False, - ns="default") + prometheus_base_url=subexperiment_config.prometheus_base_url, + reset=subexperiment_config.reset, + ns=subexperiment_config.namespace, + result_path=subexperiment_config.result_path) diff --git a/execution/theodolite.py b/execution/theodolite.py index 3c0506355aff373795adb762e0b66ec64456c5df..22be2f69ab81d81b7aac7717041604cd368e771f 100755 --- a/execution/theodolite.py +++ b/execution/theodolite.py @@ -31,16 +31,20 @@ def load_variables(): def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, commit_ms, duration, domain_restriction, search_strategy, - prometheus_base_url ,reset, reset_only, 
namespace): + prometheus_base_url, reset, namespace, result_path): print(f"Domain restriction of search space activated: {domain_restriction}") print(f"Chosen search strategy: {search_strategy}") - if os.path.exists("exp_counter.txt"): - with open("exp_counter.txt", mode="r") as read_stream: + counter_path = f"{result_path}/exp_counter.txt" + + if os.path.exists(counter_path): + with open(counter_path, mode="r") as read_stream: exp_id = int(read_stream.read()) else: exp_id = 0 + # Create the result directory if it does not exist + os.makedirs(result_path, exist_ok=True) # Store metadata separator = "," @@ -56,116 +60,51 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, f"DOMAIN_RESTRICTION={domain_restriction}\n", f"SEARCH_STRATEGY={search_strategy}" ] - with open(f"exp{exp_id}_uc{uc}_meta.txt", "w") as stream: + with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream: stream.writelines(lines) - with open("exp_counter.txt", mode="w") as write_stream: + with open(counter_path, mode="w") as write_stream: write_stream.write(str(exp_id + 1)) - # domain restriction + domain_restriction_strategy = None + search_strategy = None + + # Select domain restriction if domain_restriction: - # domain restriction + linear-search - if search_strategy == "linear-search": - print(f"Going to execute at most {len(loads)+len(instances_list)-1} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=loads, - replicass=instances_list, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, - execution_minutes=duration, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=linear_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # domain restriction + binary-search - elif search_strategy == "binary-search": - experiment_config = ExperimentConfig( - 
use_case=uc, - exp_id=exp_id, - dim_values=loads, - replicass=instances_list, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, - execution_minutes=duration, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=binary_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # domain restriction + check_all - else: - print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=loads, - replicass=instances_list, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, - execution_minutes=duration, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=check_all_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # no domain restriction + # domain restriction + domain_restriction_strategy = lower_bound_strategy else: - # no domain restriction + linear-search - if search_strategy == "linear-search": - print(f"Going to execute at most {len(loads)*len(instances_list)} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=loads, - replicass=instances_list, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, - execution_minutes=duration, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=linear_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # no domain restriction + binary-search - elif search_strategy == "binary-search": - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=loads, - replicass=instances_list, - 
partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, - execution_minutes=duration, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=binary_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # no domain restriction + check_all - else: - print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=loads, - replicass=instances_list, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, - execution_minutes=duration, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=check_all_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) + # no domain restriction + domain_restriction_strategy = no_lower_bound_strategy + + # select search strategy + if search_strategy == "linear-search": + print(f"Going to execute at most {len(loads)+len(instances_list)-1} subexperiments in total..") + search_strategy = linear_search_strategy + elif search_strategy == "binary-search": + search_strategy = binary_search_strategy + else: + print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..") + search_strategy = check_all_strategy + + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=loads, + replicass=instances_list, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=commit_ms, + execution_minutes=duration, + prometheus_base_url=prometheus_base_url, + reset=reset, + namespace=namespace, + result_path=result_path, + domain_restriction_strategy=domain_restriction_strategy, + search_strategy=search_strategy, + subexperiment_executor=subexperiment_executor, + 
subexperiment_evaluator=subexperiment_evaluator) executor = ExperimentExecutor(experiment_config) executor.execute() @@ -177,4 +116,4 @@ if __name__ == '__main__': main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit, args.memory_limit, args.commit_ms, args.duration, args.domain_restriction, args.search_strategy, args.prometheus, - args.reset, args.reset_only, args.namespace) + args.reset, args.namespace, args.path)