diff --git a/execution/run_uc.py b/execution/run_uc.py index 81cda8881319c1ed60ded02c6099c3de06f40804..3a21e23e6971fd0df6a19dd2a9d32cbf83e9af9b 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -5,6 +5,7 @@ from kubernetes.stream import stream import lag_analysis import logging # logging from os import path # path utilities +from strategies.cli_parser import execution_parser import subprocess # execute bash commands import sys # for exit of program import time # process sleep @@ -17,71 +18,14 @@ customApi = None # acces kubernetes custom object api def load_variables(): """Load the CLI variables given at the command line""" - global args print('Load CLI variables') - parser = argparse.ArgumentParser(description='Run use case Programm') - parser.add_argument('--exp-id', '-id', - dest='exp_id', - default='1', - metavar='EXP_ID', - help='ID of the experiment') - parser.add_argument('--use-case', '-uc', - dest='uc_id', - default='1', - metavar='UC_NUMBER', - help='use case number, one of 1, 2, 3 or 4') - parser.add_argument('--dim-value', '-d', - dest='dim_value', - default=10000, - type=int, - metavar='DIM_VALUE', - help='Value for the workload generator to be tested') - parser.add_argument('--instances', '-i', - dest='instances', - default=1, - type=int, - metavar='INSTANCES', - help='Numbers of instances to be benchmarked') - parser.add_argument('--partitions', '-p', - dest='partitions', - default=40, - type=int, - metavar='PARTITIONS', - help='Number of partitions for Kafka topics') - parser.add_argument('--cpu-limit', '-cpu', - dest='cpu_limit', - default='1000m', - metavar='CPU_LIMIT', - help='Kubernetes CPU limit') - parser.add_argument('--memory-limit', '-mem', - dest='memory_limit', - default='4Gi', - metavar='MEMORY_LIMIT', - help='Kubernetes memory limit') - parser.add_argument('--commit-interval', '-ci', - dest='commit_interval_ms', - default=100, - type=int, - metavar='KAFKA_STREAMS_COMMIT_INTERVAL_MS', - help='Kafka Streams commit interval in milliseconds') - parser.add_argument('--executions-minutes', '-exm', - dest='execution_minutes', - default=5, - type=int, - metavar='EXECUTION_MINUTES', - help='Duration in minutes subexperiments should be \ - executed for') - parser.add_argument('--reset', '-res', - dest='reset', - action="store_true", - help='Resets the environment before execution') - parser.add_argument('--reset-only', '-reso', - dest='reset_only', - action="store_true", - help='Only resets the environment. Ignores all other parameters') - + parser = execution_parser(description='Run use case Programm') args = parser.parse_args() print(args) + if args.exp_id is None or args.uc is None or args.load is None or args.instances is None : + print('The options --exp-id, --uc, --load and --instances are mandatory.') + print('Some might not be set!') + sys.exit(1) return args @@ -583,7 +527,7 @@ if __name__ == '__main__': logging.basicConfig(level=logging.INFO) args = load_variables() print('---------------------') - main(args.exp_id, args.uc_id, args.dim_value, args.instances, + main(args.exp_id, args.uc, args.load, args.instances, args.partitions, args.cpu_limit, args.memory_limit, - args.commit_interval_ms, args.execution_minutes, args.reset, + args.commit_ms, args.duration, args.reset, args.reset_only) diff --git a/execution/strategies/cli_parser.py b/execution/strategies/cli_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..0b16b6ba5b5c569ed48b319ce068cf0c169c19c8 --- /dev/null +++ b/execution/strategies/cli_parser.py @@ -0,0 +1,88 @@ +import argparse + +def default_parser(description): + """ + Returns the default parser that can be used for thodolite and run uc py + :param description: The description the argument parser should show. + """ + parser = argparse.ArgumentParser(description=description) + parser.add_argument('--uc', + metavar='<uc>', + help='[mandatory] use case number, one of 1, 2, 3 or 4') + parser.add_argument('--partitions', '-p', + default=40, + type=int, + metavar='<partitions>', + help='Number of partitions for Kafka topics') + parser.add_argument('--cpu-limit', '-cpu', + default='1000m', + metavar='<CPU limit>', + help='Kubernetes CPU limit') + parser.add_argument('--memory-limit', '-mem', + default='4Gi', + metavar='<memory limit>', + help='Kubernetes memory limit') + parser.add_argument('--commit-ms', + default=100, + type=int, + metavar='<commit ms>', + help='Kafka Streams commit interval in milliseconds') + parser.add_argument('--duration', '-d', + default=5, + type=int, + metavar='<duration>', + help='Duration in minutes subexperiments should be \ + executed for') + parser.add_argument('--reset', + action="store_true", + help='Resets the environment before execution') + parser.add_argument('--reset-only', + action="store_true", + help='Only resets the environment. Ignores all other parameters') + return parser + +def benchmark_parser(description): + """ + Parser for the overall benchmark execution + :param description: The description the argument parser should show. + """ + parser = default_parser(description) + + parser.add_argument('--loads', + type=int, + metavar='<load>', + nargs='+', + help='[mandatory] Loads that should be executed') + parser.add_argument('--instances', '-i', + dest='instances_list', + type=int, + metavar='<instances>', + nargs='+', + help='[mandatory] List of instances used in benchmarks') + parser.add_argument('--domain-restriction', + action="store_true", + help='To use domain restriction. For details see README') + parser.add_argument('--search-strategy', + default='default', + metavar='<strategy>', + help='The benchmarking search strategy. Can be set to default, linear-search or binary-search') + return parser + +def execution_parser(description): + """ + Parser for executing one use case + :param description: The description the argument parser should show. + """ + parser = default_parser(description) + parser.add_argument('--exp-id', + metavar='<exp id>', + help='[mandatory] ID of the experiment') + parser.add_argument('--load', + type=int, + metavar='<load>', + help='[mandatory] Load that should be used for benchmakr') + parser.add_argument('--instances', + type=int, + metavar='<instances>', + help='[mandatory] Numbers of instances to be benchmarked') + return parser diff --git a/execution/theodolite.py b/execution/theodolite.py index 741d7e2ea74d6694c003b114dbc5f703c3d056d6..b76e0b2da01aef318fe8c5a5a1265276051f301d 100755 --- a/execution/theodolite.py +++ b/execution/theodolite.py @@ -1,8 +1,11 @@ #!/usr/bin/env python -import sys +import argparse +import logging # logging import os +import sys from strategies.config import ExperimentConfig +from strategies.cli_parser import benchmark_parser import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy import strategies.strategies.search.check_all_strategy as check_all_strategy @@ -12,150 +15,164 @@ from strategies.experiment_execution import ExperimentExecutor import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator -uc=sys.argv[1] -dim_values=sys.argv[2].split(',') -replicas=sys.argv[3].split(',') -partitions=sys.argv[4] if len(sys.argv) >= 5 and sys.argv[4] else 40 -cpu_limit=sys.argv[5] if len(sys.argv) >= 6 and sys.argv[5] else "1000m" -memory_limit=sys.argv[6] if len(sys.argv) >= 7 and sys.argv[6] else "4Gi" -kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[7] else 100 -execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5 -domain_restriction=bool(sys.argv[9]) if len(sys.argv) >= 10 and sys.argv[9] == "restrict-domain" else False -search_strategy=sys.argv[10] if len(sys.argv) >= 11 and (sys.argv[10] == "linear-search" or sys.argv[10] == "binary-search") else "check-all" -print(f"Domain restriction of search space activated: {domain_restriction}") -print(f"Chosen search strategy: {search_strategy}") +def load_variables(): + """Load the CLI variables given at the command line""" + print('Load CLI variables') + parser = benchmark_parser("Run theodolite benchmarking") + args = parser.parse_args() + print(args) + if args.uc is None or args.loads is None or args.instances_list is None: + print('The options --uc, --loads and --instances are mandatory.') + print('Some might not be set!') + sys.exit(1) + return args -if os.path.exists("exp_counter.txt"): - with open("exp_counter.txt", mode="r") as read_stream: - exp_id = int(read_stream.read()) -else: - exp_id = 0 -with open("exp_counter.txt", mode="w") as write_stream: - write_stream.write(str(exp_id+1)) +def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, commit_ms, duration, domain_restriction, search_strategy, reset, reset_only): -# Store metadata -separator = "," -lines = [ - f"UC={uc}\n", - f"DIM_VALUES={separator.join(dim_values)}\n", - f"REPLICAS={separator.join(replicas)}\n", - f"PARTITIONS={partitions}\n", - f"CPU_LIMIT={cpu_limit}\n", - f"MEMORY_LIMIT={memory_limit}\n", - f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={kafka_streams_commit_interval_ms}\n", - f"EXECUTION_MINUTES={execution_minutes}\n", - f"DOMAIN_RESTRICTION={domain_restriction}\n", - f"SEARCH_STRATEGY={search_strategy}" - ] -with open(f"exp{exp_id}_uc{uc}_meta.txt", "w") as stream: - stream.writelines(lines) + print(f"Domain restriction of search space activated: {domain_restriction}") + print(f"Chosen search strategy: {search_strategy}") -# domain restriction -if domain_restriction: - # domain restriction + linear-search - if search_strategy == "linear-search": - print(f"Going to execute at most {len(dim_values)+len(replicas)-1} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=dim_values, - replicass=replicas, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=linear_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # domain restriction + binary-search - elif search_strategy == "binary-search": - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=dim_values, - replicass=replicas, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=binary_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # domain restriction + check-all + if os.path.exists("exp_counter.txt"): + with open("exp_counter.txt", mode="r") as read_stream: + exp_id = int(read_stream.read()) else: - print(f"Going to execute {len(dim_values)*len(replicas)} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=dim_values, - replicass=replicas, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=check_all_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) -# no domain restriction -else: - # no domain restriction + linear-search - if search_strategy == "linear-search": - print(f"Going to execute at most {len(dim_values)*len(replicas)} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=dim_values, - replicass=replicas, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=linear_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # no domain restriction + binary-search - elif search_strategy == "binary-search": - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=dim_values, - replicass=replicas, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=binary_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - # no domain restriction + check-all + exp_id = 0 + + # Store metadata + separator = "," + lines = [ + f"UC={uc}\n", + f"DIM_VALUES={separator.join(loads)}\n", + f"REPLICAS={separator.join(instances_list)}\n", + f"PARTITIONS={partitions}\n", + f"CPU_LIMIT={cpu_limit}\n", + f"MEMORY_LIMIT={memory_limit}\n", + f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n", + f"EXECUTION_MINUTES={duration}\n", + f"DOMAIN_RESTRICTION={domain_restriction}\n", + f"SEARCH_STRATEGY={search_strategy}" + ] + with open(f"exp{exp_id}_uc{uc}_meta.txt", "w") as stream: + stream.writelines(lines) + + with open("exp_counter.txt", mode="w") as write_stream: + write_stream.write(str(exp_id + 1)) + + # domain restriction + if domain_restriction: + # domain restriction + linear-search + if search_strategy == "linear-search": + print(f"Going to execute at most {len(loads)+len(instances_list)-1} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=loads, + replicass=instances_list, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=commit_ms, + execution_minutes=duration, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=linear_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # domain restriction + binary-search + elif search_strategy == "binary-search": + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=loads, + replicass=instances_list, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=commit_ms, + execution_minutes=duration, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=binary_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # domain restriction + check_all + else: + print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=loads, + replicass=instances_list, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=commit_ms, + execution_minutes=duration, + domain_restriction_strategy=lower_bound_strategy, + search_strategy=check_all_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # no domain restriction else: - print(f"Going to execute {len(dim_values)*len(replicas)} subexperiments in total..") - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=dim_values, - replicass=replicas, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=check_all_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) + # no domain restriction + linear-search + if search_strategy == "linear-search": + print(f"Going to execute at most {len(loads)*len(instances_list)} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=loads, + replicass=instances_list, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=commit_ms, + execution_minutes=duration, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=linear_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # no domain restriction + binary-search + elif search_strategy == "binary-search": + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=loads, + replicass=instances_list, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=commit_ms, + execution_minutes=duration, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=binary_search_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + # no domain restriction + check_all + else: + print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..") + experiment_config = ExperimentConfig( + use_case=uc, + exp_id=exp_id, + dim_values=loads, + replicass=instances_list, + partitions=partitions, + cpu_limit=cpu_limit, + memory_limit=memory_limit, + kafka_streams_commit_interval_ms=commit_ms, + execution_minutes=duration, + domain_restriction_strategy=no_lower_bound_strategy, + search_strategy=check_all_strategy, + subexperiment_executor=subexperiment_executor, + subexperiment_evaluator=subexperiment_evaluator) + + executor = ExperimentExecutor(experiment_config) + executor.execute() + -executor = ExperimentExecutor(experiment_config) -executor.execute() \ No newline at end of file +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + args = load_variables() + main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit, + args.memory_limit, args.commit_ms, args.duration, + args.domain_restriction, args.search_strategy, args.reset, + args.reset_only)