diff --git a/execution/Dockerfile b/execution/Dockerfile
deleted file mode 100644
index e71bc91d9d31bea4c1598292e43d0ab7c193c3fa..0000000000000000000000000000000000000000
--- a/execution/Dockerfile
+++ /dev/null
@@ -1,15 +0,0 @@
-FROM python:3.8
-
-RUN mkdir /app
-WORKDIR /app
-ADD requirements.txt /app/
-RUN pip install -r requirements.txt
-COPY uc-workload-generator /app/uc-workload-generator
-COPY uc-application /app/uc-application
-COPY strategies /app/strategies
-COPY lib /app/lib
-COPY lag_analysis.py /app/
-COPY run_uc.py /app/
-COPY theodolite.py /app/
-
-CMD ["python", "/app/theodolite.py"]
diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py
deleted file mode 100644
index 5b78ef3653753a2b95ac9b74bf8de156a71fb14c..0000000000000000000000000000000000000000
--- a/execution/lag_analysis.py
+++ /dev/null
@@ -1,167 +0,0 @@
-import sys
-import os
-import requests
-from datetime import datetime, timedelta, timezone
-import pandas as pd
-import matplotlib.pyplot as plt
-import csv
-import logging
-
-
-def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
-    print("Main")
-    time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
-
-    now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
-    now = now_local - timedelta(milliseconds=time_diff_ms)
-    print(f"Now Local: {now_local}")
-    print(f"Now Used: {now}")
-
-    end = now
-    start = now - timedelta(minutes=execution_minutes)
-
-    # print(start.isoformat().replace('+00:00', 'Z'))
-    # print(end.isoformat().replace('+00:00', 'Z'))
-
-    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
-        # 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
-        'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
-        'start': start.isoformat(),
-        'end': end.isoformat(),
-        'step': '5s'})
-    # response
-    # print(response.request.path_url)
-    # response.content
-    results = response.json()['data']['result']
-
-    d = []
-
-    for result in results:
-        # print(result['metric']['topic'])
-        topic = result['metric']['topic']
-        for value in result['values']:
-            # print(value)
-            d.append({'topic': topic, 'timestamp': int(
-                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
-
-    df = pd.DataFrame(d)
-
-    # Do some analysis
-
-    input = df.loc[df['topic'] == "input"]
-
-    # input.plot(kind='line',x='timestamp',y='value',color='red')
-    # plt.show()
-
-    from sklearn.linear_model import LinearRegression
-
-    # .values converts the column into a numpy array
-    X = input.iloc[:, 1].values.reshape(-1, 1)
-    # -1 means: infer the number of rows, but use 1 column
-    Y = input.iloc[:, 2].values.reshape(-1, 1)
-    linear_regressor = LinearRegression()  # create object for the class
-    linear_regressor.fit(X, Y)  # perform linear regression
-    Y_pred = linear_regressor.predict(X)  # make predictions
-
-    print(linear_regressor.coef_)
-
-    # print(Y_pred)
-
-    fields = [exp_id, datetime.now(), benchmark, dim_value,
-              instances, linear_regressor.coef_]
-    print(fields)
-    with open(f'{result_path}/results.csv', 'a') as f:
-        writer = csv.writer(f)
-        writer.writerow(fields)
-
-    filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}"
-
-    plt.plot(X, Y)
-    plt.plot(X, Y_pred, color='red')
-
-    plt.savefig(f"{filename}_plot.png")
-
-    df.to_csv(f"{filename}_values.csv")
-
-    # Load total lag count
-
-    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
-        'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
0)", - 'start': start.isoformat(), - 'end': end.isoformat(), - 'step': '5s'}) - - results = response.json()['data']['result'] - - d = [] - - for result in results: - # print(result['metric']['topic']) - group = result['metric']['group'] - for value in result['values']: - # print(value) - d.append({'group': group, 'timestamp': int( - value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) - - df = pd.DataFrame(d) - - df.to_csv(f"{filename}_totallag.csv") - - # Load partition count - - response = requests.get(prometheus_base_url + '/api/v1/query_range', params={ - 'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)", - 'start': start.isoformat(), - 'end': end.isoformat(), - 'step': '5s'}) - - results = response.json()['data']['result'] - - d = [] - - for result in results: - # print(result['metric']['topic']) - topic = result['metric']['topic'] - for value in result['values']: - # print(value) - d.append({'topic': topic, 'timestamp': int( - value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) - - df = pd.DataFrame(d) - - df.to_csv(f"{filename}_partitions.csv") - - # Load instances count - - response = requests.get(prometheus_base_url + '/api/v1/query_range', params={ - 'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", - 'start': start.isoformat(), - 'end': end.isoformat(), - 'step': '5s'}) - - results = response.json()['data']['result'] - - d = [] - - for result in results: - for value in result['values']: - # print(value) - d.append({'timestamp': int(value[0]), 'value': int(value[1])}) - - df = pd.DataFrame(d) - - df.to_csv(f"{filename}_instances.csv") - - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - - # Load arguments - exp_id = sys.argv[1] - benchmark = sys.argv[2] - dim_value = sys.argv[3] - instances = sys.argv[4] - execution_minutes = int(sys.argv[5]) - - main(exp_id, benchmark, dim_value, instances, execution_minutes, - 'http://localhost:9090', 'results') diff --git a/execution/lib/__init__.py b/execution/lib/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/execution/lib/cli_parser.py b/execution/lib/cli_parser.py deleted file mode 100644 index de609bc55e21e9467a2b28168be6e478171cfddd..0000000000000000000000000000000000000000 --- a/execution/lib/cli_parser.py +++ /dev/null @@ -1,167 +0,0 @@ -import argparse -import os - - -def env_list_default(env, tf): - """ - Makes a list from an environment string. - """ - v = os.environ.get(env) - if v is not None: - v = [tf(s) for s in v.split(',')] - return v - - -def key_values_to_dict(kvs): - """ - Given a list with key values in form `Key=Value` it creates a dict from it. - """ - my_dict = {} - for kv in kvs: - k, v = kv.split("=") - my_dict[k] = v - return my_dict - - -def env_dict_default(env): - """ - Makes a dict from an environment string. 
- """ - v = os.environ.get(env) - if v is not None: - return key_values_to_dict(v.split(',')) - else: - return dict() - - -class StoreDictKeyPair(argparse.Action): - def __init__(self, option_strings, dest, nargs=None, **kwargs): - self._nargs = nargs - super(StoreDictKeyPair, self).__init__( - option_strings, dest, nargs=nargs, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - my_dict = key_values_to_dict(values) - setattr(namespace, self.dest, my_dict) - - -def default_parser(description): - """ - Returns the default parser that can be used for thodolite and run uc py - :param description: The description the argument parser should show. - """ - parser = argparse.ArgumentParser(description=description) - parser.add_argument('--uc', - metavar='<uc>', - default=os.environ.get('UC'), - help='[mandatory] use case number, one of 1, 2, 3 or 4') - parser.add_argument('--partitions', '-p', - metavar='<partitions>', - type=int, - default=os.environ.get('PARTITIONS', 40), - help='Number of partitions for Kafka topics') - parser.add_argument('--cpu-limit', '-cpu', - metavar='<CPU limit>', - default=os.environ.get('CPU_LIMIT', '1000m'), - help='Kubernetes CPU limit') - parser.add_argument('--memory-limit', '-mem', - metavar='<memory limit>', - default=os.environ.get('MEMORY_LIMIT', '4Gi'), - help='Kubernetes memory limit') - parser.add_argument('--duration', '-d', - metavar='<duration>', - type=int, - default=os.environ.get('DURATION', 5), - help='Duration in minutes subexperiments should be \ - executed for') - parser.add_argument('--namespace', - metavar='<NS>', - default=os.environ.get('NAMESPACE', 'default'), - help='Defines the Kubernetes where the applications should run') - parser.add_argument('--reset', - action="store_true", - default=os.environ.get( - 'RESET', 'false').lower() == 'true', - help='Resets the environment before execution') - parser.add_argument('--reset-only', - action="store_true", - default=os.environ.get( - 'RESET_ONLY', 'false').lower() == 'true', - help='Only resets the environment. Ignores all other parameters') - parser.add_argument('--prometheus', - metavar='<URL>', - default=os.environ.get( - 'PROMETHEUS_BASE_URL', 'http://localhost:9090'), - help='Defines where to find the prometheus instance') - parser.add_argument('--path', - metavar='<path>', - default=os.environ.get('RESULT_PATH', 'results'), - help='A directory path for the results') - parser.add_argument("--configurations", - metavar="KEY=VAL", - dest="configurations", - action=StoreDictKeyPair, - nargs="+", - default=env_dict_default('CONFIGURATIONS'), - help='Defines the environment variables for the UC') - return parser - - -def benchmark_parser(description): - """ - Parser for the overall benchmark execution - :param description: The description the argument parser should show. - """ - parser = default_parser(description) - - parser.add_argument('--loads', - metavar='<load>', - type=int, - nargs='+', - default=env_list_default('LOADS', int), - help='[mandatory] Loads that should be executed') - parser.add_argument('--instances', '-i', - dest='instances_list', - metavar='<instances>', - type=int, - nargs='+', - default=env_list_default('INSTANCES', int), - help='[mandatory] List of instances used in benchmarks') - parser.add_argument('--domain-restriction', - action="store_true", - default=os.environ.get( - 'DOMAIN_RESTRICTION', 'false').lower() == 'true', - help='To use domain restriction. 
-    parser.add_argument('--search-strategy',
-                        metavar='<strategy>',
-                        default=os.environ.get('SEARCH_STRATEGY', 'default'),
-                        help='The benchmarking search strategy. Can be set to default, linear-search or binary-search')
-    parser.add_argument('--threshold',
-                        type=int,
-                        metavar='<threshold>',
-                        default=os.environ.get('THRESHOLD', 2000),
-                        help='The threshold for the trend slope that the search strategies use to determine whether a load could be handled')
-    return parser
-
-
-def execution_parser(description):
-    """
-    Parser for executing one use case.
-    :param description: The description the argument parser should show.
-    """
-    parser = default_parser(description)
-    parser.add_argument('--exp-id',
-                        metavar='<exp id>',
-                        default=os.environ.get('EXP_ID'),
-                        help='[mandatory] ID of the experiment')
-    parser.add_argument('--load',
-                        metavar='<load>',
-                        type=int,
-                        default=os.environ.get('LOAD'),
-                        help='[mandatory] Load that should be used for the benchmark')
-    parser.add_argument('--instances',
-                        metavar='<instances>',
-                        type=int,
-                        default=os.environ.get('INSTANCES'),
-                        help='[mandatory] Number of instances to be benchmarked')
-    return parser
diff --git a/execution/lib/trend_slope_computer.py b/execution/lib/trend_slope_computer.py
deleted file mode 100644
index 90ae26cfd275f53307e19532f047e5e0a9326d3a..0000000000000000000000000000000000000000
--- a/execution/lib/trend_slope_computer.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from sklearn.linear_model import LinearRegression
-import pandas as pd
-import os
-
-def compute(directory, filename, warmup_sec):
-    df = pd.read_csv(os.path.join(directory, filename))
-    input = df
-    input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
-    regress = input.loc[input['sec_start'] >= warmup_sec]  # exclude warm-up
-
-    X = regress.iloc[:, 2].values.reshape(-1, 1)  # .values converts the column into a numpy array
-    Y = regress.iloc[:, 3].values.reshape(-1, 1)  # -1 means: infer the number of rows, but use 1 column
-    linear_regressor = LinearRegression()  # create object for the class
-    linear_regressor.fit(X, Y)  # perform linear regression
-    Y_pred = linear_regressor.predict(X)  # make predictions
-
-    trend_slope = linear_regressor.coef_[0][0]
-
-    return trend_slope
diff --git a/execution/requirements.txt b/execution/requirements.txt
deleted file mode 100644
index 18a06882007eebf69bf3bf4f84b869454b36a0a6..0000000000000000000000000000000000000000
--- a/execution/requirements.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-matplotlib==3.2.0
-pandas==1.0.1
-requests==2.23.0
-scikit-learn==0.22.2.post1
-
-# For run_uc.py
-kubernetes==11.0.0
-confuse==1.1.0
diff --git a/execution/run_uc.py b/execution/run_uc.py
deleted file mode 100644
index 904b87b377ca2db3f2d4ddd4fb70aba0136cfa21..0000000000000000000000000000000000000000
--- a/execution/run_uc.py
+++ /dev/null
@@ -1,609 +0,0 @@
-import argparse  # parse arguments from cli
-import atexit  # used to clear resources at exit of program (e.g. ctrl-c)
-from kubernetes import client, config  # kubernetes api
-from kubernetes.stream import stream
-import lag_analysis
-import logging  # logging
-from os import path, environ  # path utilities
-from lib.cli_parser import execution_parser
-import subprocess  # execute bash commands
-import sys  # for exit of program
-import time  # process sleep
-import yaml  # convert from file to yaml object
-
-coreApi = None  # access kubernetes core api
-appsApi = None  # access kubernetes apps api
-customApi = None  # access kubernetes custom object api
-
-namespace = None
-
-
-def load_variables():
-    """Load the CLI variables given at the command line."""
-    print('Load CLI variables')
-    parser = execution_parser(description='Run use case program')
-    args = parser.parse_args()
-    print(args)
-    if (args.exp_id is None or args.uc is None or args.load is None or args.instances is None) and not args.reset_only:
-        print('The options --exp-id, --uc, --load and --instances are mandatory.')
-        print('Some might not be set!')
-        sys.exit(1)
-    return args
-
-
-def initialize_kubernetes_api():
-    """Load the kubernetes config from local or the cluster and create the
-    needed APIs.
-    """
-    global coreApi, appsApi, customApi
-    print('Connect to kubernetes api')
-    try:
-        config.load_kube_config()  # try using local config
-    except config.config_exception.ConfigException as e:
-        # load config from pod, if local config is not available
-        logging.debug(
-            'Failed loading local Kubernetes configuration, try from cluster')
-        logging.debug(e)
-        config.load_incluster_config()
-
-    coreApi = client.CoreV1Api()
-    appsApi = client.AppsV1Api()
-    customApi = client.CustomObjectsApi()
-
-
-def create_topics(topics):
-    """Create the topics needed for the use cases.
-    :param topics: List of topics that should be created.
-    """
-    # Calling exec and waiting for response
-    print('Create topics')
-    for (topic, partitions) in topics:
-        print(f'Create topic {topic} with #{partitions} partitions')
-        exec_command = [
-            '/bin/sh',
-            '-c',
-            f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181\
-            --create --topic {topic} --partitions {partitions}\
-            --replication-factor 1'
-        ]
-        resp = stream(coreApi.connect_get_namespaced_pod_exec,
-                      "kafka-client",
-                      namespace,
-                      command=exec_command,
-                      stderr=True, stdin=False,
-                      stdout=True, tty=False)
-        print(resp)
-
-
-def load_yaml(file_path):
-    """Creates a yaml object from the file at the given path.
-    :param file_path: The path to the file which contains the yaml.
-    :return: The file as a yaml object.
-    """
-    try:
-        f = open(path.join(path.dirname(__file__), file_path))
-        with f:
-            return yaml.safe_load(f)
-    except Exception as e:
-        logging.error('Error opening file %s', file_path)
-        logging.error(e)
-
-
-def load_yaml_files():
-    """Load the needed yaml files and create objects from them.
-    :return: wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy
-    """
-    print('Load kubernetes yaml files')
-    wg_svc = load_yaml('uc-workload-generator/load-generator-service.yaml')
-    wg = load_yaml('uc-workload-generator/workloadGenerator.yaml')
-    app_svc = load_yaml('uc-application/aggregation-service.yaml')
-    app_svc_monitor = load_yaml('uc-application/service-monitor.yaml')
-    app_jmx = load_yaml('uc-application/jmx-configmap.yaml')
-    app_deploy = load_yaml('uc-application/aggregation-deployment.yaml')
-
-    print('Kubernetes yaml files loaded')
-    return wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy
-
-
-def replace_env_value(container, key, value):
-    """
-    Special method to replace the value of a given parameter in a
-    container's kubernetes env values.
-    """
-    next(filter(lambda x: x['name'] == key, container))[
-        'value'] = value
-
-
-def start_workload_generator(svc_yaml, wg_yaml, dim_value, uc_id):
-    """Starts the workload generator.
-    :param svc_yaml: The yaml object for the workload generator service.
-    :param wg_yaml: The yaml object for the workload generator.
-    :param string dim_value: The dimension value the load generator should use.
-    :param string uc_id: Use case id for which load should be generated.
-    :return:
-        The Service and Deployment created by the API, or, in case they
-        already exist or an error occurs, the yaml objects.
-    """
-    print('Start workload generator')
-    svc, wg_deploy = None, None
-
-    # Create Service
-    try:
-        svc = coreApi.create_namespaced_service(
-            namespace=namespace, body=svc_yaml)
-        print(f'Service {svc.metadata.name} created.')
-    except client.rest.ApiException as e:
-        svc = svc_yaml
-        logging.error("Service creation error: %s", e.reason)
-
-    # Create Deployment
-    num_sensors = dim_value
-    wl_max_records = 150000
-    wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records
-
-    # set parameters special for uc 4
-    if uc_id == '4':
-        print('use uc4 stuff')
-        num_nested_groups = dim_value
-        num_sensors = 4
-        approx_num_sensors = num_sensors ** num_nested_groups
-        wl_instances = (approx_num_sensors +
-                        wl_max_records - 1) // wl_max_records
-
-    # Customize workload generator creation
-    wg_yaml['spec']['replicas'] = wl_instances
-    # Set used use case
-    wg_container = next(filter(
-        lambda x: x['name'] == 'workload-generator', wg_yaml['spec']['template']['spec']['containers']))
-    wg_container['image'] = 'ghcr.io/cau-se/theodolite-uc' + uc_id + \
-        '-workload-generator:latest'
-    # Set environment variables
-
-    replace_env_value(wg_container['env'], 'NUM_SENSORS', str(num_sensors))
-
-    if uc_id == '4':  # Special configuration for UC4
-        replace_env_value(
-            wg_container['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups))
-
-    try:
-        wg_deploy = appsApi.create_namespaced_deployment(
-            namespace=namespace,
-            body=wg_yaml
-        )
-        print(f'Deployment {wg_deploy.metadata.name} created.')
-    except client.rest.ApiException as e:
-        print(f'Deployment creation error: {e.reason}')
-        wg_deploy = wg_yaml
-
-    return svc, wg_deploy
-
-
-def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
-                      instances, uc_id, memory_limit, cpu_limit,
-                      configurations):
-    """Applies the service, service monitor, jmx config map and starts the
-    use case application.
-
-    :param svc_yaml: The yaml object for the service.
-    :param svc_monitor_yaml: The yaml object for the service monitor.
-    :param jmx_yaml: The yaml object for the jmx config map.
-    :param deploy_yaml: The yaml object for the application.
-    :param int instances: Number of instances for the use case application.
-    :param string uc_id: The id of the use case to execute.
-    :param string memory_limit: The memory limit for the application.
-    :param string cpu_limit: The CPU limit for the application.
-    :param dict configurations: A dictionary with ENV variables for configurations.
-    :return:
-        The Service, ServiceMonitor, JMX ConfigMap and Deployment.
-        In case a resource already exists or an error occurs, the yaml object is returned.
-        return svc, svc_monitor, jmx_cm, app_deploy
-    """
-    print('Start use case application')
-    svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None
-
-    # Create Service
-    try:
-        svc = coreApi.create_namespaced_service(
-            namespace=namespace, body=svc_yaml)
-        print(f'Service {svc.metadata.name} created.')
-    except client.rest.ApiException as e:
-        svc = svc_yaml
-        logging.error("Service creation error: %s", e.reason)
-
-    # Create custom object service monitor
-    try:
-        svc_monitor = customApi.create_namespaced_custom_object(
-            group="monitoring.coreos.com",
-            version="v1",
-            namespace=namespace,
-            plural="servicemonitors",  # CustomResourceDef of ServiceMonitor
-            body=svc_monitor_yaml,
-        )
-        print(f"ServiceMonitor '{svc_monitor['metadata']['name']}' created.")
-    except client.rest.ApiException as e:
-        svc_monitor = svc_monitor_yaml
-        logging.error("ServiceMonitor creation error: %s", e.reason)
-
-    # Apply jmx config map for aggregation service
-    try:
-        jmx_cm = coreApi.create_namespaced_config_map(
-            namespace=namespace, body=jmx_yaml)
-        print(f"ConfigMap '{jmx_cm.metadata.name}' created.")
-    except client.rest.ApiException as e:
-        jmx_cm = jmx_yaml
-        logging.error("ConfigMap creation error: %s", e.reason)
-
-    # Create deployment
-    deploy_yaml['spec']['replicas'] = instances
-    app_container = next(filter(
-        lambda x: x['name'] == 'uc-application',
-        deploy_yaml['spec']['template']['spec']['containers']))
-    app_container['image'] = 'ghcr.io/cau-se/theodolite-uc' + uc_id \
-        + '-kstreams-app:latest'
-
-    # Set configuration environment parameters for SPE
-    for k, v in configurations.items():
-        # check if environment variable is already defined in yaml
-        env = next(filter(lambda x: x['name'] == k,
-                          app_container['env']), None)
-        if env is not None:
-            env['value'] = v  # replace value
-        else:
-            # create new environment pair
-            conf = {'name': k, 'value': v}
-            app_container['env'].append(conf)
-
-    # Set resources in Kubernetes
-    app_container['resources']['limits']['memory'] = memory_limit
-    app_container['resources']['limits']['cpu'] = cpu_limit
-
-    # Deploy application
-    try:
-        app_deploy = appsApi.create_namespaced_deployment(
-            namespace=namespace,
-            body=deploy_yaml
-        )
-        print(f"Deployment '{app_deploy.metadata.name}' created.")
-    except client.rest.ApiException as e:
-        app_deploy = deploy_yaml
-        logging.error("Deployment creation error: %s", e.reason)
-
-    return svc, svc_monitor, jmx_cm, app_deploy
-
-
-def wait_execution(execution_minutes):
-    """
-    Wait while the benchmark is executing.
-    :param int execution_minutes: The duration to wait for execution.
-    """
-    print('Wait while executing')
-
-    for i in range(execution_minutes):
-        time.sleep(60)
-        print(f'Executed: {i+1} minutes')
-    print('Execution finished')
-    return
-
-
-def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
-    """
-    Runs the evaluation function.
-    :param string exp_id: ID of the experiment.
-    :param string uc_id: ID of the executed use case.
-    :param int dim_value: The dimension value used for execution.
-    :param int instances: The number of instances used for the execution.
-    :param int execution_minutes: How long the use case was executed.
-    """
-    print('Run evaluation function')
-    try:
-        lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances,
-                          execution_minutes, prometheus_base_url,
-                          result_path)
-    except Exception as e:
-        err_msg = 'Evaluation function failed'
-        print(err_msg)
-        logging.exception(err_msg)
-        print('Benchmark execution continues')
-
-    return
-
-
-def delete_resource(obj, del_func):
-    """
-    Helper function to delete kubernetes resources.
-    First tries to delete with the kubernetes object.
-    Then it uses the dict representation of the yaml to delete the object.
-    :param obj: Either a kubernetes resource object or yaml as a dict.
-    :param del_func: The function that needs to be executed for deletion.
-    """
-    try:
-        del_func(obj.metadata.name, namespace)
-    except Exception as e:
-        logging.debug(
-            'Error deleting resource with api object, try with dict.')
-        try:
-            del_func(obj['metadata']['name'], namespace)
-        except Exception as e:
-            logging.error("Error deleting resource")
-            logging.error(e)
-            return
-    print('Resource deleted')
-
-
-def stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
-    """Stops the applied applications and deletes resources.
-    :param wg_svc: The load generator service.
-    :param wg: The load generator deployment.
-    :param app_svc: The application service.
-    :param app_svc_monitor: The application service monitor.
-    :param app_jmx: The application jmx config map.
-    :param app_deploy: The application deployment.
-    """
-    print('Stop use case application and load generator')
-
-    print('Delete load generator deployment')
-    delete_resource(wg, appsApi.delete_namespaced_deployment)
-
-    print('Delete load generator service')
-    delete_resource(wg_svc, coreApi.delete_namespaced_service)
-
-    print('Delete app service')
-    delete_resource(app_svc, coreApi.delete_namespaced_service)
-
-    print('Delete service monitor')
-    try:
-        customApi.delete_namespaced_custom_object(
-            group="monitoring.coreos.com",
-            version="v1",
-            namespace=namespace,
-            plural="servicemonitors",
-            name=app_svc_monitor['metadata']['name'])
-        print('Resource deleted')
-    except Exception as e:
-        print('Error deleting service monitor')
-
-    print('Delete jmx config map')
-    delete_resource(app_jmx, coreApi.delete_namespaced_config_map)
-
-    print('Delete uc application')
-    delete_resource(app_deploy, appsApi.delete_namespaced_deployment)
-
-    print('Check all pods deleted.')
-    while True:
-        # Wait a bit for deletion
-        time.sleep(2)
-
-        # Count how many pods still need to be deleted
-        no_load = len(coreApi.list_namespaced_pod(
-            namespace, label_selector='app=titan-ccp-load-generator').items)
-        no_uc = len(coreApi.list_namespaced_pod(
-            namespace, label_selector='app=titan-ccp-aggregation').items)
-
-        # Check if all pods are deleted
-        if no_load <= 0 and no_uc <= 0:
-            print('All pods deleted.')
-            break
-
-        print(f'#{no_load} load generator and #{no_uc} uc pods still need to be deleted')
-    return
-
-
-def delete_topics(topics):
-    """Delete topics from Kafka.
-    :param topics: List of topics to delete.
- """ - print('Delete topics from Kafka') - - topics_delete = 'theodolite-.*|' + '|'.join([ti[0] for ti in topics]) - - num_topics_command = [ - '/bin/sh', - '-c', - f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list \ - | sed -n -E "/^({topics_delete})\ - ( - marked for deletion)?$/p" | wc -l' - ] - - topics_deletion_command = [ - '/bin/sh', - '-c', - f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete \ - --topic "{topics_delete}"' - ] - - # Wait that topics get deleted - while True: - # topic deletion, sometimes a second deletion seems to be required - resp = stream(coreApi.connect_get_namespaced_pod_exec, - "kafka-client", - namespace, - command=topics_deletion_command, - stderr=True, stdin=False, - stdout=True, tty=False) - print(resp) - - print('Wait for topic deletion') - time.sleep(2) - resp = stream(coreApi.connect_get_namespaced_pod_exec, - "kafka-client", - namespace, - command=num_topics_command, - stderr=True, stdin=False, - stdout=True, tty=False) - if resp == '0': - print('Topics deleted') - break - return - - -def reset_zookeeper(): - """Delete ZooKeeper configurations used for workload generation. - """ - print('Delete ZooKeeper configurations used for workload generation') - - delete_zoo_data_command = [ - '/bin/sh', - '-c', - 'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall ' - + '/workload-generation' - ] - - check_zoo_data_command = [ - '/bin/sh', - '-c', - 'zookeeper-shell my-confluent-cp-zookeeper:2181 get ' - + '/workload-generation' - ] - - # Wait for configuration deletion - while True: - # Delete Zookeeper configuration data - resp = stream(coreApi.connect_get_namespaced_pod_exec, - "zookeeper-client", - namespace, - command=delete_zoo_data_command, - stderr=True, stdin=False, - stdout=True, tty=False) - logging.debug(resp) - - # Check data is deleted - client = stream(coreApi.connect_get_namespaced_pod_exec, - "zookeeper-client", - namespace, - command=check_zoo_data_command, - stderr=True, stdin=False, - stdout=True, tty=False, - _preload_content=False) # Get client for returncode - client.run_forever(timeout=60) # Start the client - - if client.returncode == 1: # Means data not available anymore - print('ZooKeeper reset was successful.') - break - else: - print('ZooKeeper reset was not successful. Retrying in 5s.') - time.sleep(5) - return - - -def stop_lag_exporter(): - """ - Stop the lag exporter in order to reset it and allow smooth execution for - next use cases. - """ - print('Stop the lag exporter') - - try: - # Get lag exporter - pod_list = coreApi.list_namespaced_pod( - namespace=namespace, label_selector='app.kubernetes.io/name=kafka-lag-exporter') - lag_exporter_pod = pod_list.items[0].metadata.name - - # Delete lag exporter pod - res = coreApi.delete_namespaced_pod( - name=lag_exporter_pod, namespace=namespace) - except ApiException as e: - logging.error('Exception while stopping lag exporter') - logging.error(e) - - print('Deleted lag exporter pod: ' + lag_exporter_pod) - return - - -def reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): - """ - Stop the applications, delete topics, reset zookeeper and stop lag exporter. 
- """ - print('Reset cluster') - stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy) - print('---------------------') - delete_topics(topics) - print('---------------------') - reset_zookeeper() - print('---------------------') - stop_lag_exporter() - - -def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, execution_minutes, prometheus_base_url, reset, ns, result_path, configurations, reset_only=False): - """ - Main method to execute one time the benchmark for a given use case. - Start workload generator/application -> execute -> analyse -> stop all - :param string exp_id: The number of executed experiment - :param string uc_id: Use case to execute - :param int dim_value: Dimension value for load generator. - :param int instances: Number of instances for application. - :param int partitions: Number of partitions the kafka topics should have. - :param string cpu_limit: Max CPU utilazation for application. - :param string memory_limit: Max memory utilazation for application. - :param int execution_minutes: How long to execute the benchmark. - :param boolean reset: Flag for reset of cluster before execution. - :param dict configurations: Key value pairs for setting env variables of UC. - :param boolean reset_only: Flag to only reset the application. - """ - global namespace - namespace = ns - wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() - print('---------------------') - - initialize_kubernetes_api() - print('---------------------') - - topics = [('input', partitions), - ('output', partitions), - ('aggregation-feedback', partitions), - ('configuration', 1)] - - # Check for reset options - if reset_only: - # Only reset cluster an then end program - reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, - app_jmx, app_deploy, topics) - sys.exit() - if reset: - # Reset cluster before execution - print('Reset only mode') - reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, - app_jmx, app_deploy, topics) - print('---------------------') - - # Register the reset operation so that is executed at the abort of program - atexit.register(reset_cluster, wg_svc, wg, app_svc, - app_svc_monitor, app_jmx, app_deploy, topics) - - create_topics(topics) - print('---------------------') - - wg_svc, wg = start_workload_generator(wg_svc, wg, dim_value, uc_id) - print('---------------------') - - app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( - app_svc, - app_svc_monitor, - app_jmx, - app_deploy, - instances, - uc_id, - memory_limit, - cpu_limit, - configurations) - print('---------------------') - - wait_execution(execution_minutes) - print('---------------------') - - run_evaluation(exp_id, uc_id, dim_value, instances, - execution_minutes, prometheus_base_url, result_path) - print('---------------------') - - # Reset cluster regular, therefore abort exit not needed anymore - reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) - atexit.unregister(reset_cluster) - - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - args = load_variables() - print('---------------------') - main(args.exp_id, args.uc, args.load, args.instances, args.partitions, - args.cpu_limit, args.memory_limit, args.duration, args.prometheus, - args.reset, args.namespace, args.path, args.configurations, - args.reset_only) diff --git a/execution/strategies/__init__.py b/execution/strategies/__init__.py deleted file mode 100644 index 
diff --git a/execution/strategies/config.py b/execution/strategies/config.py
deleted file mode 100644
index d4df97c18ae54c7c181ddf08264c013f9447350f..0000000000000000000000000000000000000000
--- a/execution/strategies/config.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from dataclasses import dataclass
-
-@dataclass
-class ExperimentConfig:
-    """ Wrapper for the configuration of an experiment. """
-    use_case: str
-    exp_id: int
-    dim_values: list
-    replicass: list
-    partitions: int
-    cpu_limit: str
-    memory_limit: str
-    execution_minutes: int
-    prometheus_base_url: str
-    reset: bool
-    namespace: str
-    result_path: str
-    configurations: dict
-    domain_restriction_strategy: object
-    search_strategy: object
-    threshold: int
-    subexperiment_executor: object
-    subexperiment_evaluator: object
diff --git a/execution/strategies/experiment_execution.py b/execution/strategies/experiment_execution.py
deleted file mode 100644
index c2ee18f9b79a6e880dbcb69b47061cc5ecc6b9ba..0000000000000000000000000000000000000000
--- a/execution/strategies/experiment_execution.py
+++ /dev/null
@@ -1,6 +0,0 @@
-class ExperimentExecutor:
-    def __init__(self, config):
-        self.config = config
-
-    def execute(self):
-        self.config.domain_restriction_strategy.execute(self.config)
diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py
deleted file mode 100644
index 5c31f8c97a4085931cdfa1fa017d4e5909e21915..0000000000000000000000000000000000000000
--- a/execution/strategies/strategies/config.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from dataclasses import dataclass
-
-@dataclass
-class SubexperimentConfig:
-    """ Wrapper for the configuration of a subexperiment. """
-    use_case: str
-    exp_id: int
-    counter: int
-    dim_value: int
-    replicas: int
-    partitions: int
-    cpu_limit: str
-    memory_limit: str
-    execution_minutes: int
-    prometheus_base_url: str
-    reset: bool
-    namespace: str
-    result_path: str
-    configurations: dict
diff --git a/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py b/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py
deleted file mode 100644
index b218731fc76d83347b4dbf10448f01615d378c0b..0000000000000000000000000000000000000000
--- a/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py
+++ /dev/null
@@ -1,12 +0,0 @@
-# The lower bound strategy
-def execute(config):
-    dim_value_index = 0
-    lower_bound_replicas_index = 0
-    subexperiment_counter = 0
-    while dim_value_index < len(config.dim_values) and lower_bound_replicas_index >= 0 and lower_bound_replicas_index < len(config.replicass):
-        lower_bound_replicas_index, subexperiment_counter = config.search_strategy.execute(
-            config=config,
-            dim_value_index=dim_value_index,
-            lower_replicas_bound_index=lower_bound_replicas_index,
-            subexperiment_counter=subexperiment_counter)
-        dim_value_index += 1
\ No newline at end of file
diff --git a/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py b/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py
deleted file mode 100644
index e5dea56118460b0dfdc6b1c36ce2587b6752512b..0000000000000000000000000000000000000000
--- a/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py
+++ /dev/null
@@ -1,11 +0,0 @@
-# The strategy where the domain contains all amounts of instances
-def execute(config):
-    dim_value_index = 0
-    subexperiment_counter = 0
-    while dim_value_index < len(config.dim_values):
-        _, subexperiment_counter = config.search_strategy.execute(
-            config=config,
-            dim_value_index=dim_value_index,
-            lower_replicas_bound_index=0,
-            subexperiment_counter=subexperiment_counter)
-        dim_value_index += 1
\ No newline at end of file
diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py
deleted file mode 100644
index 46748cbda250597b3a7644522126268be4599293..0000000000000000000000000000000000000000
--- a/execution/strategies/strategies/search/binary_search_strategy.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# The binary search strategy
-import os
-from strategies.strategies.config import SubexperimentConfig
-
-def binary_search(config, dim_value, lower, upper, subexperiment_counter):
-    if lower == upper:
-        print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
-        config.subexperiment_executor.execute(subexperiment_config)
-        success = config.subexperiment_evaluator.execute(subexperiment_config,
-                                                         config.threshold)
-        if success:  # successful, the upper neighbor is assumed to also have been successful
-            return (lower, subexperiment_counter + 1)
-        else:  # not successful
-            return (lower + 1, subexperiment_counter)
-    elif lower + 1 == upper:
-        print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
-        config.subexperiment_executor.execute(subexperiment_config)
-        success = config.subexperiment_evaluator.execute(subexperiment_config,
-                                                         config.threshold)
-        if success:  # minimal instances found
-            return (lower, subexperiment_counter)
-        else:  # not successful, check if upper (i.e. lower+1) instances are sufficient
-            print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
-            subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
-            config.subexperiment_executor.execute(subexperiment_config)
-            success = config.subexperiment_evaluator.execute(subexperiment_config,
-                                                             config.threshold)
-            if success:  # minimal instances found
-                return (upper, subexperiment_counter)
-            else:
-                return (upper + 1, subexperiment_counter)
-    else:
-        # test mid
-        mid = (upper + lower) // 2
-        print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
-        config.subexperiment_executor.execute(subexperiment_config)
-        success = config.subexperiment_evaluator.execute(subexperiment_config,
-                                                         config.threshold)
-        if success:  # success -> search in (lower, mid-1)
-            return binary_search(config, dim_value, lower, mid - 1, subexperiment_counter + 1)
-        else:  # no success -> search in (mid+1, upper)
-            return binary_search(config, dim_value, mid + 1, upper, subexperiment_counter + 1)
-
-def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
-    upper = len(config.replicass) - 1
-    dim_value = config.dim_values[dim_value_index]
-    return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter)
diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py
deleted file mode 100644
index 0861945113b829fa79317d8a1a6312b4d6e4f71d..0000000000000000000000000000000000000000
--- a/execution/strategies/strategies/search/check_all_strategy.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# The check_all strategy
-import os
-from strategies.strategies.config import SubexperimentConfig
-
-
-def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
-    new_lower_replicas_bound_index = lower_replicas_bound_index
-    new_lower_replicas_bound_found = False
-    subexperiments_total = len(config.dim_values) * len(config.replicass)
-    while lower_replicas_bound_index < len(config.replicass):
-        subexperiment_counter += 1
-        dim_value = config.dim_values[dim_value_index]
-        replicas = config.replicass[lower_replicas_bound_index]
-        print(
-            f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
-
-        subexperiment_config = SubexperimentConfig(
-            config.use_case, config.exp_id, subexperiment_counter, dim_value,
-            replicas, config.partitions, config.cpu_limit, config.memory_limit,
-            config.execution_minutes, config.prometheus_base_url, config.reset,
-            config.namespace, config.result_path, config.configurations)
-
-        config.subexperiment_executor.execute(subexperiment_config)
-
-        success = config.subexperiment_evaluator.execute(subexperiment_config,
-                                                         config.threshold)
-        if success and not new_lower_replicas_bound_found:
-            new_lower_replicas_bound_found = True
-            new_lower_replicas_bound_index = lower_replicas_bound_index
-        lower_replicas_bound_index += 1
-    return (new_lower_replicas_bound_index, subexperiment_counter)
diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py
deleted file mode 100644
index 8e777303742e54cf2a11a1bde60e95b8aa85489d..0000000000000000000000000000000000000000
--- a/execution/strategies/strategies/search/linear_search_strategy.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# The linear-search strategy
-
-import os
-from strategies.strategies.config import SubexperimentConfig
-
-def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
-    subexperiments_total = len(config.dim_values) + len(config.replicass) - 1
-    dim_value = config.dim_values[dim_value_index]
-    while lower_replicas_bound_index < len(config.replicass):
-        subexperiment_counter += 1
-        replicas = config.replicass[lower_replicas_bound_index]
-        print(f"Run subexperiment {subexperiment_counter} of at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
-
-        subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value,
-                                                   replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
-
-        config.subexperiment_executor.execute(subexperiment_config)
-        success = config.subexperiment_evaluator.execute(subexperiment_config,
-                                                         config.threshold)
-        if success:
-            return (lower_replicas_bound_index, subexperiment_counter)
-        else:
-            lower_replicas_bound_index += 1
-    return (lower_replicas_bound_index, subexperiment_counter)
diff --git a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py
deleted file mode 100644
index 30188de837746b76113ec635ca77fadc3a91cb92..0000000000000000000000000000000000000000
--- a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py
+++ /dev/null
@@ -1,29 +0,0 @@
-import lib.trend_slope_computer as trend_slope_computer
-import logging
-import os
-
-WARMUP_SEC = 60
-
-def execute(config, threshold):
-    """
-    Check if the trend slope of the total lag of the subexperiment comes below
-    the threshold.
-
-    :param config: Configuration of the subexperiment.
-    :param threshold: The threshold the trend slope needs to come below.
-    """
-    cwd = f'{os.getcwd()}/{config.result_path}'
-    file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv"
-
-    try:
-        trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC)
-    except Exception as e:
-        err_msg = 'Computing trend slope failed'
-        print(err_msg)
-        logging.exception(err_msg)
-        print('Mark this subexperiment as not successful and continue benchmark')
-        return False
-
-    print(f"Trend Slope: {trend_slope}")
-
-    return trend_slope < threshold
diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py
deleted file mode 100644
index 6931dacfc72081cbe112c4d6d1003703ba42c526..0000000000000000000000000000000000000000
--- a/execution/strategies/subexperiment_execution/subexperiment_executor.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Wrapper that makes the execution method of a subexperiment interchangeable.
-
-import os
-import run_uc
-
-def execute(subexperiment_config):
-    run_uc.main(
-        exp_id=subexperiment_config.exp_id,
-        uc_id=subexperiment_config.use_case,
-        dim_value=int(subexperiment_config.dim_value),
-        instances=int(subexperiment_config.replicas),
-        partitions=subexperiment_config.partitions,
-        cpu_limit=subexperiment_config.cpu_limit,
-        memory_limit=subexperiment_config.memory_limit,
-        execution_minutes=int(subexperiment_config.execution_minutes),
-        prometheus_base_url=subexperiment_config.prometheus_base_url,
-        reset=subexperiment_config.reset,
-        ns=subexperiment_config.namespace,
-        result_path=subexperiment_config.result_path,
-        configurations=subexperiment_config.configurations)
diff --git a/execution/strategies/tests/.gitignore b/execution/strategies/tests/.gitignore
deleted file mode 100644
index 1998c294f84ec0ff4b32396e4cd8e74e352672e6..0000000000000000000000000000000000000000
--- a/execution/strategies/tests/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-.cache
\ No newline at end of file
diff --git a/execution/strategies/tests/__init__.py b/execution/strategies/tests/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py b/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py
deleted file mode 100644
index d93d4924cf09015c714604f2fc995e1db971e69d..0000000000000000000000000000000000000000
--- a/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py
+++ /dev/null
@@ -1,105 +0,0 @@
-import pprint
-
-from strategies.config import ExperimentConfig
-import strategies.strategies.search.binary_search_strategy as binary_search_strategy
-import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
-from strategies.experiment_execution import ExperimentExecutor
-import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
-
-class Object(object):
-    pass
-
-pp = pprint.PrettyPrinter(indent=4)
-
-dim_values = [0, 1, 2, 3, 4, 5, 6]
-replicass = [0, 1, 2, 3, 4, 5, 6]
-
-# True means the experiment was successful.
-# The experiments are indexed row-wise (dimension values) and column-wise
-# (number of replicas), like ordinary arrays, from 0 to 6 respectively.
-# This means the first row starts with (0,0), the second row with (1,0) etc.
-successful = [
-    [ True , True , True , True , True , True , True  ],
-    [ False, False, True , True , True , True , True  ],
-    [ False, False, True , True , True , True , True  ],
-    [ False, False, False, True , True , True , True  ],
-    [ False, False, False, False, True , True , True  ],
-    [ False, False, False, False, False, False, True  ],
-    [ False, False, False, False, False, False, False ]
-]
-
-expected_order = [
-    (0,3),  # workload dim 0
-    (0,1),
-    (0,0),
-    (1,3),  # workload dim 1
-    (1,1),
-    (1,2),
-    (2,4),  # workload dim 2
-    (2,2),
-    (3,4),  # workload dim 3
-    (3,2),
-    (3,3),
-    (4,4),  # workload dim 4
-    (4,3),
-    (5,5),  # workload dim 5
-    (5,6),
-    (6,6)   # workload dim 6
-]
-
-last_experiment = (0, 0)
-experiment_counter = -1
-subexperiment_executor = Object()
-
-def subexperiment_executor_executor(config):
-    global experiment_counter, last_experiment, pp
-    print("Simulate subexperiment with config:")
-    pp.pprint(config)
-    last_experiment = (config.dim_value, config.replicas)
-    experiment_counter += 1
-    print("Simulation complete")
-
-subexperiment_executor.execute = subexperiment_executor_executor
-
-
-# returns True if the experiment was successful
-
-subexperiment_evaluator = Object()
-
-def subexperiment_evaluator_execute(config, threshold):
-    print("Evaluating last experiment. Index was:")
-    global expected_order, experiment_counter, last_experiment, successful
-    pp.pprint(last_experiment)
-    print("Index was expected to be:")
-    pp.pprint(expected_order[experiment_counter])
-    assert expected_order[experiment_counter] == last_experiment
-    print("Index was as expected. Evaluation finished.")
-    return 1 if successful[last_experiment[0]][last_experiment[1]] else 0
-
-subexperiment_evaluator.execute = subexperiment_evaluator_execute
-
-def test_binary_search_strategy():
-    # declare parameters
-    uc = "test-uc"
-    partitions = 40
-    cpu_limit = "1000m"
-    memory_limit = "4Gi"
-    execution_minutes = 5
-
-    # execute
-    experiment_config = ExperimentConfig(
-        exp_id="0",
-        use_case=uc,
-        dim_values=dim_values,
-        replicass=replicass,
-        partitions=partitions,
-        cpu_limit=cpu_limit,
-        memory_limit=memory_limit,
-        execution_minutes=execution_minutes,
-        # dummy values for the remaining mandatory config fields,
-        # which the simulated executor and evaluator do not use
-        prometheus_base_url="",
-        reset=False,
-        namespace="default",
-        result_path="",
-        configurations={},
-        threshold=1000,
-        domain_restriction_strategy=lower_bound_strategy,
-        search_strategy=binary_search_strategy,
-        subexperiment_executor=subexperiment_executor,
-        subexperiment_evaluator=subexperiment_evaluator)
-    executor = ExperimentExecutor(experiment_config)
-    executor.execute()
\ No newline at end of file
diff --git a/execution/strategies/tests/test_domain_restriction_check_all_strategy.py b/execution/strategies/tests/test_domain_restriction_check_all_strategy.py
deleted file mode 100644
index c15daca6ebab3171f0995c048afe56c0185efe56..0000000000000000000000000000000000000000
--- a/execution/strategies/tests/test_domain_restriction_check_all_strategy.py
+++ /dev/null
@@ -1,120 +0,0 @@
-import pprint
-
-from strategies.config import ExperimentConfig
-import strategies.strategies.search.check_all_strategy as check_all_strategy
-import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
-from strategies.experiment_execution import ExperimentExecutor
-import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
-
-class Object(object):
-    pass
-
-pp = pprint.PrettyPrinter(indent=4)
-
-dim_values = [0, 1, 2, 3, 4, 5, 6]
-replicass = [0, 1, 2, 3, 4, 5, 6]
-
-# True means the experiment was successful.
-# The experiments are indexed row-wise (dimension values) and column-wise
-# (number of replicas), like ordinary arrays, from 0 to 6 respectively.
-# This means the first row starts with (0,0), the second row with (1,0) etc.
-successful = [
-    [ True , True , True , True , True , True , True  ],
-    [ False, False, True , True , True , True , True  ],
-    [ False, False, True , True , True , True , True  ],
-    [ False, False, False, True , True , True , True  ],
-    [ False, False, False, False, True , True , True  ],
-    [ False, False, False, False, False, False, True  ],
-    [ False, False, False, False, False, False, False ]
-]
-
-# the expected order of executed experiments
-expected_order = [
-    (0,0),  # workload dim 0
-    (0,1),
-    (0,2),
-    (0,3),
-    (0,4),
-    (0,5),
-    (0,6),
-    (1,0),  # workload dim 1
-    (1,1),
-    (1,2),
-    (1,3),
-    (1,4),
-    (1,5),
-    (1,6),
-    (2,2),  # workload dim 2
-    (2,3),
-    (2,4),
-    (2,5),
-    (2,6),
-    (3,2),  # workload dim 3
-    (3,3),
-    (3,4),
-    (3,5),
-    (3,6),
-    (4,3),  # workload dim 4
-    (4,4),
-    (4,5),
-    (4,6),
-    (5,4),  # workload dim 5
-    (5,5),
-    (5,6),
-    (6,6)   # workload dim 6
-]
-
-last_experiment = (0, 0)
-experiment_counter = -1
-subexperiment_executor = Object()
-
-def subexperiment_executor_executor(config):
-    global experiment_counter, last_experiment, pp
-    print("Simulate subexperiment with config:")
-    pp.pprint(config)
-    last_experiment = (config.dim_value, config.replicas)
-    experiment_counter += 1
-    print("Simulation complete")
-
-subexperiment_executor.execute = subexperiment_executor_executor
-
-
-# returns True if the experiment was successful
-
-subexperiment_evaluator = Object()
-
-def subexperiment_evaluator_execute(config, threshold):
-    print("Evaluating last experiment. Index was:")
-    global expected_order, experiment_counter, last_experiment, successful
-    pp.pprint(expected_order[experiment_counter])
-    assert expected_order[experiment_counter] == last_experiment
-    print("Index was as expected. Evaluation finished.")
Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_linear_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=check_all_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py b/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py deleted file mode 100644 index 86e2cd29d187cb83166102c503ee79e5e1424573..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py +++ /dev/null @@ -1,101 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.linear_search_strategy as linear_search_strategy -import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was successful -# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. -# this means the first row starts with (0,0), the second row with (1, 0) etc. -successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -# the expected order of executed experiments -expected_order = [ - (0,0), - (1,0), - (1,1), - (1,2), - (2,2), - (3,2), - (3,3), - (4,3), - (4,4), - (5,4), - (5,5), - (5,6), - (6,6) - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. 
Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_linear_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=linear_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_no_restriction_binary_search_strategy.py b/execution/strategies/tests/test_no_restriction_binary_search_strategy.py deleted file mode 100644 index 4f5da89cc72edd792015763539c9af4677772a79..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_no_restriction_binary_search_strategy.py +++ /dev/null @@ -1,110 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.binary_search_strategy as binary_search_strategy -import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was successful -# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as common known arrays from 0 - 6 respectively. -# this means the first row starts with (0,0), the second row with (1, 0) etc. 
-successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -expected_order = [ - (0,3), # workload dim 0 - (0,1), - (0,0), - (1,3), # workload dim 1 - (1,1), - (1,2), - (2,3), # workload dim 2 - (2,1), - (2,2), - (3,3), # workload dim 3 - (3,1), - (3,2), - (4,3), # workload dim 4 - (4,5), - (4,4), - (5,3), # workload dim 5 - (5,5), - (5,6), - (6,3), # workload dim 6 - (6,5), - (6,6) - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(last_experiment) - print("Index was expected to be:") - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_binary_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=binary_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_no_restriction_check_all_strategy.py b/execution/strategies/tests/test_no_restriction_check_all_strategy.py deleted file mode 100644 index f173a3d168704cc7a499933984b6510ebda2751e..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_no_restriction_check_all_strategy.py +++ /dev/null @@ -1,137 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.check_all_strategy as check_all_strategy -import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was 
successful -# the experiments are indexed by row (representing the dimension value) and column (representing the number of replicas), both ranging from 0 to 6. -# this means the first row starts with (0,0), the second row with (1,0), and so on. -successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -# the expected order of executed experiments -expected_order = [ - (0,0), # workload dim 0 - (0,1), - (0,2), - (0,3), - (0,4), - (0,5), - (0,6), - (1,0), # workload dim 1 - (1,1), - (1,2), - (1,3), - (1,4), - (1,5), - (1,6), - (2,0), # workload dim 2 - (2,1), - (2,2), - (2,3), - (2,4), - (2,5), - (2,6), - (3,0), # workload dim 3 - (3,1), - (3,2), - (3,3), - (3,4), - (3,5), - (3,6), - (4,0), # workload dim 4 - (4,1), - (4,2), - (4,3), - (4,4), - (4,5), - (4,6), - (5,0), # workload dim 5 - (5,1), - (5,2), - (5,3), - (5,4), - (5,5), - (5,6), - (6,0), # workload dim 6 - (6,1), - (6,2), - (6,3), - (6,4), - (6,5), - (6,6), - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. 
Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_check_all_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=check_all_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_no_restriction_linear_search_strategy.py b/execution/strategies/tests/test_no_restriction_linear_search_strategy.py deleted file mode 100644 index 0e47c2e95b75ae682e82a02ad3d0a91c5a62f253..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_no_restriction_linear_search_strategy.py +++ /dev/null @@ -1,118 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.linear_search_strategy as linear_search_strategy -import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was successful -# the experiments are indexed by row (representing the dimension value) and column (representing the number of replicas), both ranging from 0 to 6. -# this means the first row starts with (0,0), the second row with (1,0), and so on. 
-successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -# the expected order of executed experiments -expected_order = [ - (0,0), # workload dim 0 - (1,0), # workload dim 1 - (1,1), - (1,2), - (2,0), # workload dim 2 - (2,1), - (2,2), - (3,0), # workload dim 3 - (3,1), - (3,2), - (3,3), - (4,0), # workload dim 4 - (4,1), - (4,2), - (4,3), - (4,4), - (5,0), # workload dim 5 - (5,1), - (5,2), - (5,3), - (5,4), - (5,5), - (5,6), - (6,0), # workload dim 6 - (6,1), - (6,2), - (6,3), - (6,4), - (6,5), - (6,6) - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_linear_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=linear_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/theodolite.py b/execution/theodolite.py deleted file mode 100755 index bd273c4405e2a406b5b5537e084957625c19aa96..0000000000000000000000000000000000000000 --- a/execution/theodolite.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/env python - -import argparse -from lib.cli_parser import benchmark_parser -import logging -import os -import run_uc -import sys -from strategies.config import ExperimentConfig -import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy -import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy -import strategies.strategies.search.check_all_strategy as check_all_strategy -import strategies.strategies.search.linear_search_strategy as linear_search_strategy -import strategies.strategies.search.binary_search_strategy as binary_search_strategy -from 
strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor -import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator - - -def load_variables(): - """Load the CLI variables given on the command line.""" - print('Load CLI variables') - parser = benchmark_parser("Run theodolite benchmarking") - args = parser.parse_args() - print(args) - if (args.uc is None or args.loads is None or args.instances_list is None) and not args.reset_only: - print('The options --uc, --loads and --instances are mandatory.') - print('At least one of them is not set.') - sys.exit(1) - return args - - -def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, - duration, domain_restriction, search_strategy, threshold, - prometheus_base_url, reset, namespace, result_path, configurations): - - print( - f"Domain restriction of search space activated: {domain_restriction}") - print(f"Chosen search strategy: {search_strategy}") - - counter_path = f"{result_path}/exp_counter.txt" - - if os.path.exists(counter_path): - with open(counter_path, mode="r") as read_stream: - exp_id = int(read_stream.read()) - else: - exp_id = 0 - # Create the directory if it does not exist - os.makedirs(result_path, exist_ok=True) - - # Store metadata - separator = "," - lines = [ - f'UC={uc}\n', - f'DIM_VALUES={separator.join(map(str, loads))}\n', - f'REPLICAS={separator.join(map(str, instances_list))}\n', - f'PARTITIONS={partitions}\n', - f'CPU_LIMIT={cpu_limit}\n', - f'MEMORY_LIMIT={memory_limit}\n', - f'EXECUTION_MINUTES={duration}\n', - f'DOMAIN_RESTRICTION={domain_restriction}\n', - f'SEARCH_STRATEGY={search_strategy}\n', - f'CONFIGURATIONS={configurations}' - ] - with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream: - stream.writelines(lines) - - with open(counter_path, mode="w") as write_stream: - write_stream.write(str(exp_id + 1)) - - domain_restriction_strategy = None - search_strategy_method = None - - # Select domain restriction - if domain_restriction: - # domain restriction - domain_restriction_strategy = lower_bound_strategy - else: - # no domain restriction - domain_restriction_strategy = no_lower_bound_strategy - - # Select search strategy - if search_strategy == "linear-search": - # With domain restriction, linear search walks a staircase through the grid; without it, every row may be scanned from the start. - max_subexperiments = len(loads) + len(instances_list) - 1 if domain_restriction else len(loads) * len(instances_list) - print(f"Going to execute at most {max_subexperiments} subexperiments in total...") - search_strategy_method = linear_search_strategy - elif search_strategy == "binary-search": - search_strategy_method = binary_search_strategy - else: - print( - f"Going to execute {len(loads)*len(instances_list)} subexperiments in total...") - search_strategy_method = check_all_strategy - - experiment_config = ExperimentConfig( - use_case=uc, - exp_id=exp_id, - dim_values=loads, - replicass=instances_list, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - execution_minutes=duration, - prometheus_base_url=prometheus_base_url, - reset=reset, - namespace=namespace, - configurations=configurations, - result_path=result_path, - domain_restriction_strategy=domain_restriction_strategy, - search_strategy=search_strategy_method, - threshold=threshold, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - - executor = ExperimentExecutor(experiment_config) - executor.execute() - - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - args = load_variables() - if args.reset_only: - print('Only reset the cluster') - run_uc.main(None, 
None, None, None, None, None, None, None, None, - None, args.namespace, None, None, reset_only=True) - else: - main(args.uc, args.loads, args.instances_list, args.partitions, - args.cpu_limit, args.memory_limit, args.duration, - args.domain_restriction, args.search_strategy, - args.threshold, args.prometheus, args.reset, args.namespace, - args.path, args.configurations)
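
Note on the removed strategy tests: they all exercise the same contract. A search strategy walks the grid spanned by dim_values and replicass, a domain-restriction strategy decides where each row's search may start, and the subexperiment evaluator reports success or failure for each (dimension value, replicas) pair. The sketch below illustrates how the lower-bound restriction combines with linear search; it is a minimal illustration written for this summary, and none of its names are taken from the deleted modules.

def run_subexperiment(dim_idx, replica_idx):
    # Stand-in for executing and evaluating one real subexperiment; the
    # deleted tests simulate this with a lookup in a boolean matrix just
    # like the one below.
    successful = [
        [True,  True,  True],   # dim 0: every replica count suffices
        [False, True,  True],   # dim 1: needs at least the second count
        [False, False, True],   # dim 2: needs the third count
    ]
    return successful[dim_idx][replica_idx]

def lower_bound_linear_search(dim_values, replicass):
    """For each dimension value, resume the replica search at the index
    that sufficed for the previous one (the lower-bound restriction)."""
    minimal_replicas = {}
    start = 0
    for di, dim_value in enumerate(dim_values):
        ri = start
        while ri < len(replicass) and not run_subexperiment(di, ri):
            ri += 1
        if ri < len(replicass):
            minimal_replicas[dim_value] = replicass[ri]
            start = ri  # a larger load never needs fewer replicas
        else:
            minimal_replicas[dim_value] = None  # no replica count sufficed
            start = len(replicass) - 1
    return minimal_replicas

print(lower_bound_linear_search([10, 20, 30], [1, 2, 4]))
# Runs (0,0), (1,0), (1,1), (2,1), (2,2) and prints {10: 1, 20: 2, 30: 4}.

Because every subexperiment either advances the replica index or moves on to the next dimension value, this walk needs at most len(dim_values) + len(replicass) - 1 subexperiments, which is the bound theodolite.py reports for the domain-restricted linear-search strategy.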