diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py index 3950f12413745a6f3802a2e223123830f7d03649..5b78ef3653753a2b95ac9b74bf8de156a71fb14c 100644 --- a/execution/lag_analysis.py +++ b/execution/lag_analysis.py @@ -8,7 +8,7 @@ import csv import logging -def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url = 'http://kube1.se.internal:32529'): +def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path): print("Main") time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) @@ -70,11 +70,11 @@ def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_ fields = [exp_id, datetime.now(), benchmark, dim_value, instances, linear_regressor.coef_] print(fields) - with open(r'results.csv', 'a') as f: + with open(f'{result_path}/results.csv', 'a') as f: writer = csv.writer(f) writer.writerow(fields) - filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}" + filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}" plt.plot(X, Y) plt.plot(X, Y_pred, color='red') @@ -163,4 +163,5 @@ if __name__ == '__main__': instances = sys.argv[4] execution_minutes = int(sys.argv[5]) - main(exp_id, benchmark, dim_value, instances, execution_minutes) + main(exp_id, benchmark, dim_value, instances, execution_minutes, + 'http://localhost:9090', 'results') diff --git a/execution/lib/cli_parser.py b/execution/lib/cli_parser.py index 62f1c745cfa40c06dff48be160061da61e41459b..f785bce4f933622a99b4daaadeb483276d4956cd 100644 --- a/execution/lib/cli_parser.py +++ b/execution/lib/cli_parser.py @@ -56,7 +56,7 @@ def default_parser(description): help='Only resets the environment. Ignores all other parameters') parser.add_argument('--prometheus', metavar='<URL>', - default=os.environ.get('PROMETHEUS_BASE_URL'), + default=os.environ.get('PROMETHEUS_BASE_URL', 'http://localhost:9090'), help='Defines where to find the prometheus instance') parser.add_argument('--path', metavar='<path>', diff --git a/execution/run_uc.py b/execution/run_uc.py index 0d7ca59ad23fac3c343cf2c6411716d7185cfcb5..6ebf797241b45a342214ea4dbd003e371f5bd828 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -248,7 +248,7 @@ def wait_execution(execution_minutes): return -def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url=None): +def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url, result_path): """ Runs the evaluation function :param string exp_id: ID of the experiment. @@ -258,12 +258,7 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome :param int execution_minutes: How long the use case where executed. """ print('Run evaluation function') - if prometheus_base_url is None and environ.get('PROMETHEUS_BASE_URL') is None: - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes) - elif prometheus_base_url is not None: - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url) - else: - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, environ.get('PROMETHEUS_BASE_URL')) + lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url, result_path) return @@ -460,7 +455,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): stop_lag_exporter() -def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, reset_only, ns): +def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False): """ Main method to execute one time the benchmark for a given use case. Start workload generator/application -> execute -> analyse -> stop all @@ -528,7 +523,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi print('---------------------') run_evaluation(exp_id, uc_id, dim_value, instances, - execution_minutes, prometheus_base_url) + execution_minutes, prometheus_base_url, result_path) print('---------------------') # Reset cluster regular, therefore abort exit not needed anymore @@ -543,4 +538,4 @@ if __name__ == '__main__': main(args.exp_id, args.uc, args.load, args.instances, args.partitions, args.cpu_limit, args.memory_limit, args.commit_ms, args.duration, args.prometheus, args.reset, - args.reset_only, args.namespace) + args.namespace, args.path, args.reset_only) diff --git a/execution/strategies/config.py b/execution/strategies/config.py index f9a67897286f79b06e5af06d9fb9b067228be33c..3741bcd5a8f025b0efc8bfb6ab53fdf08381ce9f 100644 --- a/execution/strategies/config.py +++ b/execution/strategies/config.py @@ -12,7 +12,11 @@ class ExperimentConfig: memory_limit: str kafka_streams_commit_interval_ms: int execution_minutes: int + prometheus_base_url: str + reset: bool + namespace: str + result_path: str domain_restriction_strategy: object search_strategy: object subexperiment_executor: object - subexperiment_evaluator: object \ No newline at end of file + subexperiment_evaluator: object diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py index 9d92831cd6ba03ad5b4ceeaf1b9741937396a4c2..3c6a15918ec8cf923b79e6f4f98564f983deac63 100644 --- a/execution/strategies/strategies/config.py +++ b/execution/strategies/strategies/config.py @@ -12,4 +12,8 @@ class SubexperimentConfig: cpu_limit: str memory_limit: str kafka_streams_commit_interval_ms: int - execution_minutes: int \ No newline at end of file + execution_minutes: int + prometheus_base_url: str + reset: bool + namespace: str + result_path: str diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py index 92eea2e7df4805b82b1b04ded909d68caa8c8b39..be7da54025c2f9fda1750d8197d3afd4055da790 100644 --- a/execution/strategies/strategies/search/binary_search_strategy.py +++ b/execution/strategies/strategies/search/binary_search_strategy.py @@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig def binary_search(config, dim_value, lower, upper, subexperiment_counter): if lower == upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # successful, the upper neighbor is assumed to also has been successful @@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): return (lower+1, subexperiment_counter) elif lower+1==upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # minimal instances found return (lower, subexperiment_counter) else: # not successful, check if lower+1 instances are sufficient print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # minimal instances found @@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): # test mid mid=(upper+lower)//2 print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # success -> search in (lower, mid-1) @@ -44,4 +44,3 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c upper = len(config.replicass)-1 dim_value=config.dim_values[dim_value_index] return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter) - diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py index cd1a548d2142951a38ab04eba04ec6b0fb32e2a6..7d8ea605707131d19a023671a77b8f22647d6f51 100644 --- a/execution/strategies/strategies/search/check_all_strategy.py +++ b/execution/strategies/strategies/search/check_all_strategy.py @@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py index eeda5ad32b22174ed3552180ee6307911e18b657..c4f57c0d9bd82467a5917bbf95fe330c7bd81a58 100644 --- a/execution/strategies/strategies/search/linear_search_strategy.py +++ b/execution/strategies/strategies/search/linear_search_strategy.py @@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) @@ -19,4 +19,4 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c return (lower_replicas_bound_index, subexperiment_counter) else: lower_replicas_bound_index+=1 - return (lower_replicas_bound_index, subexperiment_counter) \ No newline at end of file + return (lower_replicas_bound_index, subexperiment_counter) diff --git a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py index e7f0c91564057ac2bbdc64493e750c4c967476dd..4e46d2d6ccabb601d9df373a540d23e73d60be28 100644 --- a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py +++ b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py @@ -4,10 +4,10 @@ import os import lib.trend_slope_computer as trend_slope_computer THRESHOLD = 2000 -WARMUP_SEC = 60 +WARMUP_SEC = 60 def execute(config): - cwd = os.getcwd() + cwd = f'{os.getcwd()}/{config.result_path}' file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv" trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD) diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py index 7bcf182f5bee1fcc99d2a8c0040df208ae77bdb3..3f7af08b7a52d70609f000a34a47c088574ddfd6 100644 --- a/execution/strategies/subexperiment_execution/subexperiment_executor.py +++ b/execution/strategies/subexperiment_execution/subexperiment_executor.py @@ -14,7 +14,7 @@ def execute(subexperiment_config): memory_limit=subexperiment_config.memory_limit, commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms, execution_minutes=int(subexperiment_config.execution_minutes), - prometheus_base_url=None, - reset=False, - reset_only=False, - ns="default") + prometheus_base_url=subexperiment_config.prometheus_base_url, + reset=subexperiment_config.reset, + ns=subexperiment_config.namespace, + result_path=subexperiment_config.result_path) diff --git a/execution/theodolite.py b/execution/theodolite.py index 56fdc8b52597535425d09f44b4de50f836f1a9a0..22be2f69ab81d81b7aac7717041604cd368e771f 100755 --- a/execution/theodolite.py +++ b/execution/theodolite.py @@ -31,7 +31,7 @@ def load_variables(): def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, commit_ms, duration, domain_restriction, search_strategy, - prometheus_base_url ,reset, reset_only, namespace, result_path): + prometheus_base_url, reset, namespace, result_path): print(f"Domain restriction of search space activated: {domain_restriction}") print(f"Chosen search strategy: {search_strategy}") @@ -97,6 +97,10 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, memory_limit=memory_limit, kafka_streams_commit_interval_ms=commit_ms, execution_minutes=duration, + prometheus_base_url=prometheus_base_url, + reset=reset, + namespace=namespace, + result_path=result_path, domain_restriction_strategy=domain_restriction_strategy, search_strategy=search_strategy, subexperiment_executor=subexperiment_executor, @@ -112,4 +116,4 @@ if __name__ == '__main__': main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit, args.memory_limit, args.commit_ms, args.duration, args.domain_restriction, args.search_strategy, args.prometheus, - args.reset, args.reset_only, args.namespace, args.path) + args.reset, args.namespace, args.path)