diff --git a/execution/.dockerignore b/execution/.dockerignore
new file mode 100644
index 0000000000000000000000000000000000000000..68e5f21c503a80d7db64722d700351a303ddb9dd
--- /dev/null
+++ b/execution/.dockerignore
@@ -0,0 +1,9 @@
+*
+!requirements.txt
+!uc-workload-generator
+!uc-application
+!strategies
+!lib
+!theodolite.py
+!run_uc.py
+!lag_analysis.py
diff --git a/execution/Dockerfile b/execution/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..e71bc91d9d31bea4c1598292e43d0ab7c193c3fa
--- /dev/null
+++ b/execution/Dockerfile
@@ -0,0 +1,15 @@
+FROM python:3.8
+
+RUN mkdir /app
+WORKDIR /app
+ADD requirements.txt /app/
+RUN pip install -r requirements.txt
+COPY uc-workload-generator /app/uc-workload-generator
+COPY uc-application /app/uc-application
+COPY strategies /app/strategies
+COPY lib /app/lib
+COPY lag_analysis.py /app/
+COPY run_uc.py /app/
+COPY theodolite.py /app/
+
+CMD ["python", "/app/theodolite.py"]
diff --git a/execution/strategies/cli_parser.py b/execution/lib/cli_parser.py
similarity index 74%
rename from execution/strategies/cli_parser.py
rename to execution/lib/cli_parser.py
index 0b16b6ba5b5c569ed48b319ce068cf0c169c19c8..cb4abcabaecf08dd4c7edadd8f1697838dab6970 100644
--- a/execution/strategies/cli_parser.py
+++ b/execution/lib/cli_parser.py
@@ -1,4 +1,14 @@
 import argparse
