diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 995c925979bb482ffbeb7966542eebfbdb66017b..5a074d945ca61cb4414d7f6d2b958b1423b8286f 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -24,7 +24,7 @@ lint-helm: entrypoint: [""] tags: - exec-docker - script: helm lint execution/helm/ + script: helm lint helm/ # Theodolite Benchmarks @@ -37,10 +37,11 @@ lint-helm: GRADLE_OPTS: "-Dorg.gradle.daemon=false" cache: paths: - - .gradle + - .gradle/wrapper + - .gradle/caches before_script: - - cd theodolite-benchmarks - export GRADLE_USER_HOME=`pwd`/.gradle + - cd theodolite-benchmarks build-benchmarks: stage: build @@ -213,7 +214,9 @@ deploy-uc4-load-generator: # Theodolite Framework .theodolite: - image: ghcr.io/graalvm/graalvm-ce:java11-21.0.0.2 + image: + name: ghcr.io/graalvm/native-image:java11-21.1.0 + entrypoint: [""] tags: - exec-docker variables: @@ -223,8 +226,8 @@ deploy-uc4-load-generator: - .gradle/wrapper - .gradle/caches before_script: - - cd theodolite-quarkus - export GRADLE_USER_HOME=`pwd`/.gradle + - cd theodolite-quarkus build-theodolite-jvm: stage: build @@ -240,7 +243,6 @@ build-theodolite-native: stage: build extends: .theodolite script: - - gu install native-image # TODO move to image - ./gradlew --build-cache assemble -Dquarkus.package.type=native when: manual artifacts: diff --git a/execution/Dockerfile b/execution/Dockerfile deleted file mode 100644 index e71bc91d9d31bea4c1598292e43d0ab7c193c3fa..0000000000000000000000000000000000000000 --- a/execution/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM python:3.8 - -RUN mkdir /app -WORKDIR /app -ADD requirements.txt /app/ -RUN pip install -r requirements.txt -COPY uc-workload-generator /app/uc-workload-generator -COPY uc-application /app/uc-application -COPY strategies /app/strategies -COPY lib /app/lib -COPY lag_analysis.py /app/ -COPY run_uc.py /app/ -COPY theodolite.py /app/ - -CMD ["python", "/app/theodolite.py"] diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py deleted file mode 100644 index 5b78ef3653753a2b95ac9b74bf8de156a71fb14c..0000000000000000000000000000000000000000 --- a/execution/lag_analysis.py +++ /dev/null @@ -1,167 +0,0 @@ -import sys -import os -import requests -from datetime import datetime, timedelta, timezone -import pandas as pd -import matplotlib.pyplot as plt -import csv -import logging - - -def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path): - print("Main") - time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) - - now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0) - now = now_local - timedelta(milliseconds=time_diff_ms) - print(f"Now Local: {now_local}") - print(f"Now Used: {now}") - - end = now - start = now - timedelta(minutes=execution_minutes) - - #print(start.isoformat().replace('+00:00', 'Z')) - #print(end.isoformat().replace('+00:00', 'Z')) - - response = requests.get(prometheus_base_url + '/api/v1/query_range', params={ - # 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)", - 'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)", - 'start': start.isoformat(), - 'end': end.isoformat(), - 'step': '5s'}) - # response - # print(response.request.path_url) - # response.content - results = response.json()['data']['result'] - - d = [] - - for result in results: - # print(result['metric']['topic']) - topic = result['metric']['topic'] - for value in result['values']: - # print(value) - d.append({'topic': topic, 'timestamp': int( - value[0]), 'value': 
int(value[1]) if value[1] != 'NaN' else 0}) - - df = pd.DataFrame(d) - - # Do some analysis - - input = df.loc[df['topic'] == "input"] - - # input.plot(kind='line',x='timestamp',y='value',color='red') - # plt.show() - - from sklearn.linear_model import LinearRegression - - # values converts it into a numpy array - X = input.iloc[:, 1].values.reshape(-1, 1) - # -1 means that calculate the dimension of rows, but have 1 column - Y = input.iloc[:, 2].values.reshape(-1, 1) - linear_regressor = LinearRegression() # create object for the class - linear_regressor.fit(X, Y) # perform linear regression - Y_pred = linear_regressor.predict(X) # make predictions - - print(linear_regressor.coef_) - - # print(Y_pred) - - fields = [exp_id, datetime.now(), benchmark, dim_value, - instances, linear_regressor.coef_] - print(fields) - with open(f'{result_path}/results.csv', 'a') as f: - writer = csv.writer(f) - writer.writerow(fields) - - filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}" - - plt.plot(X, Y) - plt.plot(X, Y_pred, color='red') - - plt.savefig(f"{filename}_plot.png") - - df.to_csv(f"{filename}_values.csv") - - # Load total lag count - - response = requests.get(prometheus_base_url + '/api/v1/query_range', params={ - 'query': "sum by(group)(kafka_consumergroup_group_lag > 0)", - 'start': start.isoformat(), - 'end': end.isoformat(), - 'step': '5s'}) - - results = response.json()['data']['result'] - - d = [] - - for result in results: - # print(result['metric']['topic']) - group = result['metric']['group'] - for value in result['values']: - # print(value) - d.append({'group': group, 'timestamp': int( - value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) - - df = pd.DataFrame(d) - - df.to_csv(f"{filename}_totallag.csv") - - # Load partition count - - response = requests.get(prometheus_base_url + '/api/v1/query_range', params={ - 'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)", - 'start': start.isoformat(), - 'end': end.isoformat(), - 'step': '5s'}) - - results = response.json()['data']['result'] - - d = [] - - for result in results: - # print(result['metric']['topic']) - topic = result['metric']['topic'] - for value in result['values']: - # print(value) - d.append({'topic': topic, 'timestamp': int( - value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) - - df = pd.DataFrame(d) - - df.to_csv(f"{filename}_partitions.csv") - - # Load instances count - - response = requests.get(prometheus_base_url + '/api/v1/query_range', params={ - 'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", - 'start': start.isoformat(), - 'end': end.isoformat(), - 'step': '5s'}) - - results = response.json()['data']['result'] - - d = [] - - for result in results: - for value in result['values']: - # print(value) - d.append({'timestamp': int(value[0]), 'value': int(value[1])}) - - df = pd.DataFrame(d) - - df.to_csv(f"{filename}_instances.csv") - - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - - # Load arguments - exp_id = sys.argv[1] - benchmark = sys.argv[2] - dim_value = sys.argv[3] - instances = sys.argv[4] - execution_minutes = int(sys.argv[5]) - - main(exp_id, benchmark, dim_value, instances, execution_minutes, - 'http://localhost:9090', 'results') diff --git a/execution/lib/__init__.py b/execution/lib/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/execution/lib/cli_parser.py 
b/execution/lib/cli_parser.py deleted file mode 100644 index de609bc55e21e9467a2b28168be6e478171cfddd..0000000000000000000000000000000000000000 --- a/execution/lib/cli_parser.py +++ /dev/null @@ -1,167 +0,0 @@ -import argparse -import os - - -def env_list_default(env, tf): - """ - Makes a list from an environment string. - """ - v = os.environ.get(env) - if v is not None: - v = [tf(s) for s in v.split(',')] - return v - - -def key_values_to_dict(kvs): - """ - Given a list with key values in form `Key=Value` it creates a dict from it. - """ - my_dict = {} - for kv in kvs: - k, v = kv.split("=") - my_dict[k] = v - return my_dict - - -def env_dict_default(env): - """ - Makes a dict from an environment string. - """ - v = os.environ.get(env) - if v is not None: - return key_values_to_dict(v.split(',')) - else: - return dict() - - -class StoreDictKeyPair(argparse.Action): - def __init__(self, option_strings, dest, nargs=None, **kwargs): - self._nargs = nargs - super(StoreDictKeyPair, self).__init__( - option_strings, dest, nargs=nargs, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - my_dict = key_values_to_dict(values) - setattr(namespace, self.dest, my_dict) - - -def default_parser(description): - """ - Returns the default parser that can be used for thodolite and run uc py - :param description: The description the argument parser should show. - """ - parser = argparse.ArgumentParser(description=description) - parser.add_argument('--uc', - metavar='<uc>', - default=os.environ.get('UC'), - help='[mandatory] use case number, one of 1, 2, 3 or 4') - parser.add_argument('--partitions', '-p', - metavar='<partitions>', - type=int, - default=os.environ.get('PARTITIONS', 40), - help='Number of partitions for Kafka topics') - parser.add_argument('--cpu-limit', '-cpu', - metavar='<CPU limit>', - default=os.environ.get('CPU_LIMIT', '1000m'), - help='Kubernetes CPU limit') - parser.add_argument('--memory-limit', '-mem', - metavar='<memory limit>', - default=os.environ.get('MEMORY_LIMIT', '4Gi'), - help='Kubernetes memory limit') - parser.add_argument('--duration', '-d', - metavar='<duration>', - type=int, - default=os.environ.get('DURATION', 5), - help='Duration in minutes subexperiments should be \ - executed for') - parser.add_argument('--namespace', - metavar='<NS>', - default=os.environ.get('NAMESPACE', 'default'), - help='Defines the Kubernetes where the applications should run') - parser.add_argument('--reset', - action="store_true", - default=os.environ.get( - 'RESET', 'false').lower() == 'true', - help='Resets the environment before execution') - parser.add_argument('--reset-only', - action="store_true", - default=os.environ.get( - 'RESET_ONLY', 'false').lower() == 'true', - help='Only resets the environment. 
Ignores all other parameters') - parser.add_argument('--prometheus', - metavar='<URL>', - default=os.environ.get( - 'PROMETHEUS_BASE_URL', 'http://localhost:9090'), - help='Defines where to find the prometheus instance') - parser.add_argument('--path', - metavar='<path>', - default=os.environ.get('RESULT_PATH', 'results'), - help='A directory path for the results') - parser.add_argument("--configurations", - metavar="KEY=VAL", - dest="configurations", - action=StoreDictKeyPair, - nargs="+", - default=env_dict_default('CONFIGURATIONS'), - help='Defines the environment variables for the UC') - return parser - - -def benchmark_parser(description): - """ - Parser for the overall benchmark execution - :param description: The description the argument parser should show. - """ - parser = default_parser(description) - - parser.add_argument('--loads', - metavar='<load>', - type=int, - nargs='+', - default=env_list_default('LOADS', int), - help='[mandatory] Loads that should be executed') - parser.add_argument('--instances', '-i', - dest='instances_list', - metavar='<instances>', - type=int, - nargs='+', - default=env_list_default('INSTANCES', int), - help='[mandatory] List of instances used in benchmarks') - parser.add_argument('--domain-restriction', - action="store_true", - default=os.environ.get( - 'DOMAIN_RESTRICTION', 'false').lower() == 'true', - help='To use domain restriction. For details see README') - parser.add_argument('--search-strategy', - metavar='<strategy>', - default=os.environ.get('SEARCH_STRATEGY', 'default'), - help='The benchmarking search strategy. Can be set to default, linear-search or binary-search') - parser.add_argument('--threshold', - type=int, - metavar='<threshold>', - default=os.environ.get('THRESHOLD', 2000), - help='The threshold for the trend slope that the search strategies use to determine that a load could be handled') - return parser - - -def execution_parser(description): - """ - Parser for executing one use case - :param description: The description the argument parser should show. 
- """ - parser = default_parser(description) - parser.add_argument('--exp-id', - metavar='<exp id>', - default=os.environ.get('EXP_ID'), - help='[mandatory] ID of the experiment') - parser.add_argument('--load', - metavar='<load>', - type=int, - default=os.environ.get('LOAD'), - help='[mandatory] Load that should be used for benchmark') - parser.add_argument('--instances', - metavar='<instances>', - type=int, - default=os.environ.get('INSTANCES'), - help='[mandatory] Number of instances to be benchmarked') - return parser diff --git a/execution/lib/trend_slope_computer.py b/execution/lib/trend_slope_computer.py deleted file mode 100644 index 90ae26cfd275f53307e19532f047e5e0a9326d3a..0000000000000000000000000000000000000000 --- a/execution/lib/trend_slope_computer.py +++ /dev/null @@ -1,19 +0,0 @@ -from sklearn.linear_model import LinearRegression -import pandas as pd -import os - -def compute(directory, filename, warmup_sec): - df = pd.read_csv(os.path.join(directory, filename)) - input = df - input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp'] - regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up - - X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array - Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column - linear_regressor = LinearRegression() # create object for the class - linear_regressor.fit(X, Y) # perform linear regression - Y_pred = linear_regressor.predict(X) # make predictions - - trend_slope = linear_regressor.coef_[0][0] - - return trend_slope diff --git a/execution/requirements.txt b/execution/requirements.txt deleted file mode 100644 index 18a06882007eebf69bf3bf4f84b869454b36a0a6..0000000000000000000000000000000000000000 --- a/execution/requirements.txt +++ /dev/null @@ -1,8 +0,0 @@ -matplotlib==3.2.0 -pandas==1.0.1 -requests==2.23.0 -scikit-learn==0.22.2.post1 - -# For run_uc.py -kubernetes==11.0.0 -confuse==1.1.0 diff --git a/execution/run_uc.py b/execution/run_uc.py deleted file mode 100644 index 904b87b377ca2db3f2d4ddd4fb70aba0136cfa21..0000000000000000000000000000000000000000 --- a/execution/run_uc.py +++ /dev/null @@ -1,609 +0,0 @@ -import argparse # parse arguments from cli -import atexit # used to clear resources at exit of program (e.g. ctrl-c) -from kubernetes import client, config # kubernetes api -from kubernetes.stream import stream -import lag_analysis -import logging # logging -from os import path, environ # path utilities -from lib.cli_parser import execution_parser -import subprocess # execute bash commands -import sys # for exit of program -import time # process sleep -import yaml # convert from file to yaml object - -coreApi = None # access kubernetes core api -appsApi = None # access kubernetes apps api -customApi = None # access kubernetes custom object api - -namespace = None - - -def load_variables(): - """Load the CLI variables given at the command line""" - print('Load CLI variables') - parser = execution_parser(description='Run use case program') - args = parser.parse_args() - print(args) - if (args.exp_id is None or args.uc is None or args.load is None or args.instances is None) and not args.reset_only: - print('The options --exp-id, --uc, --load and --instances are mandatory.') - print('Some might not be set!') - sys.exit(1) - return args - - -def initialize_kubernetes_api(): - """Load the Kubernetes config from the local kubeconfig or from the cluster and create the - needed APIs. 
- """ - global coreApi, appsApi, customApi - print('Connect to kubernetes api') - try: - config.load_kube_config() # try using local config - except config.config_exception.ConfigException as e: - # load config from pod, if local config is not available - logging.debug( - 'Failed loading local Kubernetes configuration try from cluster') - logging.debug(e) - config.load_incluster_config() - - coreApi = client.CoreV1Api() - appsApi = client.AppsV1Api() - customApi = client.CustomObjectsApi() - - -def create_topics(topics): - """Create the topics needed for the use cases - :param topics: List of topics that should be created. - """ - # Calling exec and waiting for response - print('Create topics') - for (topic, partitions) in topics: - print(f'Create topic {topic} with #{partitions} partitions') - exec_command = [ - '/bin/sh', - '-c', - f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181\ - --create --topic {topic} --partitions {partitions}\ - --replication-factor 1' - ] - resp = stream(coreApi.connect_get_namespaced_pod_exec, - "kafka-client", - namespace, - command=exec_command, - stderr=True, stdin=False, - stdout=True, tty=False) - print(resp) - - -def load_yaml(file_path): - """Creates a yaml file from the file at given path. - :param file_path: The path to the file which contains the yaml. - :return: The file as a yaml object. - """ - try: - f = open(path.join(path.dirname(__file__), file_path)) - with f: - return yaml.safe_load(f) - except Exception as e: - logging.error('Error opening file %s', file_path) - logging.error(e) - - -def load_yaml_files(): - """Load the needed yaml files and creates objects from them. - :return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy - """ - print('Load kubernetes yaml files') - wg_svc = load_yaml('uc-workload-generator/load-generator-service.yaml') - wg = load_yaml('uc-workload-generator/workloadGenerator.yaml') - app_svc = load_yaml('uc-application/aggregation-service.yaml') - app_svc_monitor = load_yaml('uc-application/service-monitor.yaml') - app_jmx = load_yaml('uc-application/jmx-configmap.yaml') - app_deploy = load_yaml('uc-application/aggregation-deployment.yaml') - - print('Kubernetes yaml files loaded') - return wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy - - -def replace_env_value(container, key, value): - """ - Special method to replace in a container with kubernetes env values - the value of a given parameter. - """ - next(filter(lambda x: x['name'] == key, container))[ - 'value'] = value - - -def start_workload_generator(svc_yaml, wg_yaml, dim_value, uc_id): - """Starts the workload generator. - :param wg_yaml: The yaml object for the workload generator service. - :param wg_yaml: The yaml object for the workload generator. - :param string dim_value: The dimension value the load generator should use. - :param string uc_id: Use case id for which load should be generated. - :return: - The StatefulSet created by the API or in case it already exist/error - the yaml object. 
- """ - print('Start workload generator') - svc, wg_deploy = None, None - - # Create Service - try: - svc = coreApi.create_namespaced_service( - namespace=namespace, body=svc_yaml) - print(f'Service {svc.metadata.name} created.') - except client.rest.ApiException as e: - svc = svc_yaml - logging.error("Service creation error: %s", e.reason) - - # Create Deployment - num_sensors = dim_value - wl_max_records = 150000 - wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records - - # set parameters special for uc 4 - if uc_id == '4': - print('use uc4 stuff') - num_nested_groups = dim_value - num_sensors = 4 - approx_num_sensors = num_sensors ** num_nested_groups - wl_instances = (approx_num_sensors + - wl_max_records - 1) // wl_max_records - - # Customize workload generator creations - wg_yaml['spec']['replicas'] = wl_instances - # Set used use case - wg_containter = next(filter( - lambda x: x['name'] == 'workload-generator', wg_yaml['spec']['template']['spec']['containers'])) - wg_containter['image'] = 'ghcr.io/cau-se/theodolite-uc' + uc_id + \ - '-workload-generator:latest' - # Set environment variables - - replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors)) - - if uc_id == '4': # Special configuration for UC4 - replace_env_value( - wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups)) - - try: - wg_deploy = appsApi.create_namespaced_deployment( - namespace=namespace, - body=wg_yaml - ) - print(f'Deployment {wg_deploy.metadata.name} created.') - except client.rest.ApiException as e: - print(f'Deployment creation error: {e.reason}') - wg_deploy = wg_yaml - - return svc, wg_deploy - - -def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, - instances, uc_id, memory_limit, cpu_limit, - configurations): - """Applies the service, service monitor, jmx config map and start the - use case application. - - :param svc_yaml: The yaml object for the service. - :param svc_monitor_yaml: The yaml object for the service monitor. - :param jmx_yaml: The yaml object for the jmx config map. - :param deploy_yaml: The yaml object for the application. - :param int instances: Number of instances for use case application. - :param string uc_id: The id of the use case to execute. - :param string memory_limit: The memory limit for the application. - :param string cpu_limit: The CPU limit for the application. - :param dict configurations: A dictionary with ENV variables for configurations. - :return: - The Service, ServiceMonitor, JMX ConfigMap and Deployment. - In case the resource already exist/error the yaml object is returned. 
- return svc, svc_monitor, jmx_cm, app_deploy - """ - print('Start use case application') - svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None - - # Create Service - try: - svc = coreApi.create_namespaced_service( - namespace=namespace, body=svc_yaml) - print(f'Service {svc.metadata.name} created.') - except client.rest.ApiException as e: - svc = svc_yaml - logging.error("Service creation error: %s", e.reason) - - # Create custom object service monitor - try: - svc_monitor = customApi.create_namespaced_custom_object( - group="monitoring.coreos.com", - version="v1", - namespace=namespace, - plural="servicemonitors", # CustomResourceDef of ServiceMonitor - body=svc_monitor_yaml, - ) - print(f"ServiceMonitor '{svc_monitor['metadata']['name']}' created.") - except client.rest.ApiException as e: - svc_monitor = svc_monitor_yaml - logging.error("ServiceMonitor creation error: %s", e.reason) - - # Apply jmx config map for aggregation service - try: - jmx_cm = coreApi.create_namespaced_config_map( - namespace=namespace, body=jmx_yaml) - print(f"ConfigMap '{jmx_cm.metadata.name}' created.") - except client.rest.ApiException as e: - jmx_cm = jmx_yaml - logging.error("ConfigMap creation error: %s", e.reason) - - # Create deployment - deploy_yaml['spec']['replicas'] = instances - app_container = next(filter( - lambda x: x['name'] == 'uc-application', - deploy_yaml['spec']['template']['spec']['containers'])) - app_container['image'] = 'ghcr.io/cau-se/theodolite-uc' + uc_id \ - + '-kstreams-app:latest' - - # Set configurations environment parameters for SPE - for k, v in configurations.items(): - # check if environment variable is already definde in yaml - env = next(filter(lambda x: x['name'] == k, - app_container['env']), None) - if env is not None: - env['value'] = v # replace value - else: - # create new environment pair - conf = {'name': k, 'value': v} - app_container['env'].append(conf) - - # Set resources in Kubernetes - app_container['resources']['limits']['memory'] = memory_limit - app_container['resources']['limits']['cpu'] = cpu_limit - - # Deploy application - try: - app_deploy = appsApi.create_namespaced_deployment( - namespace=namespace, - body=deploy_yaml - ) - print(f"Deployment '{app_deploy.metadata.name}' created.") - except client.rest.ApiException as e: - app_deploy = deploy_yaml - logging.error("Deployment creation error: %s", e.reason) - - return svc, svc_monitor, jmx_cm, app_deploy - - -def wait_execution(execution_minutes): - """ - Wait time while in execution. - :param int execution_minutes: The duration to wait for execution. - """ - print('Wait while executing') - - for i in range(execution_minutes): - time.sleep(60) - print(f'Executed: {i+1} minutes') - print('Execution finished') - return - - -def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url, result_path): - """ - Runs the evaluation function - :param string exp_id: ID of the experiment. - :param string uc_id: ID of the executed use case. - :param int dim_value: The dimension value used for execution. - :param int instances: The number of instances used for the execution. - :param int execution_minutes: How long the use case where executed. 
- """ - print('Run evaluation function') - try: - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, - execution_minutes, prometheus_base_url, - result_path) - except Exception as e: - err_msg = 'Evaluation function failed' - print(err_msg) - logging.exception(err_msg) - print('Benchmark execution continues') - - return - - -def delete_resource(obj, del_func): - """ - Helper function to delete kubernetes resources. - First tries to delete with the kubernetes object. - Then it uses the dict representation of yaml to delete the object. - :param obj: Either kubernetes resource object or yaml as a dict. - :param del_func: The function that needs to be executed for deletion - """ - try: - del_func(obj.metadata.name, namespace) - except Exception as e: - logging.debug( - 'Error deleting resource with api object, try with dict.') - try: - del_func(obj['metadata']['name'], namespace) - except Exception as e: - logging.error("Error deleting resource") - logging.error(e) - return - print('Resource deleted') - - -def stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy): - """Stops the applied applications and deletes resources. - :param wg_svc: The load generator service. - :param wg: The load generator deployment. - :param app_svc: The application service. - :param app_svc_monitor: The application service monitor. - :param app_jmx: The application jmx config map. - :param app_deploy: The application deployment. - """ - print('Stop use case application and load generator') - - print('Delete load generator deployment') - delete_resource(wg, appsApi.delete_namespaced_deployment) - - print('Delete load generator service') - delete_resource(wg_svc, coreApi.delete_namespaced_service) - - print('Delete app service') - delete_resource(app_svc, coreApi.delete_namespaced_service) - - print('Delete service monitor') - try: - customApi.delete_namespaced_custom_object( - group="monitoring.coreos.com", - version="v1", - namespace=namespace, - plural="servicemonitors", - name=app_svc_monitor['metadata']['name']) - print('Resource deleted') - except Exception as e: - print('Error deleting service monitor') - - print('Delete jmx config map') - delete_resource(app_jmx, coreApi.delete_namespaced_config_map) - - print('Delete uc application') - delete_resource(app_deploy, appsApi.delete_namespaced_deployment) - - print('Check all pods deleted.') - while True: - # Wait a bit for deletion - time.sleep(2) - - # Count how many pods still need to be deleted - no_load = len(coreApi.list_namespaced_pod( - namespace, label_selector='app=titan-ccp-load-generator').items) - no_uc = len(coreApi.list_namespaced_pod( - namespace, label_selector='app=titan-ccp-aggregation').items) - - # Check if all pods deleted - if no_load <= 0 and no_uc <= 0: - print('All pods deleted.') - break - - print(f'#{no_load} load generator and #{no_uc} uc pods need to be deleted') - return - - -def delete_topics(topics): - """Delete topics from Kafka. - :param topics: List of topics to delete. 
- """ - print('Delete topics from Kafka') - - topics_delete = 'theodolite-.*|' + '|'.join([ti[0] for ti in topics]) - - num_topics_command = [ - '/bin/sh', - '-c', - f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list \ - | sed -n -E "/^({topics_delete})\ - ( - marked for deletion)?$/p" | wc -l' - ] - - topics_deletion_command = [ - '/bin/sh', - '-c', - f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete \ - --topic "{topics_delete}"' - ] - - # Wait that topics get deleted - while True: - # topic deletion, sometimes a second deletion seems to be required - resp = stream(coreApi.connect_get_namespaced_pod_exec, - "kafka-client", - namespace, - command=topics_deletion_command, - stderr=True, stdin=False, - stdout=True, tty=False) - print(resp) - - print('Wait for topic deletion') - time.sleep(2) - resp = stream(coreApi.connect_get_namespaced_pod_exec, - "kafka-client", - namespace, - command=num_topics_command, - stderr=True, stdin=False, - stdout=True, tty=False) - if resp == '0': - print('Topics deleted') - break - return - - -def reset_zookeeper(): - """Delete ZooKeeper configurations used for workload generation. - """ - print('Delete ZooKeeper configurations used for workload generation') - - delete_zoo_data_command = [ - '/bin/sh', - '-c', - 'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall ' - + '/workload-generation' - ] - - check_zoo_data_command = [ - '/bin/sh', - '-c', - 'zookeeper-shell my-confluent-cp-zookeeper:2181 get ' - + '/workload-generation' - ] - - # Wait for configuration deletion - while True: - # Delete Zookeeper configuration data - resp = stream(coreApi.connect_get_namespaced_pod_exec, - "zookeeper-client", - namespace, - command=delete_zoo_data_command, - stderr=True, stdin=False, - stdout=True, tty=False) - logging.debug(resp) - - # Check data is deleted - client = stream(coreApi.connect_get_namespaced_pod_exec, - "zookeeper-client", - namespace, - command=check_zoo_data_command, - stderr=True, stdin=False, - stdout=True, tty=False, - _preload_content=False) # Get client for returncode - client.run_forever(timeout=60) # Start the client - - if client.returncode == 1: # Means data not available anymore - print('ZooKeeper reset was successful.') - break - else: - print('ZooKeeper reset was not successful. Retrying in 5s.') - time.sleep(5) - return - - -def stop_lag_exporter(): - """ - Stop the lag exporter in order to reset it and allow smooth execution for - next use cases. - """ - print('Stop the lag exporter') - - try: - # Get lag exporter - pod_list = coreApi.list_namespaced_pod( - namespace=namespace, label_selector='app.kubernetes.io/name=kafka-lag-exporter') - lag_exporter_pod = pod_list.items[0].metadata.name - - # Delete lag exporter pod - res = coreApi.delete_namespaced_pod( - name=lag_exporter_pod, namespace=namespace) - except ApiException as e: - logging.error('Exception while stopping lag exporter') - logging.error(e) - - print('Deleted lag exporter pod: ' + lag_exporter_pod) - return - - -def reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): - """ - Stop the applications, delete topics, reset zookeeper and stop lag exporter. 
- """ - print('Reset cluster') - stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy) - print('---------------------') - delete_topics(topics) - print('---------------------') - reset_zookeeper() - print('---------------------') - stop_lag_exporter() - - -def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, execution_minutes, prometheus_base_url, reset, ns, result_path, configurations, reset_only=False): - """ - Main method to execute the benchmark once for a given use case. - Start workload generator/application -> execute -> analyse -> stop all - :param string exp_id: The ID of the executed experiment. - :param string uc_id: Use case to execute. - :param int dim_value: Dimension value for load generator. - :param int instances: Number of instances for application. - :param int partitions: Number of partitions the kafka topics should have. - :param string cpu_limit: Max CPU utilization for application. - :param string memory_limit: Max memory utilization for application. - :param int execution_minutes: How long to execute the benchmark. - :param boolean reset: Flag for reset of cluster before execution. - :param dict configurations: Key value pairs for setting env variables of UC. - :param boolean reset_only: Flag to only reset the application. - """ - global namespace - namespace = ns - wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() - print('---------------------') - - initialize_kubernetes_api() - print('---------------------') - - topics = [('input', partitions), - ('output', partitions), - ('aggregation-feedback', partitions), - ('configuration', 1)] - - # Check for reset options - if reset_only: - # Only reset cluster and then end program - print('Reset only mode') - reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, - app_jmx, app_deploy, topics) - sys.exit() - if reset: - # Reset cluster before execution - reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, - app_jmx, app_deploy, topics) - print('---------------------') - - # Register the reset operation so that it is executed if the program aborts - atexit.register(reset_cluster, wg_svc, wg, app_svc, - app_svc_monitor, app_jmx, app_deploy, topics) - - create_topics(topics) - print('---------------------') - - wg_svc, wg = start_workload_generator(wg_svc, wg, dim_value, uc_id) - print('---------------------') - - app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( - app_svc, - app_svc_monitor, - app_jmx, - app_deploy, - instances, - uc_id, - memory_limit, - cpu_limit, - configurations) - print('---------------------') - - wait_execution(execution_minutes) - print('---------------------') - - run_evaluation(exp_id, uc_id, dim_value, instances, - execution_minutes, prometheus_base_url, result_path) - print('---------------------') - - # Cluster is reset in the regular path here, so the abort handler is no longer needed - reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) - atexit.unregister(reset_cluster) - - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - args = load_variables() - print('---------------------') - main(args.exp_id, args.uc, args.load, args.instances, args.partitions, - args.cpu_limit, args.memory_limit, args.duration, args.prometheus, - args.reset, args.namespace, args.path, args.configurations, - args.reset_only) diff --git a/execution/strategies/__init__.py b/execution/strategies/__init__.py deleted file mode 100644 index 
e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/execution/strategies/config.py b/execution/strategies/config.py deleted file mode 100644 index d4df97c18ae54c7c181ddf08264c013f9447350f..0000000000000000000000000000000000000000 --- a/execution/strategies/config.py +++ /dev/null @@ -1,23 +0,0 @@ -from dataclasses import dataclass - -@dataclass -class ExperimentConfig: - """ Wrapper for the configuration of an experiment. """ - use_case: str - exp_id: int - dim_values: list - replicass: list - partitions: int - cpu_limit: str - memory_limit: str - execution_minutes: int - prometheus_base_url: str - reset: bool - namespace: str - result_path: str - configurations: dict - domain_restriction_strategy: object - search_strategy: object - threshold: int - subexperiment_executor: object - subexperiment_evaluator: object diff --git a/execution/strategies/experiment_execution.py b/execution/strategies/experiment_execution.py deleted file mode 100644 index c2ee18f9b79a6e880dbcb69b47061cc5ecc6b9ba..0000000000000000000000000000000000000000 --- a/execution/strategies/experiment_execution.py +++ /dev/null @@ -1,6 +0,0 @@ -class ExperimentExecutor: - def __init__(self, config): - self.config=config - - def execute(self): - self.config.domain_restriction_strategy.execute(self.config) diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py deleted file mode 100644 index 5c31f8c97a4085931cdfa1fa017d4e5909e21915..0000000000000000000000000000000000000000 --- a/execution/strategies/strategies/config.py +++ /dev/null @@ -1,19 +0,0 @@ -from dataclasses import dataclass - -@dataclass -class SubexperimentConfig: - """ Wrapper for the configuration of a subexperiment """ - use_case: str - exp_id: int - counter: int - dim_value: int - replicas: int - partitions: int - cpu_limit: str - memory_limit: str - execution_minutes: int - prometheus_base_url: str - reset: bool - namespace: str - result_path: str - configurations: dict diff --git a/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py b/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py deleted file mode 100644 index b218731fc76d83347b4dbf10448f01615d378c0b..0000000000000000000000000000000000000000 --- a/execution/strategies/strategies/domain_restriction/lower_bound_strategy.py +++ /dev/null @@ -1,12 +0,0 @@ -# The lower bound strategy -def execute(config): - dim_value_index = 0 - lower_bound_replicas_index = 0 - subexperiment_counter = 0 - while dim_value_index < len(config.dim_values) and lower_bound_replicas_index >= 0 and lower_bound_replicas_index < len(config.replicass): - lower_bound_replicas_index, subexperiment_counter = config.search_strategy.execute( - config=config, - dim_value_index=dim_value_index, - lower_replicas_bound_index=lower_bound_replicas_index, - subexperiment_counter=subexperiment_counter) - dim_value_index+=1 \ No newline at end of file diff --git a/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py b/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py deleted file mode 100644 index e5dea56118460b0dfdc6b1c36ce2587b6752512b..0000000000000000000000000000000000000000 --- a/execution/strategies/strategies/domain_restriction/no_lower_bound_strategy.py +++ /dev/null @@ -1,11 +0,0 @@ -# The strategy where the domain contains all amounts of instances -def execute(config): - dim_value_index = 0 - subexperiment_counter = 0 - while dim_value_index < 
len(config.dim_values): - _, subexperiment_counter = config.search_strategy.execute( - config=config, - dim_value_index=dim_value_index, - lower_replicas_bound_index=0, - subexperiment_counter=subexperiment_counter) - dim_value_index+=1 \ No newline at end of file diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py deleted file mode 100644 index 46748cbda250597b3a7644522126268be4599293..0000000000000000000000000000000000000000 --- a/execution/strategies/strategies/search/binary_search_strategy.py +++ /dev/null @@ -1,50 +0,0 @@ -# The binary search strategy -import os -from strategies.strategies.config import SubexperimentConfig - -def binary_search(config, dim_value, lower, upper, subexperiment_counter): - if lower == upper: - print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) - config.subexperiment_executor.execute(subexperiment_config) - success = config.subexperiment_evaluator.execute(subexperiment_config, - config.threshold) - if success: # successful, the upper neighbor is assumed to also has been successful - return (lower, subexperiment_counter+1) - else: # not successful - return (lower+1, subexperiment_counter) - elif lower+1==upper: - print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) - config.subexperiment_executor.execute(subexperiment_config) - success = config.subexperiment_evaluator.execute(subexperiment_config, - config.threshold) - if success: # minimal instances found - return (lower, subexperiment_counter) - else: # not successful, check if lower+1 instances are sufficient - print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) - config.subexperiment_executor.execute(subexperiment_config) - success = config.subexperiment_evaluator.execute(subexperiment_config, - config.threshold) - if success: # minimal instances found - return (upper, subexperiment_counter) - else: - return (upper+1, subexperiment_counter) - else: - # test mid - mid=(upper+lower)//2 - print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) - 
config.subexperiment_executor.execute(subexperiment_config) - success = config.subexperiment_evaluator.execute(subexperiment_config, - config.threshold) - if success: # success -> search in (lower, mid-1) - return binary_search(config, dim_value, lower, mid-1, subexperiment_counter+1) - else: # not success -> search in (mid+1, upper) - return binary_search(config, dim_value, mid+1, upper, subexperiment_counter+1) - -def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter): - upper = len(config.replicass)-1 - dim_value=config.dim_values[dim_value_index] - return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter) diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py deleted file mode 100644 index 0861945113b829fa79317d8a1a6312b4d6e4f71d..0000000000000000000000000000000000000000 --- a/execution/strategies/strategies/search/check_all_strategy.py +++ /dev/null @@ -1,31 +0,0 @@ -# The check_all strategy -import os -from strategies.strategies.config import SubexperimentConfig - - -def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter): - new_lower_replicas_bound_index = lower_replicas_bound_index - new_lower_replicas_bound_found = False - subexperiments_total = len(config.dim_values) * len(config.replicass) - while lower_replicas_bound_index < len(config.replicass): - subexperiment_counter += 1 - dim_value = config.dim_values[dim_value_index] - replicas = config.replicass[lower_replicas_bound_index] - print( - f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - - subexperiment_config = SubexperimentConfig( - config.use_case, config.exp_id, subexperiment_counter, dim_value, - replicas, config.partitions, config.cpu_limit, config.memory_limit, - config.execution_minutes, config.prometheus_base_url, config.reset, - config.namespace, config.result_path, config.configurations) - - config.subexperiment_executor.execute(subexperiment_config) - - success = config.subexperiment_evaluator.execute(subexperiment_config, - config.threshold) - if success and not new_lower_replicas_bound_found: - new_lower_replicas_bound_found = True - new_lower_replicas_bound_index = lower_replicas_bound_index - lower_replicas_bound_index += 1 - return (new_lower_replicas_bound_index, subexperiment_counter) diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py deleted file mode 100644 index 8e777303742e54cf2a11a1bde60e95b8aa85489d..0000000000000000000000000000000000000000 --- a/execution/strategies/strategies/search/linear_search_strategy.py +++ /dev/null @@ -1,23 +0,0 @@ -# The linear-search strategy - -import os -from strategies.strategies.config import SubexperimentConfig - -def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter): - subexperiments_total=len(config.dim_values)+len(config.replicass)-1 - dim_value=config.dim_values[dim_value_index] - while lower_replicas_bound_index < len(config.replicass): - subexperiment_counter+=1 - replicas=config.replicass[lower_replicas_bound_index] - print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, 
replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) - - config.subexperiment_executor.execute(subexperiment_config) - success = config.subexperiment_evaluator.execute(subexperiment_config, - config.threshold) - if success: - return (lower_replicas_bound_index, subexperiment_counter) - else: - lower_replicas_bound_index+=1 - return (lower_replicas_bound_index, subexperiment_counter) diff --git a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py deleted file mode 100644 index 30188de837746b76113ec635ca77fadc3a91cb92..0000000000000000000000000000000000000000 --- a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py +++ /dev/null @@ -1,29 +0,0 @@ -import lib.trend_slope_computer as trend_slope_computer -import logging -import os - -WARMUP_SEC = 60 - -def execute(config, threshold): - """ - Check the trend slope of the totallag of the subexperiment if it comes below - the threshold. - - :param config: Configuration of the subexperiment. - :param threshold: The threshold the trendslope need to come below. - """ - cwd = f'{os.getcwd()}/{config.result_path}' - file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv" - - try: - trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC) - except Exception as e: - err_msg = 'Computing trend slope failed' - print(err_msg) - logging.exception(err_msg) - print('Mark this subexperiment as not successful and continue benchmark') - return False - - print(f"Trend Slope: {trend_slope}") - - return trend_slope < threshold diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py deleted file mode 100644 index 6931dacfc72081cbe112c4d6d1003703ba42c526..0000000000000000000000000000000000000000 --- a/execution/strategies/subexperiment_execution/subexperiment_executor.py +++ /dev/null @@ -1,20 +0,0 @@ -# Wrapper that makes the execution method of a subexperiment interchangable. 
- -import os -import run_uc - -def execute(subexperiment_config): - run_uc.main( - exp_id=subexperiment_config.exp_id, - uc_id=subexperiment_config.use_case, - dim_value=int(subexperiment_config.dim_value), - instances=int(subexperiment_config.replicas), - partitions=subexperiment_config.partitions, - cpu_limit=subexperiment_config.cpu_limit, - memory_limit=subexperiment_config.memory_limit, - execution_minutes=int(subexperiment_config.execution_minutes), - prometheus_base_url=subexperiment_config.prometheus_base_url, - reset=subexperiment_config.reset, - ns=subexperiment_config.namespace, - result_path=subexperiment_config.result_path, - configurations=subexperiment_config.configurations) diff --git a/execution/strategies/tests/.gitignore b/execution/strategies/tests/.gitignore deleted file mode 100644 index 1998c294f84ec0ff4b32396e4cd8e74e352672e6..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.cache \ No newline at end of file diff --git a/execution/strategies/tests/__init__.py b/execution/strategies/tests/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py b/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py deleted file mode 100644 index d93d4924cf09015c714604f2fc995e1db971e69d..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_domain_restriction_binary_search_strategy.py +++ /dev/null @@ -1,105 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.binary_search_strategy as binary_search_strategy -import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was successful -# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as common known arrays from 0 - 6 respectively. -# this means the first row starts with (0,0), the second row with (1, 0) etc. 
-successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -expected_order = [ - (0,3), # workload dim 0 - (0,1), - (0,0), - (1,3), # workload dim 1 - (1,1), - (1,2), - (2,4), # workload dim 2 - (2,2), - (3,4), # workload dim 3 - (3,2), - (3,3), - (4,4), # workload dim 4 - (4,3), - (5,5), # workload dim 5 - (5,6), - (6,6) # workload dim 6 - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(last_experiment) - print("Index was expected to be:") - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_binary_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=binary_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_domain_restriction_check_all_strategy.py b/execution/strategies/tests/test_domain_restriction_check_all_strategy.py deleted file mode 100644 index c15daca6ebab3171f0995c048afe56c0185efe56..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_domain_restriction_check_all_strategy.py +++ /dev/null @@ -1,120 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.check_all_strategy as check_all_strategy -import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was successful -# the experiments are indexed 
row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. -# this means the first row starts with (0,0), the second row with (1, 0) etc. -successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -# the expected order of executed experiments -expected_order = [ - (0,0), # workload dim 0 - (0,1), - (0,2), - (0,3), - (0,4), - (0,5), - (0,6), - (1,0), # workload dim 1 - (1,1), - (1,2), - (1,3), - (1,4), - (1,5), - (1,6), - (2,2), # workload dim 2 - (2,3), - (2,4), - (2,5), - (2,6), - (3,2), # workload dim 3 - (3,3), - (3,4), - (3,5), - (3,6), - (4,3), # workload dim 4 - (4,4), - (4,5), - (4,6), - (5,4), # workload dim 3 - (5,5), - (5,6), - (6,6) # workload dim 6 - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. 
Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_linear_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=lower_bound_strategy, - search_strategy=check_all_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py b/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py deleted file mode 100644 index 86e2cd29d187cb83166102c503ee79e5e1424573..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py +++ /dev/null @@ -1,101 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.linear_search_strategy as linear_search_strategy -import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was successful -# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. -# this means the first row starts with (0,0), the second row with (1, 0) etc. -successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -# the expected order of executed experiments -expected_order = [ - (0,0), - (1,0), - (1,1), - (1,2), - (2,2), - (3,2), - (3,3), - (4,3), - (4,4), - (5,4), - (5,5), - (5,6), - (6,6) - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. 
diff --git a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py b/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py
deleted file mode 100644
index 86e2cd29d187cb83166102c503ee79e5e1424573..0000000000000000000000000000000000000000
--- a/execution/strategies/tests/test_domain_restriction_linear_search_strategy.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import pprint
-
-from strategies.config import ExperimentConfig
-import strategies.strategies.search.linear_search_strategy as linear_search_strategy
-import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
-from strategies.experiment_execution import ExperimentExecutor
-import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
-
-class Object(object):
-    pass
-
-pp = pprint.PrettyPrinter(indent=4)
-
-dim_values = [0, 1, 2, 3, 4, 5, 6]
-replicass = [0, 1, 2, 3, 4, 5, 6]
-
-# True means the experiment was successful
-# the experiments are indexed by row (dimension value) and column (number of replicas), both running from 0 to 6 like ordinary zero-indexed arrays.
-# this means the first row starts with (0,0), the second row with (1,0), etc.
-successful = [
-    [ True , True , True , True , True , True , True ],
-    [ False, False, True , True , True , True , True ],
-    [ False, False, True , True , True , True , True ],
-    [ False, False, False, True , True , True , True ],
-    [ False, False, False, False, True , True , True ],
-    [ False, False, False, False, False, False, True ],
-    [ False, False, False, False, False, False, False ]
-    ]
-
-# the expected order of executed experiments
-expected_order = [
-    (0,0),
-    (1,0),
-    (1,1),
-    (1,2),
-    (2,2),
-    (3,2),
-    (3,3),
-    (4,3),
-    (4,4),
-    (5,4),
-    (5,5),
-    (5,6),
-    (6,6)
-    ]
-
-last_experiment = (0, 0)
-experiment_counter = -1
-subexperiment_executor = Object()
-
-def subexperiment_executor_executor(config):
-    global experiment_counter, last_experiment, pp
-    print("Simulate subexperiment with config:")
-    pp.pprint(config)
-    last_experiment = (config.dim_value, config.replicas)
-    experiment_counter += 1
-    print("Simulation complete")
-
-subexperiment_executor.execute = subexperiment_executor_executor
-
-
-# returns True if the experiment was successful
-
-subexperiment_evaluator = Object()
-
-def subexperiment_evaluator_execute(i):
-    print("Evaluating last experiment. Index was:")
-    global expected_order, experiment_counter, last_experiment, successful
-    pp.pprint(expected_order[experiment_counter])
-    assert expected_order[experiment_counter] == last_experiment
-    print("Index was as expected. Evaluation finished.")
-    return 1 if successful[last_experiment[0]][last_experiment[1]] else 0
-
-subexperiment_evaluator.execute = subexperiment_evaluator_execute
-
-def test_linear_search_strategy():
-    # declare parameters
-    uc="test-uc"
-    partitions=40
-    cpu_limit="1000m"
-    memory_limit="4Gi"
-    kafka_streams_commit_interval_ms=100
-    execution_minutes=5
-
-    # execute
-    experiment_config = ExperimentConfig(
-        exp_id="0",
-        use_case=uc,
-        dim_values=dim_values,
-        replicass=replicass,
-        partitions=partitions,
-        cpu_limit=cpu_limit,
-        memory_limit=memory_limit,
-        kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
-        execution_minutes=execution_minutes,
-        domain_restriction_strategy=lower_bound_strategy,
-        search_strategy=linear_search_strategy,
-        subexperiment_executor=subexperiment_executor,
-        subexperiment_evaluator=subexperiment_evaluator)
-    executor = ExperimentExecutor(experiment_config)
-    executor.execute()
\ No newline at end of file
diff --git a/execution/strategies/tests/test_no_restriction_binary_search_strategy.py b/execution/strategies/tests/test_no_restriction_binary_search_strategy.py
deleted file mode 100644
index 4f5da89cc72edd792015763539c9af4677772a79..0000000000000000000000000000000000000000
--- a/execution/strategies/tests/test_no_restriction_binary_search_strategy.py
+++ /dev/null
@@ -1,110 +0,0 @@
-import pprint
-
-from strategies.config import ExperimentConfig
-import strategies.strategies.search.binary_search_strategy as binary_search_strategy
-import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy
-from strategies.experiment_execution import ExperimentExecutor
-import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
-
-class Object(object):
-    pass
-
-pp = pprint.PrettyPrinter(indent=4)
-
-dim_values = [0, 1, 2, 3, 4, 5, 6]
-replicass = [0, 1, 2, 3, 4, 5, 6]
-
-# True means the experiment was successful
-# the experiments are indexed by row (dimension value) and column (number of replicas), both running from 0 to 6 like ordinary zero-indexed arrays.
-# this means the first row starts with (0,0), the second row with (1,0), etc.
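Before the matrix, note what this binary-search test expects: for each workload dimension, the strategy probes the replica axis like a textbook lower-bound binary search, so each of the seven dimensions costs exactly three probes here (the ceiling of log2 of 7). A sketch consistent with the probe order asserted below (standalone, illustrative names; the real logic lives in the binary_search_strategy module imported above):

```python
# Standalone sketch; names are illustrative, not the strategies package API.
def binary_search_min_replicas(row):
    """Smallest index in `row` that is True, recording the probe order."""
    probes = []
    lo, hi, best = 0, len(row) - 1, None
    while lo <= hi:
        mid = (lo + hi) // 2
        probes.append(mid)       # one probe = one subexperiment
        if row[mid]:             # enough replicas: try the lower half
            best, hi = mid, mid - 1
        else:                    # too few replicas: try the upper half
            lo = mid + 1
    return best, probes

# Workload dim 1 (row [False, False, True, ...]) probes replicas 3, 1, 2 and
# settles on 2, matching the (1,3), (1,1), (1,2) entries asserted below.
assert binary_search_min_replicas(
    [False, False, True, True, True, True, True]) == (2, [3, 1, 2])
```

With no domain restriction, each dimension restarts on the full range 0 to 6, which is why every dimension contributes its first probe at replica 3.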
-successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -expected_order = [ - (0,3), # workload dim 0 - (0,1), - (0,0), - (1,3), # workload dim 1 - (1,1), - (1,2), - (2,3), # workload dim 2 - (2,1), - (2,2), - (3,3), # workload dim 3 - (3,1), - (3,2), - (4,3), # workload dim 4 - (4,5), - (4,4), - (5,3), # workload dim 5 - (5,5), - (5,6), - (6,3), # workload dim 6 - (6,5), - (6,6) - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(last_experiment) - print("Index was expected to be:") - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_binary_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=binary_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/strategies/tests/test_no_restriction_check_all_strategy.py b/execution/strategies/tests/test_no_restriction_check_all_strategy.py deleted file mode 100644 index f173a3d168704cc7a499933984b6510ebda2751e..0000000000000000000000000000000000000000 --- a/execution/strategies/tests/test_no_restriction_check_all_strategy.py +++ /dev/null @@ -1,137 +0,0 @@ -import pprint - -from strategies.config import ExperimentConfig -import strategies.strategies.search.check_all_strategy as check_all_strategy -import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy -from strategies.experiment_execution import ExperimentExecutor -import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor - -class Object(object): - pass - -pp = pprint.PrettyPrinter(indent=4) - -dim_values = [0, 1, 2, 3, 4, 5, 6] -replicass = [0, 1, 2, 3, 4, 5, 6] - -# True means the experiment was 
successful
-# the experiments are indexed by row (dimension value) and column (number of replicas), both running from 0 to 6 like ordinary zero-indexed arrays.
-# this means the first row starts with (0,0), the second row with (1,0), etc.
-successful = [
-    [ True , True , True , True , True , True , True ],
-    [ False, False, True , True , True , True , True ],
-    [ False, False, True , True , True , True , True ],
-    [ False, False, False, True , True , True , True ],
-    [ False, False, False, False, True , True , True ],
-    [ False, False, False, False, False, False, True ],
-    [ False, False, False, False, False, False, False ]
-    ]
-
-# the expected order of executed experiments
-expected_order = [
-    (0,0), # workload dim 0
-    (0,1),
-    (0,2),
-    (0,3),
-    (0,4),
-    (0,5),
-    (0,6),
-    (1,0), # workload dim 1
-    (1,1),
-    (1,2),
-    (1,3),
-    (1,4),
-    (1,5),
-    (1,6),
-    (2,0), # workload dim 2
-    (2,1),
-    (2,2),
-    (2,3),
-    (2,4),
-    (2,5),
-    (2,6),
-    (3,0), # workload dim 3
-    (3,1),
-    (3,2),
-    (3,3),
-    (3,4),
-    (3,5),
-    (3,6),
-    (4,0), # workload dim 4
-    (4,1),
-    (4,2),
-    (4,3),
-    (4,4),
-    (4,5),
-    (4,6),
-    (5,0), # workload dim 5
-    (5,1),
-    (5,2),
-    (5,3),
-    (5,4),
-    (5,5),
-    (5,6),
-    (6,0), # workload dim 6
-    (6,1),
-    (6,2),
-    (6,3),
-    (6,4),
-    (6,5),
-    (6,6),
-    ]
-
-last_experiment = (0, 0)
-experiment_counter = -1
-subexperiment_executor = Object()
-
-def subexperiment_executor_executor(config):
-    global experiment_counter, last_experiment, pp
-    print("Simulate subexperiment with config:")
-    pp.pprint(config)
-    last_experiment = (config.dim_value, config.replicas)
-    experiment_counter += 1
-    print("Simulation complete")
-
-subexperiment_executor.execute = subexperiment_executor_executor
-
-
-# returns True if the experiment was successful
-
-subexperiment_evaluator = Object()
-
-def subexperiment_evaluator_execute(i):
-    print("Evaluating last experiment. Index was:")
-    global expected_order, experiment_counter, last_experiment, successful
-    pp.pprint(expected_order[experiment_counter])
-    assert expected_order[experiment_counter] == last_experiment
-    print("Index was as expected. Evaluation finished.")
-    return 1 if successful[last_experiment[0]][last_experiment[1]] else 0
-
-subexperiment_evaluator.execute = subexperiment_evaluator_execute
-
-def test_check_all_strategy():
-    # declare parameters
-    uc="test-uc"
-    partitions=40
-    cpu_limit="1000m"
-    memory_limit="4Gi"
-    kafka_streams_commit_interval_ms=100
-    execution_minutes=5
-
-    # execute
-    experiment_config = ExperimentConfig(
-        exp_id="0",
-        use_case=uc,
-        dim_values=dim_values,
-        replicass=replicass,
-        partitions=partitions,
-        cpu_limit=cpu_limit,
-        memory_limit=memory_limit,
-        kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
-        execution_minutes=execution_minutes,
-        domain_restriction_strategy=no_lower_bound_strategy,
-        search_strategy=check_all_strategy,
-        subexperiment_executor=subexperiment_executor,
-        subexperiment_evaluator=subexperiment_evaluator)
-    executor = ExperimentExecutor(experiment_config)
-    executor.execute()
\ No newline at end of file
diff --git a/execution/strategies/tests/test_no_restriction_linear_search_strategy.py b/execution/strategies/tests/test_no_restriction_linear_search_strategy.py
deleted file mode 100644
index 0e47c2e95b75ae682e82a02ad3d0a91c5a62f253..0000000000000000000000000000000000000000
--- a/execution/strategies/tests/test_no_restriction_linear_search_strategy.py
+++ /dev/null
@@ -1,118 +0,0 @@
-import pprint
-
-from strategies.config import ExperimentConfig
-import strategies.strategies.search.linear_search_strategy as linear_search_strategy
-import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy
-from strategies.experiment_execution import ExperimentExecutor
-import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
-
-class Object(object):
-    pass
-
-pp = pprint.PrettyPrinter(indent=4)
-
-dim_values = [0, 1, 2, 3, 4, 5, 6]
-replicass = [0, 1, 2, 3, 4, 5, 6]
-
-# True means the experiment was successful
-# the experiments are indexed by row (dimension value) and column (number of replicas), both running from 0 to 6 like ordinary zero-indexed arrays.
-# this means the first row starts with (0,0), the second row with (1,0), etc.
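This last strategy test pairs linear search with no domain restriction: every workload dimension scans upward from zero replicas, so the worst case is len(dim_values) * len(replicass) subexperiments rather than the m + n - 1 of the restricted variant above. A correspondingly simple sketch (standalone, illustrative names):

```python
# Standalone sketch; names are illustrative, not the strategies package API.
def linear_search_min_replicas(row):
    """Smallest index in `row` that is True, scanning upward from zero."""
    for replicas, ok in enumerate(row):
        if ok:
            return replicas, replicas + 1  # (minimum, probes used)
    return None, len(row)                  # no replica count suffices

# Workload dim 1 probes (1,0), (1,1), (1,2) and stops at 2, as asserted below.
assert linear_search_min_replicas(
    [False, False, True, True, True, True, True]) == (2, 3)
```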
-successful = [ - [ True , True , True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, True , True , True , True , True ], - [ False, False, False, True , True , True , True ], - [ False, False, False, False, True , True , True ], - [ False, False, False, False, False, False, True ], - [ False, False, False, False, False, False, False ] - ] - -# the expected order of executed experiments -expected_order = [ - (0,0), # workload dim 0 - (1,0), # workload dim 1 - (1,1), - (1,2), - (2,0), # workload dim 2 - (2,1), - (2,2), - (3,0), # workload dim 3 - (3,1), - (3,2), - (3,3), - (4,0), # workload dim 4 - (4,1), - (4,2), - (4,3), - (4,4), - (5,0), # workload dim 5 - (5,1), - (5,2), - (5,3), - (5,4), - (5,5), - (5,6), - (6,0), # workload dim 6 - (6,1), - (6,2), - (6,3), - (6,4), - (6,5), - (6,6) - ] - -last_experiment = (0, 0) -experiment_counter = -1 -subexperiment_executor = Object() - -def subexperiment_executor_executor(config): - global experiment_counter, last_experiment, pp - print("Simulate subexperiment with config:") - pp.pprint(config) - last_experiment = (config.dim_value, config.replicas) - experiment_counter += 1 - print("Simulation complete") - -subexperiment_executor.execute = subexperiment_executor_executor - - -# returns True if the experiment was successful - -subexperiment_evaluator = Object() - -def subexperiment_evaluator_execute(i): - print("Evaluating last experiment. Index was:") - global expected_order, experiment_counter, last_experiment, successful - pp.pprint(expected_order[experiment_counter]) - assert expected_order[experiment_counter] == last_experiment - print("Index was as expected. Evaluation finished.") - return 1 if successful[last_experiment[0]][last_experiment[1]] else 0 - -subexperiment_evaluator.execute = subexperiment_evaluator_execute - -def test_linear_search_strategy(): - # declare parameters - uc="test-uc" - partitions=40 - cpu_limit="1000m" - memory_limit="4Gi" - kafka_streams_commit_interval_ms=100 - execution_minutes=5 - - # execute - experiment_config = ExperimentConfig( - exp_id="0", - use_case=uc, - dim_values=dim_values, - replicass=replicass, - partitions=partitions, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms, - execution_minutes=execution_minutes, - domain_restriction_strategy=no_lower_bound_strategy, - search_strategy=linear_search_strategy, - subexperiment_executor=subexperiment_executor, - subexperiment_evaluator=subexperiment_evaluator) - executor = ExperimentExecutor(experiment_config) - executor.execute() \ No newline at end of file diff --git a/execution/theodolite.py b/execution/theodolite.py deleted file mode 100755 index bd273c4405e2a406b5b5537e084957625c19aa96..0000000000000000000000000000000000000000 --- a/execution/theodolite.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/env python - -import argparse -from lib.cli_parser import benchmark_parser -import logging # logging -import os -import run_uc -import sys -from strategies.config import ExperimentConfig -import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy -import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy -import strategies.strategies.search.check_all_strategy as check_all_strategy -import strategies.strategies.search.linear_search_strategy as linear_search_strategy -import strategies.strategies.search.binary_search_strategy as binary_search_strategy -from 
strategies.experiment_execution import ExperimentExecutor
-import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
-import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator
-
-
-def load_variables():
-    """Load the CLI variables given at the command line"""
-    print('Load CLI variables')
-    parser = benchmark_parser("Run theodolite benchmarking")
-    args = parser.parse_args()
-    print(args)
-    if (args.uc is None or args.loads is None or args.instances_list is None) and not args.reset_only:
-        print('The options --uc, --loads and --instances are mandatory.')
-        print('At least one of them is not set.')
-        sys.exit(1)
-    return args
-
-
-def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
-         duration, domain_restriction, search_strategy, threshold,
-         prometheus_base_url, reset, namespace, result_path, configurations):
-
-    print(
-        f"Domain restriction of search space activated: {domain_restriction}")
-    print(f"Chosen search strategy: {search_strategy}")
-
-    counter_path = f"{result_path}/exp_counter.txt"
-
-    if os.path.exists(counter_path):
-        with open(counter_path, mode="r") as read_stream:
-            exp_id = int(read_stream.read())
-    else:
-        exp_id = 0
-        # Create the directory if it does not exist
-        os.makedirs(result_path, exist_ok=True)
-
-    # Store metadata
-    separator = ","
-    lines = [
-        f'UC={uc}\n',
-        f'DIM_VALUES={separator.join(map(str, loads))}\n',
-        f'REPLICAS={separator.join(map(str, instances_list))}\n',
-        f'PARTITIONS={partitions}\n',
-        f'CPU_LIMIT={cpu_limit}\n',
-        f'MEMORY_LIMIT={memory_limit}\n',
-        f'EXECUTION_MINUTES={duration}\n',
-        f'DOMAIN_RESTRICTION={domain_restriction}\n',
-        f'SEARCH_STRATEGY={search_strategy}\n',
-        f'CONFIGURATIONS={configurations}'
-    ]
-    with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream:
-        stream.writelines(lines)
-
-    with open(counter_path, mode="w") as write_stream:
-        write_stream.write(str(exp_id + 1))
-
-    domain_restriction_strategy = None
-    search_strategy_method = None
-
-    # Select domain restriction
-    if domain_restriction:
-        # domain restriction
-        domain_restriction_strategy = lower_bound_strategy
-    else:
-        # no domain restriction
-        domain_restriction_strategy = no_lower_bound_strategy
-
-    # select search strategy
-    if search_strategy == "linear-search":
-        print(
-            f"Going to execute at most {len(loads)+len(instances_list)-1} subexperiments in total.")
-        search_strategy_method = linear_search_strategy
-    elif search_strategy == "binary-search":
-        search_strategy_method = binary_search_strategy
-    else:
-        print(
-            f"Going to execute {len(loads)*len(instances_list)} subexperiments in total.")
-        search_strategy_method = check_all_strategy
-
-    experiment_config = ExperimentConfig(
-        use_case=uc,
-        exp_id=exp_id,
-        dim_values=loads,
-        replicass=instances_list,
-        partitions=partitions,
-        cpu_limit=cpu_limit,
-        memory_limit=memory_limit,
-        execution_minutes=duration,
-        prometheus_base_url=prometheus_base_url,
-        reset=reset,
-        namespace=namespace,
-        configurations=configurations,
-        result_path=result_path,
-        domain_restriction_strategy=domain_restriction_strategy,
-        search_strategy=search_strategy_method,
-        threshold=threshold,
-        subexperiment_executor=subexperiment_executor,
-        subexperiment_evaluator=subexperiment_evaluator)
-
-    executor = ExperimentExecutor(experiment_config)
-    executor.execute()
-
-
-if __name__ == '__main__':
-    logging.basicConfig(level=logging.INFO)
-    args = load_variables()
-    if args.reset_only:
-        print('Only reset the cluster')
-        run_uc.main(None, 
None, None, None, None, None, None, None, None, - None, args.namespace, None, None, reset_only=True) - else: - main(args.uc, args.loads, args.instances_list, args.partitions, - args.cpu_limit, args.memory_limit, args.duration, - args.domain_restriction, args.search_strategy, - args.threshold, args.prometheus, args.reset, args.namespace, - args.path, args.configurations) diff --git a/execution/uc-application/aggregation-deployment.yaml b/execution/uc-application/aggregation-deployment.yaml deleted file mode 100644 index 1d3ebdb20dd06433e97e112edef76d7deac17395..0000000000000000000000000000000000000000 --- a/execution/uc-application/aggregation-deployment.yaml +++ /dev/null @@ -1,55 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: titan-ccp-aggregation -spec: - selector: - matchLabels: - app: titan-ccp-aggregation - replicas: 1 - template: - metadata: - labels: - app: titan-ccp-aggregation - spec: - terminationGracePeriodSeconds: 0 - containers: - - name: uc-application - image: uc-app:latest - ports: - - containerPort: 5555 - name: jmx - env: - - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" - - name: SCHEMA_REGISTRY_URL - value: "http://theodolite-cp-schema-registry:8081" - - name: JAVA_OPTS - value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555" - - name: COMMIT_INTERVAL_MS # Set as default for the applications - value: "100" - resources: - limits: - memory: 4Gi - cpu: 1000m - - name: prometheus-jmx-exporter - image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143" - command: - - java - - -XX:+UnlockExperimentalVMOptions - - -XX:+UseCGroupMemoryLimitForHeap - - -XX:MaxRAMFraction=1 - - -XshowSettings:vm - - -jar - - jmx_prometheus_httpserver.jar - - "5556" - - /etc/jmx-aggregation/jmx-kafka-prometheus.yml - ports: - - containerPort: 5556 - volumeMounts: - - name: jmx-config - mountPath: /etc/jmx-aggregation - volumes: - - name: jmx-config - configMap: - name: aggregation-jmx-configmap diff --git a/execution/uc-application/aggregation-service.yaml b/execution/uc-application/aggregation-service.yaml deleted file mode 100644 index 6317caf9fe624e42449b8f630d040a068709cda3..0000000000000000000000000000000000000000 --- a/execution/uc-application/aggregation-service.yaml +++ /dev/null @@ -1,17 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: titan-ccp-aggregation - labels: - app: titan-ccp-aggregation -spec: - #type: NodePort - selector: - app: titan-ccp-aggregation - ports: - - name: http - port: 80 - targetPort: 80 - protocol: TCP - - name: metrics - port: 5556 diff --git a/execution/uc-application/jmx-configmap.yaml b/execution/uc-application/jmx-configmap.yaml deleted file mode 100644 index 78496a86b1242a89b9e844ead3e700fd0b9a9667..0000000000000000000000000000000000000000 --- a/execution/uc-application/jmx-configmap.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: aggregation-jmx-configmap -data: - jmx-kafka-prometheus.yml: |+ - jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi - lowercaseOutputName: true - lowercaseOutputLabelNames: true - ssl: false diff --git a/execution/uc-application/service-monitor.yaml b/execution/uc-application/service-monitor.yaml deleted file mode 100644 index 4e7e758cacb5086305efa26292ddef2afc958096..0000000000000000000000000000000000000000 --- a/execution/uc-application/service-monitor.yaml 
+++ /dev/null
@@ -1,14 +0,0 @@
-apiVersion: monitoring.coreos.com/v1
-kind: ServiceMonitor
-metadata:
-  labels:
-    app: titan-ccp-aggregation
-    appScope: titan-ccp
-  name: titan-ccp-aggregation
-spec:
-  selector:
-    matchLabels:
-      app: titan-ccp-aggregation
-  endpoints:
-  - port: metrics
-    interval: 10s
diff --git a/execution/uc-workload-generator/load-generator-service.yaml b/execution/uc-workload-generator/load-generator-service.yaml
deleted file mode 100644
index c1299e373009dee5fa4cc87093ebc684c7f2e333..0000000000000000000000000000000000000000
--- a/execution/uc-workload-generator/load-generator-service.yaml
+++ /dev/null
@@ -1,16 +0,0 @@
-apiVersion: v1
-kind: Service
-metadata:
-  name: titan-ccp-load-generator
-  labels:
-    app: titan-ccp-load-generator
-spec:
-  type: ClusterIP
-  clusterIP: None
-  selector:
-    app: titan-ccp-load-generator
-  ports:
-  - name: coordination
-    port: 5701
-    targetPort: 5701
-    protocol: TCP
diff --git a/execution/uc-workload-generator/workloadGenerator.yaml b/execution/uc-workload-generator/workloadGenerator.yaml
deleted file mode 100644
index 8f7cc3a8df20752eccb321242bb774c18f4e1d0a..0000000000000000000000000000000000000000
--- a/execution/uc-workload-generator/workloadGenerator.yaml
+++ /dev/null
@@ -1,37 +0,0 @@
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: titan-ccp-load-generator
-spec:
-  selector:
-    matchLabels:
-      app: titan-ccp-load-generator
-  replicas: 1
-  template:
-    metadata:
-      labels:
-        app: titan-ccp-load-generator
-    spec:
-      terminationGracePeriodSeconds: 0
-      containers:
-      - name: workload-generator
-        image: workload-generator:latest
-        ports:
-        - containerPort: 5701
-          name: coordination
-        env:
-        # The order needs to be preserved for run_uc.py
-        - name: NUM_SENSORS
-          value: "25000"
-        - name: NUM_NESTED_GROUPS
-          value: "5"
-        - name: KUBERNETES_NAMESPACE
-          valueFrom:
-            fieldRef:
-              fieldPath: metadata.namespace
-        - name: KUBERNETES_DNS_NAME
-          value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local"
-        - name: KAFKA_BOOTSTRAP_SERVERS
-          value: "theodolite-cp-kafka:9092"
-        - name: SCHEMA_REGISTRY_URL
-          value: "http://theodolite-cp-schema-registry:8081"
diff --git a/execution/helm/.gitignore b/helm/.gitignore
similarity index 100%
rename from execution/helm/.gitignore
rename to helm/.gitignore
diff --git a/execution/helm/.helmignore b/helm/.helmignore
similarity index 100%
rename from execution/helm/.helmignore
rename to helm/.helmignore
diff --git a/execution/helm/Chart.yaml b/helm/Chart.yaml
similarity index 100%
rename from execution/helm/Chart.yaml
rename to helm/Chart.yaml
diff --git a/execution/helm/README.md b/helm/README.md
similarity index 100%
rename from execution/helm/README.md
rename to helm/README.md
diff --git a/execution/helm/build-package.sh b/helm/build-package.sh
similarity index 100%
rename from execution/helm/build-package.sh
rename to helm/build-package.sh
diff --git a/execution/helm/preconfigs/one-broker-values.yaml b/helm/preconfigs/one-broker-values.yaml
similarity index 100%
rename from execution/helm/preconfigs/one-broker-values.yaml
rename to helm/preconfigs/one-broker-values.yaml
diff --git a/execution/helm/templates/NOTES.txt b/helm/templates/NOTES.txt
similarity index 100%
rename from execution/helm/templates/NOTES.txt
rename to helm/templates/NOTES.txt
diff --git a/execution/helm/templates/_helpers.tpl b/helm/templates/_helpers.tpl
similarity index 100%
rename from execution/helm/templates/_helpers.tpl
rename to helm/templates/_helpers.tpl
diff --git 
a/execution/helm/templates/grafana/dashboard-config-map.yaml b/helm/templates/grafana/dashboard-config-map.yaml similarity index 100% rename from execution/helm/templates/grafana/dashboard-config-map.yaml rename to helm/templates/grafana/dashboard-config-map.yaml diff --git a/execution/helm/templates/kafka/kafka-client.yaml b/helm/templates/kafka/kafka-client.yaml similarity index 100% rename from execution/helm/templates/kafka/kafka-client.yaml rename to helm/templates/kafka/kafka-client.yaml diff --git a/execution/helm/templates/kafka/service-monitor.yaml b/helm/templates/kafka/service-monitor.yaml similarity index 100% rename from execution/helm/templates/kafka/service-monitor.yaml rename to helm/templates/kafka/service-monitor.yaml diff --git a/execution/helm/templates/prometheus/cluster-role-binding.yaml b/helm/templates/prometheus/cluster-role-binding.yaml similarity index 100% rename from execution/helm/templates/prometheus/cluster-role-binding.yaml rename to helm/templates/prometheus/cluster-role-binding.yaml diff --git a/execution/helm/templates/prometheus/cluster-role.yaml b/helm/templates/prometheus/cluster-role.yaml similarity index 100% rename from execution/helm/templates/prometheus/cluster-role.yaml rename to helm/templates/prometheus/cluster-role.yaml diff --git a/execution/helm/templates/prometheus/datasource-config-map.yaml b/helm/templates/prometheus/datasource-config-map.yaml similarity index 100% rename from execution/helm/templates/prometheus/datasource-config-map.yaml rename to helm/templates/prometheus/datasource-config-map.yaml diff --git a/execution/helm/templates/prometheus/prometheus.yaml b/helm/templates/prometheus/prometheus.yaml similarity index 100% rename from execution/helm/templates/prometheus/prometheus.yaml rename to helm/templates/prometheus/prometheus.yaml diff --git a/execution/helm/templates/prometheus/service-account.yaml b/helm/templates/prometheus/service-account.yaml similarity index 100% rename from execution/helm/templates/prometheus/service-account.yaml rename to helm/templates/prometheus/service-account.yaml diff --git a/execution/helm/templates/tests/test-connection.yaml b/helm/templates/tests/test-connection.yaml similarity index 100% rename from execution/helm/templates/tests/test-connection.yaml rename to helm/templates/tests/test-connection.yaml diff --git a/execution/helm/templates/theodolite/crd-benchmark.yaml b/helm/templates/theodolite/crd-benchmark.yaml similarity index 100% rename from execution/helm/templates/theodolite/crd-benchmark.yaml rename to helm/templates/theodolite/crd-benchmark.yaml diff --git a/execution/helm/templates/theodolite/crd-execution.yaml b/helm/templates/theodolite/crd-execution.yaml similarity index 100% rename from execution/helm/templates/theodolite/crd-execution.yaml rename to helm/templates/theodolite/crd-execution.yaml diff --git a/execution/helm/templates/theodolite/random-scheduler/cluster-role-binding.yaml b/helm/templates/theodolite/random-scheduler/cluster-role-binding.yaml similarity index 100% rename from execution/helm/templates/theodolite/random-scheduler/cluster-role-binding.yaml rename to helm/templates/theodolite/random-scheduler/cluster-role-binding.yaml diff --git a/execution/helm/templates/theodolite/random-scheduler/deployment.yaml b/helm/templates/theodolite/random-scheduler/deployment.yaml similarity index 100% rename from execution/helm/templates/theodolite/random-scheduler/deployment.yaml rename to helm/templates/theodolite/random-scheduler/deployment.yaml diff --git 
a/execution/helm/templates/theodolite/random-scheduler/service-account.yaml b/helm/templates/theodolite/random-scheduler/service-account.yaml similarity index 100% rename from execution/helm/templates/theodolite/random-scheduler/service-account.yaml rename to helm/templates/theodolite/random-scheduler/service-account.yaml diff --git a/execution/helm/templates/theodolite/role-binding.yaml b/helm/templates/theodolite/role-binding.yaml similarity index 100% rename from execution/helm/templates/theodolite/role-binding.yaml rename to helm/templates/theodolite/role-binding.yaml diff --git a/execution/helm/templates/theodolite/role.yaml b/helm/templates/theodolite/role.yaml similarity index 100% rename from execution/helm/templates/theodolite/role.yaml rename to helm/templates/theodolite/role.yaml diff --git a/execution/helm/templates/theodolite/serviceaccount.yaml b/helm/templates/theodolite/serviceaccount.yaml similarity index 100% rename from execution/helm/templates/theodolite/serviceaccount.yaml rename to helm/templates/theodolite/serviceaccount.yaml diff --git a/execution/helm/templates/theodolite/theodolite-operator.yaml b/helm/templates/theodolite/theodolite-operator.yaml similarity index 100% rename from execution/helm/templates/theodolite/theodolite-operator.yaml rename to helm/templates/theodolite/theodolite-operator.yaml diff --git a/execution/helm/update-index.sh b/helm/update-index.sh similarity index 100% rename from execution/helm/update-index.sh rename to helm/update-index.sh diff --git a/execution/helm/values.yaml b/helm/values.yaml similarity index 100% rename from execution/helm/values.yaml rename to helm/values.yaml diff --git a/theodolite-quarkus/build.gradle b/theodolite-quarkus/build.gradle index 21f65678407568bb8d519e4c7e6f4bacff90205f..3082deaf12fc48c6aca97ffd00b9c74cd7e6c143 100644 --- a/theodolite-quarkus/build.gradle +++ b/theodolite-quarkus/build.gradle @@ -21,16 +21,18 @@ dependencies { implementation 'com.google.code.gson:gson:2.8.5' implementation 'org.slf4j:slf4j-simple:1.7.29' implementation 'io.github.microutils:kotlin-logging:1.12.0' - implementation 'io.fabric8:kubernetes-client:5.0.0-alpha-2' - implementation 'io.quarkus:quarkus-kubernetes-client' + implementation('io.fabric8:kubernetes-client:5.4.1'){force = true} + implementation('io.fabric8:kubernetes-model-core:5.4.1'){force = true} + implementation('io.fabric8:kubernetes-model-common:5.4.1'){force = true} implementation 'org.apache.kafka:kafka-clients:2.7.0' implementation 'khttp:khttp:1.0.0' + compile 'junit:junit:4.12' testImplementation 'io.quarkus:quarkus-junit5' testImplementation 'io.rest-assured:rest-assured' testImplementation 'org.junit-pioneer:junit-pioneer:1.4.0' - testImplementation (group: 'io.fabric8', name: 'kubernetes-server-mock', version: '5.4.1'){force = true} + testImplementation ('io.fabric8:kubernetes-server-mock:5.4.1'){force = true} } group 'theodolite' diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt index 75cbcad051e2055f25d876e66e0fffcdc249c4f5..a7a40cd569f8034f3b8e062dad3031d5643a12e3 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/AbstractStateHandler.kt @@ -1,38 +1,35 @@ package theodolite.execution.operator +import io.fabric8.kubernetes.api.model.HasMetadata +import 
io.fabric8.kubernetes.api.model.KubernetesResourceList import io.fabric8.kubernetes.api.model.Namespaced import io.fabric8.kubernetes.client.CustomResource -import io.fabric8.kubernetes.client.CustomResourceDoneable -import io.fabric8.kubernetes.client.CustomResourceList import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.MixedOperation import io.fabric8.kubernetes.client.dsl.Resource -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import java.lang.Thread.sleep abstract class AbstractStateHandler<T,L,D>( - private val context: CustomResourceDefinitionContext, private val client: KubernetesClient, private val crd: Class<T>, - private val crdList: Class<L>, - private val donableCRD: Class<D> - ): StateHandler where T : CustomResource, T: Namespaced, L: CustomResourceList<T>, D: CustomResourceDoneable<T> { + private val crdList: Class<L> + ): StateHandler<T> where T : CustomResource<*, *>?, T: HasMetadata, T: Namespaced, L: KubernetesResourceList<T> { - private val crdClient: MixedOperation<T, L, D, Resource<T, D>> = - this.client.customResources(this.context, this.crd, this.crdList, this.donableCRD) + private val crdClient: MixedOperation<T, L,Resource<T>> = + this.client.customResources(this.crd, this.crdList) @Synchronized - override fun setState(resourceName: String, f: (CustomResource) -> CustomResource?) { + override fun setState(resourceName: String, f: (T) -> T?) { this.crdClient .inNamespace(this.client.namespace) .list().items .filter { item -> item.metadata.name == resourceName } .map { customResource -> f(customResource) } - .forEach { this.crdClient.updateStatus(it as T) } + .forEach { this.crdClient.updateStatus(it) } } @Synchronized - override fun getState(resourceName: String, f: (CustomResource) -> String?): String? { + override fun getState(resourceName: String, f: (T) -> String?): String? 
{ return this.crdClient .inNamespace(this.client.namespace) .list().items @@ -42,7 +39,7 @@ abstract class AbstractStateHandler<T,L,D>( } @Synchronized - override fun blockUntilStateIsSet(resourceName: String, desiredStatusString: String, f: (CustomResource) -> String?, maxTries: Int): Boolean { + override fun blockUntilStateIsSet(resourceName: String, desiredStatusString: String, f: (T) -> String?, maxTries: Int): Boolean { for (i in 0.rangeTo(maxTries)) { val currentStatus = getState(resourceName, f) if(currentStatus == desiredStatusString) { diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt index 6c8c48f791543b6d8a7716cf26a80bdb449ee7a7..8fc951d09598187bcaf4cb7e4a39d322be722792 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ClusterSetup.kt @@ -12,8 +12,8 @@ import theodolite.model.crd.* private val logger = KotlinLogging.logger {} class ClusterSetup( - private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, DoneableExecution, Resource<ExecutionCRD, DoneableExecution>>, - private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, DoneableBenchmark, Resource<BenchmarkCRD, DoneableBenchmark>>, + private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, + private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, private val client: NamespacedKubernetesClient ) { diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt index a1617b4988d500baab7b02bf5fa993f7a4ae76a3..4168bd19b57216722ca5301d42ce5e0df3f6c192 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt @@ -33,7 +33,7 @@ class ExecutionHandler( logger.info { "Add execution ${execution.metadata.name}" } execution.spec.name = execution.metadata.name when (this.stateHandler.getExecutionState(execution.metadata.name)) { - null -> this.stateHandler.setExecutionState(execution.spec.name, States.PENDING) + States.NO_STATE -> this.stateHandler.setExecutionState(execution.spec.name, States.PENDING) States.RUNNING -> { this.stateHandler.setExecutionState(execution.spec.name, States.RESTART) if(this.controller.isExecutionRunning(execution.spec.name)){ diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionStateHandler.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionStateHandler.kt index 0367fa40d45ae9e357c43856dc05d19740bd94b9..df5e77695d1beb562408f1b5830f6f4353543c75 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionStateHandler.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionStateHandler.kt @@ -1,55 +1,54 @@ package theodolite.execution.operator -import io.fabric8.kubernetes.client.CustomResource import io.fabric8.kubernetes.client.KubernetesClient -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import theodolite.model.crd.BenchmarkExecutionList import theodolite.model.crd.ExecutionCRD +import 
theodolite.model.crd.ExecutionStatus import theodolite.model.crd.States import java.lang.Thread.sleep import java.time.Duration import java.time.Instant import java.util.concurrent.atomic.AtomicBoolean -class ExecutionStateHandler(context: CustomResourceDefinitionContext, val client: KubernetesClient): - AbstractStateHandler<ExecutionCRD, BenchmarkExecutionList, DoneableExecution>( - context = context, +class ExecutionStateHandler(val client: KubernetesClient): + AbstractStateHandler<ExecutionCRD, BenchmarkExecutionList, ExecutionStatus >( client = client, crd = ExecutionCRD::class.java, - crdList = BenchmarkExecutionList::class.java, - donableCRD = DoneableExecution::class.java) { + crdList = BenchmarkExecutionList::class.java + ) { private var runExecutionDurationTimer: AtomicBoolean = AtomicBoolean(false) - private fun getExecutionLambda() = { cr: CustomResource -> - var execState = "" - if (cr is ExecutionCRD) { execState = cr.status.executionState } - execState - } + private fun getExecutionLambda() = { cr: ExecutionCRD -> cr.status.executionState } - private fun getDurationLambda() = { cr: CustomResource -> - var execState = "" - if (cr is ExecutionCRD) { execState = cr.status.executionDuration } - execState - } + private fun getDurationLambda() = { cr: ExecutionCRD -> cr.status.executionDuration } fun setExecutionState(resourceName: String, status: States): Boolean { - setState(resourceName) {cr -> if(cr is ExecutionCRD) cr.status.executionState = status.value; cr} + setState(resourceName) {cr -> cr.status.executionState = status.value; cr} return blockUntilStateIsSet(resourceName, status.value, getExecutionLambda()) } - fun getExecutionState(resourceName: String) : States? { + fun getExecutionState(resourceName: String) : States { val status = this.getState(resourceName, getExecutionLambda()) - return States.values().firstOrNull { it.value == status } + return if(status.isNullOrBlank()){ + States.NO_STATE + } else { + States.values().first { it.value == status } + } } fun setDurationState(resourceName: String, duration: Duration): Boolean { - setState(resourceName) { cr -> if (cr is ExecutionCRD) cr.status.executionDuration = durationToK8sString(duration); cr } + setState(resourceName) { cr -> cr.status.executionDuration = durationToK8sString(duration); cr } return blockUntilStateIsSet(resourceName, durationToK8sString(duration), getDurationLambda()) } - fun getDurationState(resourceName: String): String? { - return this.getState(resourceName, getDurationLambda()) + fun getDurationState(resourceName: String): String { + val status = getState(resourceName, getDurationLambda()) + return if (status.isNullOrBlank()) { + "-" + } else { + status + } } private fun durationToK8sString(duration: Duration): String { diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/StateHandler.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/StateHandler.kt index 0fbd97e5cca4a9be220eb0b0c89ea0af129a7860..cefcf2ec97986375205205fd95ddcd2ff7eacf5a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/StateHandler.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/StateHandler.kt @@ -1,15 +1,14 @@ package theodolite.execution.operator -import io.fabric8.kubernetes.client.CustomResource private const val MAX_TRIES: Int = 5 -interface StateHandler { - fun setState(resourceName: String, f: (CustomResource) -> CustomResource?) - fun getState(resourceName: String, f: (CustomResource) -> String?): String? 
+interface StateHandler<T> { + fun setState(resourceName: String, f: (T) -> T?) + fun getState(resourceName: String, f: (T) -> String?): String? fun blockUntilStateIsSet( resourceName: String, desiredStatusString: String, - f: (CustomResource) -> String?, + f: (T) -> String?, maxTries: Int = MAX_TRIES): Boolean } \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt index 3053c364e2d6d9bc9797c190f0a87d861089b556..6bcff9aeed2e756f115f27fbf25cc2aa35230d2d 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt @@ -1,6 +1,5 @@ package theodolite.execution.operator -import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.dsl.MixedOperation import io.fabric8.kubernetes.client.dsl.Resource import mu.KotlinLogging @@ -17,18 +16,15 @@ private val logger = KotlinLogging.logger {} /** * The controller implementation for Theodolite. * - * @see NamespacedKubernetesClient - * @see CustomResourceDefinitionContext * @see BenchmarkExecution * @see KubernetesBenchmark * @see ConcurrentLinkedDeque */ class TheodoliteController( - private val namespace: String, val path: String, - private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, DoneableExecution, Resource<ExecutionCRD, DoneableExecution>>, - private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, DoneableBenchmark, Resource<BenchmarkCRD, DoneableBenchmark>>, + private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, + private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, private val executionStateHandler: ExecutionStateHandler ) { lateinit var executor: TheodoliteExecutor @@ -116,11 +112,10 @@ class TheodoliteController( */ private fun getBenchmarks(): List<KubernetesBenchmark> { return this.benchmarkCRDClient - .inNamespace(namespace) .list() .items .map { it.spec.name = it.metadata.name; it } - .map { it.spec.path = path; it } + .map { it.spec.path = path; it } // TODO check if we can remove the path field from the KubernetesBenchmark .map { it.spec } } @@ -140,7 +135,6 @@ class TheodoliteController( .map { it.name } return executionCRDClient - .inNamespace(namespace) .list() .items .asSequence() @@ -168,6 +162,7 @@ class TheodoliteController( } fun isExecutionRunning(executionName: String): Boolean { + if (!::executor.isInitialized) return false return this.executor.getExecution().name == executionName } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt index 0d55b0c1c1c76dba226d34554e0d96a3df77c1c3..0415680f5947b3a7d759ad0dbf67cd27c2c58d24 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt @@ -4,18 +4,18 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.dsl.MixedOperation import io.fabric8.kubernetes.client.dsl.Resource +import 
io.fabric8.kubernetes.client.informers.SharedInformerFactory import io.fabric8.kubernetes.internal.KubernetesDeserializer import mu.KotlinLogging -import theodolite.k8s.K8sContextFactory -import theodolite.model.crd.* +import theodolite.model.crd.BenchmarkCRD +import theodolite.model.crd.BenchmarkExecutionList +import theodolite.model.crd.ExecutionCRD +import theodolite.model.crd.KubernetesBenchmarkList private const val DEFAULT_NAMESPACE = "default" -private const val SCOPE = "Namespaced" private const val EXECUTION_SINGULAR = "execution" -private const val EXECUTION_PLURAL = "executions" private const val BENCHMARK_SINGULAR = "benchmark" -private const val BENCHMARK_PLURAL = "benchmarks" private const val API_VERSION = "v1" private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong() private const val GROUP = "theodolite.com" @@ -28,7 +28,11 @@ private val logger = KotlinLogging.logger {} */ class TheodoliteOperator { private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE - val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace) + private val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config" + + private val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace) + private lateinit var controller: TheodoliteController + private lateinit var executionStateHandler: ExecutionStateHandler fun start() { @@ -42,7 +46,7 @@ class TheodoliteOperator { /** * Start the operator. */ - private fun startOperator() { + private fun startOperator() { logger.info { "Using $namespace as namespace." } client.use { KubernetesDeserializer.registerCustomKind( @@ -57,66 +61,75 @@ class TheodoliteOperator { BenchmarkCRD::class.java ) - val contextFactory = K8sContextFactory() - val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL) - val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL) - - val executionCRDClient: MixedOperation< - ExecutionCRD, - BenchmarkExecutionList, - DoneableExecution, - Resource<ExecutionCRD, DoneableExecution>> - = client.customResources( - executionContext, - ExecutionCRD::class.java, - BenchmarkExecutionList::class.java, - DoneableExecution::class.java) - - val benchmarkCRDClient: MixedOperation< - BenchmarkCRD, - KubernetesBenchmarkList, - DoneableBenchmark, - Resource<BenchmarkCRD, DoneableBenchmark>> - = client.customResources( - benchmarkContext, - BenchmarkCRD::class.java, - KubernetesBenchmarkList::class.java, - DoneableBenchmark::class.java) - - val executionStateHandler = ExecutionStateHandler( - context = executionContext, - client = client) - - val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config" - val controller = - TheodoliteController( - namespace = client.namespace, - path = appResource, - benchmarkCRDClient = benchmarkCRDClient, - executionCRDClient = executionCRDClient, - executionStateHandler = executionStateHandler) - - val informerFactory = client.informers() - val informerExecution = informerFactory.sharedIndexInformerForCustomResource( - executionContext, - ExecutionCRD::class.java, - BenchmarkExecutionList::class.java, - RESYNC_PERIOD - ) - - informerExecution.addEventHandler(ExecutionHandler( - controller = controller, - stateHandler = executionStateHandler)) - ClusterSetup( - executionCRDClient = executionCRDClient, - benchmarkCRDClient = benchmarkCRDClient, + executionCRDClient = getExecutionClient(client), + benchmarkCRDClient = getBenchmarkClient(client), client = client 
).clearClusterState() - informerFactory.startAllRegisteredInformers() - controller.run() + getController( + client = client, + executionStateHandler = getExecutionStateHandler(client = client) + ).run() + getExecutionEventHandler(client).startAllRegisteredInformers() + } + } + + fun getExecutionEventHandler(client: NamespacedKubernetesClient): SharedInformerFactory { + val factory = client.informers() + .inNamespace(client.namespace) + factory.sharedIndexInformerForCustomResource( + ExecutionCRD::class.java, + RESYNC_PERIOD + ).addEventHandler( + ExecutionHandler( + controller = controller, + stateHandler = ExecutionStateHandler(client) + ) + ) + return factory + } + + fun getExecutionStateHandler(client: NamespacedKubernetesClient): ExecutionStateHandler { + if (!::executionStateHandler.isInitialized) { + this.executionStateHandler = ExecutionStateHandler(client = client) + } + return executionStateHandler + } + + fun getController( + client: NamespacedKubernetesClient, + executionStateHandler: ExecutionStateHandler + ): TheodoliteController { + if (!::controller.isInitialized) { + this.controller = TheodoliteController( + path = this.appResource, + benchmarkCRDClient = getBenchmarkClient(client), + executionCRDClient = getExecutionClient(client), + executionStateHandler = executionStateHandler + ) } + return this.controller + } + + private fun getExecutionClient(client: NamespacedKubernetesClient): MixedOperation< + ExecutionCRD, + BenchmarkExecutionList, + Resource<ExecutionCRD>> { + return client.customResources( + ExecutionCRD::class.java, + BenchmarkExecutionList::class.java + ) + } + + private fun getBenchmarkClient(client: NamespacedKubernetesClient): MixedOperation< + BenchmarkCRD, + KubernetesBenchmarkList, + Resource<BenchmarkCRD>> { + return client.customResources( + BenchmarkCRD::class.java, + KubernetesBenchmarkList::class.java + ) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/CustomResourceWrapper.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/CustomResourceWrapper.kt index 9d879ef131d49c8b4491f94dd89dde5437b0bf6e..31a95be04e3290e0797dca5c588394ea36279b0c 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/CustomResourceWrapper.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/CustomResourceWrapper.kt @@ -1,12 +1,13 @@ package theodolite.k8s -import io.fabric8.kubernetes.client.CustomResource +import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import mu.KotlinLogging private val logger = KotlinLogging.logger {} -class CustomResourceWrapper(val crAsMap: Map<String, String>, private val context: CustomResourceDefinitionContext) : CustomResource() { + +class CustomResourceWrapper(val crAsMap: Map<String, String>, private val context: CustomResourceDefinitionContext) : KubernetesResource { /** * Deploy a service monitor * @@ -15,7 +16,6 @@ class CustomResourceWrapper(val crAsMap: Map<String, String>, private val contex * @throws java.io.IOException if the resource could not be deployed. 
*/ fun deploy(client: NamespacedKubernetesClient) { - client.customResource(this.context) .createOrReplace(client.configuration.namespace, this.crAsMap as Map<String, Any>) } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/BenchmarkCRD.kt b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/BenchmarkCRD.kt index 326aa10a21bebd913eaafcb8315188288ae97ff1..377708af7b7e1a50ae1e33064b2668c364e0685a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/BenchmarkCRD.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/BenchmarkCRD.kt @@ -1,11 +1,18 @@ package theodolite.model.crd import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.Namespaced import io.fabric8.kubernetes.client.CustomResource +import io.fabric8.kubernetes.model.annotation.Group +import io.fabric8.kubernetes.model.annotation.Kind +import io.fabric8.kubernetes.model.annotation.Version import theodolite.benchmark.KubernetesBenchmark @JsonDeserialize +@Version("v1") +@Group("theodolite.com") +@Kind("benchmark") class BenchmarkCRD( var spec: KubernetesBenchmark = KubernetesBenchmark() -) : CustomResource(), Namespaced \ No newline at end of file +) : CustomResource<KubernetesBenchmark, Void>(), Namespaced, HasMetadata \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/DoneableBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/DoneableBenchmark.kt deleted file mode 100644 index e00e8268b2ec8eba17b3706feb3940eded1b2b0c..0000000000000000000000000000000000000000 --- a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/DoneableBenchmark.kt +++ /dev/null @@ -1,7 +0,0 @@ -package theodolite.model.crd - -import io.fabric8.kubernetes.api.builder.Function -import io.fabric8.kubernetes.client.CustomResourceDoneable - -class DoneableBenchmark(resource: BenchmarkCRD, function: Function<BenchmarkCRD, BenchmarkCRD>) : - CustomResourceDoneable<BenchmarkCRD>(resource, function) \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/DoneableExecution.kt b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/DoneableExecution.kt deleted file mode 100644 index be07725b405c29a0d9000b6e6ec455536ad111fb..0000000000000000000000000000000000000000 --- a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/DoneableExecution.kt +++ /dev/null @@ -1,8 +0,0 @@ -package theodolite.execution.operator - -import io.fabric8.kubernetes.client.CustomResourceDoneable -import io.fabric8.kubernetes.api.builder.Function -import theodolite.model.crd.ExecutionCRD - -class DoneableExecution(resource: ExecutionCRD, function: Function<ExecutionCRD, ExecutionCRD>) : - CustomResourceDoneable<ExecutionCRD>(resource, function) \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionCRD.kt b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionCRD.kt index 79a387cee250d3abf0fdb576a5f0f33424596792..659621e8c3b1d5308a10d81240575dd3d432b53f 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionCRD.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionCRD.kt @@ -1,13 +1,18 @@ package theodolite.model.crd import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.Namespaced import 
io.fabric8.kubernetes.client.CustomResource +import io.fabric8.kubernetes.model.annotation.Group +import io.fabric8.kubernetes.model.annotation.Kind +import io.fabric8.kubernetes.model.annotation.Version import theodolite.benchmark.BenchmarkExecution @JsonDeserialize +@Version("v1") +@Group("theodolite.com") +@Kind("execution") class ExecutionCRD( var spec: BenchmarkExecution = BenchmarkExecution(), var status: ExecutionStatus = ExecutionStatus() - ) : CustomResource(), Namespaced \ No newline at end of file +) : CustomResource<BenchmarkExecution, ExecutionStatus>(), Namespaced diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionStatus.kt b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionStatus.kt index 43e9035b3120eb22304576f2006092eec376b6d2..51b76fcee8fb35c83dca407691833dbb235b29c5 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionStatus.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/model/crd/ExecutionStatus.kt @@ -3,11 +3,9 @@ package theodolite.model.crd import com.fasterxml.jackson.databind.annotation.JsonDeserialize import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.Namespaced -import io.fabric8.kubernetes.client.CustomResource @JsonDeserialize -class ExecutionStatus(): KubernetesResource, CustomResource(), Namespaced { +class ExecutionStatus(): KubernetesResource, Namespaced { var executionState: String = "" var executionDuration: String = "-" - } \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/patcher/LabelPatcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/patcher/LabelPatcher.kt index 5ee5807cd8378c9f2bbd62435203208d61131f15..4fa7fc893cfaf864d935074ff50af8d61f7aac76 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/patcher/LabelPatcher.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/patcher/LabelPatcher.kt @@ -37,7 +37,7 @@ class LabelPatcher(private val k8sResource: KubernetesResource, val variableName } k8sResource.metadata.labels[this.variableName] = labelValue } - is CustomResource -> { + is CustomResource<*,*> -> { if (k8sResource.metadata.labels == null){ k8sResource.metadata.labels = mutableMapOf() } diff --git a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerDummy.kt b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerDummy.kt deleted file mode 100644 index 3ecc2ef422e579b616f304ec8c4b19110941dcea..0000000000000000000000000000000000000000 --- a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerDummy.kt +++ /dev/null @@ -1,86 +0,0 @@ -package theodolite.execution.operator - -import io.fabric8.kubernetes.client.NamespacedKubernetesClient -import io.fabric8.kubernetes.client.dsl.MixedOperation -import io.fabric8.kubernetes.client.dsl.Resource -import io.fabric8.kubernetes.internal.KubernetesDeserializer -import theodolite.k8s.K8sContextFactory -import theodolite.model.crd.* - -private const val SCOPE = "Namespaced" -private const val EXECUTION_SINGULAR = "execution" -private const val EXECUTION_PLURAL = "executions" -private const val BENCHMARK_SINGULAR = "benchmark" -private const val BENCHMARK_PLURAL = "benchmarks" -private const val API_VERSION = "v1" -private const val GROUP = "theodolite.com" - -class ControllerDummy(val client: NamespacedKubernetesClient) { - - private var controller: TheodoliteController - val executionContext = K8sContextFactory() - .create( - API_VERSION, - SCOPE, - GROUP, - 
diff --git a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerDummy.kt b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerDummy.kt
deleted file mode 100644
index 3ecc2ef422e579b616f304ec8c4b19110941dcea..0000000000000000000000000000000000000000
--- a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerDummy.kt
+++ /dev/null
@@ -1,86 +0,0 @@
-package theodolite.execution.operator
-
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient
-import io.fabric8.kubernetes.client.dsl.MixedOperation
-import io.fabric8.kubernetes.client.dsl.Resource
-import io.fabric8.kubernetes.internal.KubernetesDeserializer
-import theodolite.k8s.K8sContextFactory
-import theodolite.model.crd.*
-
-private const val SCOPE = "Namespaced"
-private const val EXECUTION_SINGULAR = "execution"
-private const val EXECUTION_PLURAL = "executions"
-private const val BENCHMARK_SINGULAR = "benchmark"
-private const val BENCHMARK_PLURAL = "benchmarks"
-private const val API_VERSION = "v1"
-private const val GROUP = "theodolite.com"
-
-class ControllerDummy(val client: NamespacedKubernetesClient) {
-
-    private var controller: TheodoliteController
-    val executionContext = K8sContextFactory()
-        .create(
-            API_VERSION,
-            SCOPE,
-            GROUP,
-            EXECUTION_PLURAL
-        )
-    val benchmarkContext = K8sContextFactory()
-        .create(
-            API_VERSION,
-            SCOPE,
-            GROUP,
-            BENCHMARK_PLURAL
-        )
-
-    val executionStateHandler = ExecutionStateHandler(
-        context = executionContext,
-        client = client
-    )
-
-    fun getController(): TheodoliteController {
-        return this.controller
-    }
-
-    init {
-        KubernetesDeserializer.registerCustomKind(
-            "$GROUP/$API_VERSION",
-            EXECUTION_SINGULAR,
-            ExecutionCRD::class.java
-        )
-
-        KubernetesDeserializer.registerCustomKind(
-            "$GROUP/$API_VERSION",
-            BENCHMARK_SINGULAR,
-            BenchmarkCRD::class.java
-        )
-
-        val executionCRDClient: MixedOperation<
-                ExecutionCRD,
-                BenchmarkExecutionList,
-                DoneableExecution,
-                Resource<ExecutionCRD, DoneableExecution>> = client.customResources(
-            executionContext,
-            ExecutionCRD::class.java,
-            BenchmarkExecutionList::class.java,
-            DoneableExecution::class.java
-        )
-
-        val benchmarkCRDClient = client.customResources(
-            benchmarkContext,
-            BenchmarkCRD::class.java,
-            KubernetesBenchmarkList::class.java,
-            DoneableBenchmark::class.java
-        )
-
-        val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
-        this.controller =
-            TheodoliteController(
-                namespace = client.namespace,
-                path = appResource,
-                benchmarkCRDClient = benchmarkCRDClient,
-                executionCRDClient = executionCRDClient,
-                executionStateHandler = executionStateHandler
-            )
-    }
-}
\ No newline at end of file
diff --git a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerTest.kt b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerTest.kt
index 9174a4cc78933d4c028b2c2a73e1adb63047868f..7ed868c7adc4afcd7a6a606d22124c92910ecd89 100644
--- a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerTest.kt
+++ b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ControllerTest.kt
@@ -8,6 +8,7 @@ import io.quarkus.test.junit.QuarkusTest
 import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.DisplayName
 import org.junit.jupiter.api.Test
 import theodolite.benchmark.BenchmarkExecution
 import theodolite.benchmark.KubernetesBenchmark
@@ -30,7 +31,10 @@
     @BeforeEach
     fun setUp() {
         server.before()
-        this.controller = ControllerDummy(server.client).getController()
+        this.controller = TheodoliteOperator().getController(
+            client = server.client,
+            executionStateHandler = ExecutionStateHandler(server.client)
+        )
 
         // benchmark
         val benchmark1 = BenchmarkCRDummy(name = "Test-Benchmark")
@@ -66,6 +70,38 @@
         server.after()
     }
 
+    @Test
+    @DisplayName("Check namespaced property of benchmarkCRDClient")
+    fun testBenchmarkClientNamespaced(){
+        val method = controller
+            .javaClass
+            .getDeclaredMethod("getBenchmarks")
+        method.isAccessible = true
+        method.invoke(controller)
+
+        assert(server
+            .lastRequest
+            .toString()
+            .contains("namespaces")
+        )
+    }
+
+    @Test
+    @DisplayName("Check namespaced property of executionCRDClient")
+    fun testExecutionClientNamespaced(){
+        val method = controller
+            .javaClass
+            .getDeclaredMethod("getNextExecution")
+        method.isAccessible = true
+        method.invoke(controller)
+
+        assert(server
+            .lastRequest
+            .toString()
+            .contains("namespaces")
+        )
+    }
+
     @Test
     fun getBenchmarksTest() {
         val method = controller
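// A condensed sketch (not part of the patch) of the test pattern introduced above: the
// private controller methods are invoked via reflection against the fabric8 mock API
// server, and the recorded request path is checked for a `namespaces` segment to prove
// the CRD clients are namespaced. Helper name and signature are assumptions.
import io.fabric8.kubernetes.client.server.mock.KubernetesServer

fun assertLastRequestNamespaced(server: KubernetesServer, controller: Any, methodName: String) {
    val method = controller.javaClass.getDeclaredMethod(methodName)
    method.isAccessible = true // getBenchmarks/getNextExecution are private on the controller
    method.invoke(controller)
    // A namespaced client hits paths like /apis/theodolite.com/v1/namespaces/<ns>/benchmarks.
    assert(server.lastRequest.toString().contains("namespaces"))
}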
diff --git a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ExecutionEventHandlerTest.kt b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ExecutionEventHandlerTest.kt
index 9c3468eaa4df7573a1d909222d6faa7449fd7251..d72dd1cd3e70d94dbd49efb866b8ae6334ab0a4c 100644
--- a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ExecutionEventHandlerTest.kt
+++ b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/ExecutionEventHandlerTest.kt
@@ -11,8 +11,6 @@ import org.junit.jupiter.api.DisplayName
 import org.junit.jupiter.api.Test
 import theodolite.k8s.K8sManager
 import theodolite.k8s.K8sResourceLoader
-import theodolite.model.crd.BenchmarkExecutionList
-import theodolite.model.crd.ExecutionCRD
 import theodolite.model.crd.States
 import java.lang.Thread.sleep
@@ -30,35 +28,27 @@
     lateinit var executionVersion2: KubernetesResource
     lateinit var stateHandler: ExecutionStateHandler
     lateinit var manager: K8sManager
+    lateinit var controller: TheodoliteController
 
     @BeforeEach
     fun setUp() {
         server.before()
-        val controllerDummy = ControllerDummy(server.client)
-
-        this.factory = server.client.informers()
-        val informerExecution = factory
-            .sharedIndexInformerForCustomResource(
-                controllerDummy.executionContext,
-                ExecutionCRD::class.java,
-                BenchmarkExecutionList::class.java,
-                RESYNC_PERIOD
-            )
-
-        informerExecution.addEventHandler(
-            ExecutionHandler(
-                controller = controllerDummy.getController(),
-                stateHandler = controllerDummy.executionStateHandler
-            )
+        val operator = TheodoliteOperator()
+        this.controller = operator.getController(
+            client = server.client,
+            executionStateHandler = ExecutionStateHandler(client = server.client)
         )
+        this.factory = operator.getExecutionEventHandler(server.client)
+        this.stateHandler = TheodoliteOperator().getExecutionStateHandler(client = server.client)
+
         this.executionVersion1 = K8sResourceLoader(server.client)
             .loadK8sResource("Execution", testResourcePath + "test-execution.yaml")
 
         this.executionVersion2 = K8sResourceLoader(server.client)
             .loadK8sResource("Execution", testResourcePath + "test-execution-update.yaml")
 
-        this.stateHandler = ControllerDummy(server.client).executionStateHandler
+        this.stateHandler = operator.getExecutionStateHandler(server.client)
 
         this.manager = K8sManager((server.client))
     }
@@ -69,6 +59,20 @@
         factory.stopAllRegisteredInformers()
     }
 
+    @Test
+    @DisplayName("Check namespaced property of informers")
+    fun testNamespaced() {
+        manager.deploy(executionVersion1)
+        factory.startAllRegisteredInformers()
+        server.lastRequest
+        // the second request must be namespaced (this is the first `GET` request)
+        assert(server
+            .lastRequest
+            .toString()
+            .contains("namespaces")
+        )
+    }
+
     @Test
     @DisplayName("Test onAdd method for executions without execution state")
     fun testWithoutState() {
@@ -124,7 +128,6 @@
             resourceName = executionName
         )
     )
-
     }
 
     @Test
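// A brief sketch (not part of the patch) of the informer test above: the informer factory
// issues no requests until it is started; the first request after start is the initial
// LIST (followed by a WATCH), and for a namespaced operator both must carry the namespace.
// The fixture types mirror the patch; the helper itself is an assumption for illustration.
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.client.informers.SharedInformerFactory
import io.fabric8.kubernetes.client.server.mock.KubernetesServer
import theodolite.k8s.K8sManager

fun informerRequestsAreNamespaced(
    server: KubernetesServer,
    manager: K8sManager,
    factory: SharedInformerFactory,
    execution: KubernetesResource
) {
    manager.deploy(execution)             // create the Execution the informer will observe
    factory.startAllRegisteredInformers() // triggers LIST and WATCH against the mock server
    assert(server.lastRequest.toString().contains("namespaces"))
    factory.stopAllRegisteredInformers()  // mirrors the @AfterEach cleanup in the test class
}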
diff --git a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt
index 1b92b1e2938df49be2a3a9950d8eb911a7963a40..de74cf9ac87a8aca7db133a04ef3809c5e5087c2 100644
--- a/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt
+++ b/theodolite-quarkus/src/test/kotlin/theodolite/execution/operator/StateHandlerTest.kt
@@ -7,7 +7,6 @@ import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.DisplayName
 import org.junit.jupiter.api.Test
-import theodolite.k8s.K8sContextFactory
 import theodolite.k8s.K8sManager
 import theodolite.k8s.K8sResourceLoader
 import theodolite.model.crd.States
@@ -16,13 +15,6 @@ import java.time.Duration
 class StateHandlerTest {
     private val testResourcePath = "./src/test/resources/k8s-resource-files/"
     private val server = KubernetesServer(false, true)
-    private val context = K8sContextFactory().create(
-        api = "v1",
-        scope = "Namespaced",
-        group = "theodolite.com",
-        plural = "executions"
-    )
-
     @BeforeEach
     fun setUp() {
@@ -38,10 +30,36 @@
         server.after()
     }
 
+    @Test
+    @DisplayName("check if Statehandler is namespaced")
+    fun namespacedTest() {
+        val handler = ExecutionStateHandler(client = server.client)
+        handler.getExecutionState("example-execution")
+        assert(server
+            .lastRequest
+            .toString()
+            .contains("namespaces")
+        )
+    }
+
+    @Test
+    @DisplayName("Test empty execution state")
+    fun executionWithoutExecutionStatusTest(){
+        val handler = ExecutionStateHandler(client = server.client)
+        assertEquals(States.NO_STATE, handler.getExecutionState("example-execution"))
+    }
+
+    @Test
+    @DisplayName("Test empty duration state")
+    fun executionWithoutDurationStatusTest(){
+        val handler = ExecutionStateHandler(client = server.client)
+        assertEquals("-", handler.getDurationState("example-execution"))
+    }
+
     @Test
     @DisplayName("Test set and get of the execution state")
     fun executionStatusTest() {
-        val handler = ExecutionStateHandler(client = server.client, context = context)
+        val handler = ExecutionStateHandler(client = server.client)
 
         assertTrue(handler.setExecutionState("example-execution", States.INTERRUPTED))
         assertEquals(States.INTERRUPTED, handler.getExecutionState("example-execution"))
@@ -50,7 +68,7 @@
     @Test
     @DisplayName("Test set and get of the duration state")
     fun durationStatusTest() {
-        val handler = ExecutionStateHandler(client = server.client, context = context)
+        val handler = ExecutionStateHandler(client = server.client)
 
         assertTrue(handler.setDurationState("example-execution", Duration.ofMillis(100)))
         assertEquals("0s", handler.getDurationState("example-execution"))
diff --git a/theodolite-quarkus/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt b/theodolite-quarkus/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt
index 931457b0fc65984c35516dca57dbe52e94184064..dc2bf016994d79b1021bebdc751102e291d60682 100644
--- a/theodolite-quarkus/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt
+++ b/theodolite-quarkus/src/test/kotlin/theodolite/k8s/K8sManagerTest.kt
@@ -34,7 +34,7 @@
         .withMetadata(metadata)
         .withNewSpec()
         .editOrNewSelector()
-        .withMatchLabels(mapOf("app" to "test"))
+        .withMatchLabels<String, String>(mapOf("app" to "test"))
         .endSelector()
         .endSpec()
         .build()
@@ -43,7 +43,7 @@
         .withMetadata(metadata)
         .withNewSpec()
         .editOrNewSelector()
-        .withMatchLabels(mapOf("app" to "test"))
+        .withMatchLabels<String, String>(mapOf("app" to "test"))
         .endSelector()
         .endSpec()
         .build()
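// A minimal sketch (not part of the patch) of the builder fix above: against this fabric8
// version the generated withMatchLabels is generic, and Kotlin cannot infer its type
// parameters from mapOf(), so the call sites now spell out <String, String>. Metadata
// values here are placeholders.
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder

fun selectorSketch(): Deployment =
    DeploymentBuilder()
        .withNewMetadata().withName("test-deployment").endMetadata()
        .withNewSpec()
        .editOrNewSelector()
        .withMatchLabels<String, String>(mapOf("app" to "test")) // explicit type arguments
        .endSelector()
        .endSpec()
        .build()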
@DisplayName("Test loading of Executions") fun loadExecutionTest() { val loader = K8sResourceLoader(server.client) val resource = loader.loadK8sResource("Execution", testResourcePath + "test-execution.yaml") @@ -95,7 +95,7 @@ class K8sResourceLoaderTest { } @Test - @DisplayName("Test loading of ServiceMonitors") + @DisplayName("Test loading of Benchmarks") fun loadBenchmarkTest() { val loader = K8sResourceLoader(server.client) val resource = loader.loadK8sResource("Benchmark", testResourcePath + "test-benchmark.yaml")