+import os
+
+def env_list_default(env, tf):
+    """
+    Makes a list from an environment string.
+    """
+    v = os.environ.get(env)
+    if v is not None:
+        v = [tf(s) for s in v.split(',')]
+    return v
 
 def default_parser(description):
     """
@@ -8,29 +18,30 @@ def default_parser(description):
     parser = argparse.ArgumentParser(description=description)
     parser.add_argument('--uc',
                         metavar='<uc>',
+                        default=os.environ.get('UC'),
                         help='[mandatory] use case number, one of 1, 2, 3 or 4')
     parser.add_argument('--partitions', '-p',
-                        default=40,
-                        type=int,
                         metavar='<partitions>',
+                        type=int,
+                        default=os.environ.get('PARTITIONS', 40),
                         help='Number of partitions for Kafka topics')
     parser.add_argument('--cpu-limit', '-cpu',
-                        default='1000m',
                         metavar='<CPU limit>',
+                        default=os.environ.get('CPU_LIMIT', '1000m'),
                         help='Kubernetes CPU limit')
     parser.add_argument('--memory-limit', '-mem',
-                        default='4Gi',
                         metavar='<memory limit>',
+                        default=os.environ.get('MEMORY_LIMIT', '4Gi'),
                         help='Kubernetes memory limit')
     parser.add_argument('--commit-ms',
-                        default=100,
-                        type=int,
                         metavar='<commit ms>',
+                        type=int,
+                        default=os.environ.get('COMMIT_MS', 100),
                         help='Kafka Streams commit interval in milliseconds')
     parser.add_argument('--duration', '-d',
-                        default=5,
-                        type=int,
                         metavar='<duration>',
+                        type=int,
+                        default=os.environ.get('DURATION', 5),
                         help='Duration in minutes subexperiments should be \
                               executed for')
     parser.add_argument('--reset',
@@ -39,6 +50,10 @@ def default_parser(description):
     parser.add_argument('--reset-only',
                         action="store_true",
                         help='Only resets the environment. Ignores all other parameters')
+    parser.add_argument('--prometheus',
+                        metavar='<URL>',
+                        default=os.environ.get('PROMETHEUS_BASE_URL'),
+                        help='Defines where to find the prometheus instance')
     return parser
 
 def benchmark_parser(description):
@@ -49,22 +64,24 @@ def benchmark_parser(description):
     parser = default_parser(description)
     parser.add_argument('--loads',
-                        type=int,
                         metavar='<load>',
+                        type=int, nargs='+',
+                        default=env_list_default('LOADS', int),
                         help='[mandatory] Loads that should be executed')
     parser.add_argument('--instances', '-i',
                         dest='instances_list',
-                        type=int,
                         metavar='<instances>',
+                        type=int, nargs='+',
+                        default=env_list_default('INSTANCES', int),
                         help='[mandatory] List of instances used in benchmarks')
     parser.add_argument('--domain-restriction',
                         action="store_true",
                         help='To use domain restriction. For details see README')
     parser.add_argument('--search-strategy',
-                        default='default',
                         metavar='<strategy>',
+                        default=os.environ.get('SEARCH_STRATEGY', 'default'),
                         help='The benchmarking search strategy. Can be set to default, linear-search or binary-search')
     return parser
 
@@ -76,13 +93,16 @@ def execution_parser(description):
     parser = default_parser(description)
     parser.add_argument('--exp-id',
                         metavar='<exp id>',
+                        default=os.environ.get('EXP_ID'),
                         help='[mandatory] ID of the experiment')
     parser.add_argument('--load',
-                        type=int,
                         metavar='<load>',
+                        type=int,
+                        default=os.environ.get('LOAD'),
                         help='[mandatory] Load that should be used for benchmakr')
     parser.add_argument('--instances',
-                        type=int,
                         metavar='<instances>',
+                        type=int,
+                        default=os.environ.get('INSTANCES'),
                         help='[mandatory] Numbers of instances to be benchmarked')
     return parser
diff --git a/execution/run_uc.py b/execution/run_uc.py
index 3a21e23e6971fd0df6a19dd2a9d32cbf83e9af9b..8cd6f0621b089f0384b8173d01de7748cb29b423 100644
--- a/execution/run_uc.py
+++ b/execution/run_uc.py
@@ -4,8 +4,8 @@ from kubernetes import client, config # kubernetes api
 from kubernetes.stream import stream
 import lag_analysis
 import logging # logging
-from os import path # path utilities
-from strategies.cli_parser import execution_parser
+from os import path, environ # path utilities
+from lib.cli_parser import execution_parser
 import subprocess # execute bash commands
 import sys # for exit of program
 import time # process sleep
@@ -244,7 +244,7 @@ def wait_execution(execution_minutes):
     return
 
 
-def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
+def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url=None):
     """
     Runs the evaluation function
     :param string exp_id: ID of the experiment.
@@ -254,7 +254,12 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
     :param int execution_minutes: How long the use case where executed.
     """
     print('Run evaluation function')
-    lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
+    if prometheus_base_url is None and environ.get('PROMETHEUS_BASE_URL') is None:
+        lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
+    elif prometheus_base_url is not None:
+        lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url)
+    else:
+        lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, environ.get('PROMETHEUS_BASE_URL'))
 
     return
 
@@ -370,22 +375,14 @@ def reset_zookeeper():
     print('Delete ZooKeeper configurations used for workload generation')
 
     delete_zoo_data_command = [
-        'kubectl',
-        'exec',
-        'zookeeper-client',
-        '--',
-        'bash',
+        '/bin/sh',
         '-c',
         'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall '
         + '/workload-generation'
     ]
 
     check_zoo_data_command = [
-        'kubectl',
-        'exec',
-        'zookeeper-client',
-        '--',
-        'bash',
+        '/bin/sh',
         '-c',
         'zookeeper-shell my-confluent-cp-zookeeper:2181 get '
         + '/workload-generation'
@@ -394,18 +391,25 @@ def reset_zookeeper():
     # Wait for configuration deletion
     while True:
         # Delete Zookeeper configuration data
-        output = subprocess.run(delete_zoo_data_command,
-                                capture_output=True,
-                                text=True)
-        logging.debug(output.stdout)
+        resp = stream(coreApi.connect_get_namespaced_pod_exec,
+                      "zookeeper-client",
+                      'default',
+                      command=delete_zoo_data_command,
+                      stderr=True, stdin=False,
+                      stdout=True, tty=False)
+        logging.debug(resp)
 
         # Check data is deleted
-        output = subprocess.run(check_zoo_data_command,
-                                capture_output=True,
-                                text=True)
-        logging.debug(output)
+        client = stream(coreApi.connect_get_namespaced_pod_exec,
+                        "zookeeper-client",
+                        'default',
+                        command=check_zoo_data_command,
+                        stderr=True, stdin=False,
+                        stdout=True, tty=False,
+                        _preload_content=False) # Get client for returncode
+        client.run_forever(timeout=60) # Start the client
 
-        if output.returncode == 1: # Means data not available anymore
+        if client.returncode == 1: # Means data not available anymore
             print('ZooKeeper reset was successful.')
             break
         else:
@@ -450,7 +454,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
     stop_lag_exporter()
 
 
-def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, reset, reset_only):
+def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url=None, reset=False, reset_only=False):
     """
     Main method to execute one time the benchmark for a given use case.
     Start workload generator/application -> execute -> analyse -> stop all
@@ -515,7 +519,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
     wait_execution(execution_minutes)
     print('---------------------')
 
-    run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes)
+    run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url)
     print('---------------------')
 
     # Reset cluster regular, therefore abort exit not needed anymore
@@ -529,5 +533,5 @@ if __name__ == '__main__':
     print('---------------------')
 
     main(args.exp_id, args.uc, args.load, args.instances,
          args.partitions, args.cpu_limit, args.memory_limit,
-         args.commit_ms, args.duration, args.reset,
+         args.commit_ms, args.duration, args.prometheus, args.reset,
          args.reset_only)
diff --git a/execution/theodolite.py b/execution/theodolite.py
index b76e0b2da01aef318fe8c5a5a1265276051f301d..2c45bc06740641139c6f72eb142cfd195f96abd0 100755
--- a/execution/theodolite.py
+++ b/execution/theodolite.py
@@ -1,11 +1,11 @@
 #!/usr/bin/env python
 
 import argparse
+from lib.cli_parser import benchmark_parser
 import logging # logging
 import os
 import sys
 from strategies.config import ExperimentConfig
-from strategies.cli_parser import benchmark_parser
 import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
 import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy
 import strategies.strategies.search.check_all_strategy as check_all_strategy
diff --git a/execution/theodolite.yaml b/execution/theodolite.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..d5e821469aca5a394081a139f2ebadb703b4aa98
--- /dev/null
+++ b/execution/theodolite.yaml
@@ -0,0 +1,67 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: theodolite
+spec:
+  template:
+    spec:
+      containers:
+        - name: theodolite
+          image: bvonheid/theodolite:latest
+          # imagePullPolicy: Never # Used to pull "own" local image
+          env:
+            - name: UC
+              value: "1"
+            - name: LOADS
+              value: "13206, 19635"
+            - name: INSTANCES
+              value: "1, 2"
+            - name: DURATION
+              value: "3"
+            - name: PARTITIONS
+              value: "30"
+            # - name: COMMIT_MS
+            #   value: ""
+            # - name: SEARCH_STRATEGY
+            #   value: ""
+            # - name: CPU_LIMIT
+            #   value: ""
+            # - name: MEMORY_LIMIT
+            #   value: ""
+            - name: PROMETHEUS_BASE_URL
+              value: "http://prometheus-operated:9090"
+            - name: PYTHONUNBUFFERED
+              value: "1"
+      restartPolicy: Never
+  backoffLimit: 4
+
+# ---
+# apiVersion: v1
+# kind: ServiceAccount
+# metadata:
+#   name: theodolite
+# ---
+# apiVersion: rbac.authorization.k8s.io/v1
+# kind: Role
+# metadata:
+#   name: modify-pods
+# rules:
+#   - apiGroups: [""]
+#     resources:
+#       - pods
+#     verbs:
+#       - get
+#       - list
+#       - delete
+# ---
+# apiVersion: rbac.authorization.k8s.io/v1
+# kind: RoleBinding
+# metadata:
+#   name: modify-pods-to-sa
+# subjects:
+#   - kind: ServiceAccount
+#     name: theodolite
+# roleRef:
+#   kind: Role
+#   name: modify-pods
+#   apiGroup: rbac.authorization.k8s.io