diff --git a/execution/run_uc.py b/execution/run_uc.py index 3c490afcd8307824874cc8eb604d7cdb992dc447..dac9ff8e85a6f8c805ec62d45b8d8aa15fd9a2ed 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -1,19 +1,18 @@ import argparse # parse arguments from cli -import atexit # used to clear resources at exit of program (e.g. ctrl-c) +import atexit # used to clear resources at exit of program (e.g. ctrl-c) from kubernetes import client, config # kubernetes api from kubernetes.stream import stream import lag_analysis import logging # logging from os import path # path utilities import subprocess # execute bash commands -import sys # for exit of program +import sys # for exit of program import time # process sleep import yaml # convert from file to yaml object coreApi = None # acces kubernetes core api appsApi = None # acces kubernetes apps api customApi = None # acces kubernetes custom object api -args = None # CLI arguments def load_variables(): @@ -83,6 +82,7 @@ def load_variables(): args = parser.parse_args() print(args) + return args def initialize_kubernetes_api(): @@ -159,23 +159,25 @@ def load_yaml_files(): return wg, app_svc, app_svc_monitor, app_jmx, app_deploy -def start_workload_generator(wg_yaml): +def start_workload_generator(wg_yaml, dim_value, uc_id): """Starts the workload generator. :param wg_yaml: The yaml object for the workload generator. + :param string dim_value: The dimension value the load generator should use. + :param string uc_id: Use case id for which load should be generated. :return: The StatefulSet created by the API or in case it already exist/error the yaml object. 
""" print('Start workload generator') - num_sensors = args.dim_value + num_sensors = dim_value wl_max_records = 150000 wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records)) # set parameters special for uc 2 - if args.uc_id == '2': + if uc_id == '2': print('use uc2 stuff') - num_nested_groups = args.dim_value + num_nested_groups = dim_value num_sensors = '4' approx_num_sensors = int(num_sensors) ** num_nested_groups wl_instances = int( @@ -187,13 +189,13 @@ def start_workload_generator(wg_yaml): # TODO: acces over name of container # Set used use case wg_containter = wg_yaml['spec']['template']['spec']['containers'][0] - wg_containter['image'] = 'theodolite/theodolite-uc' + args.uc_id + \ + wg_containter['image'] = 'theodolite/theodolite-uc' + uc_id + \ '-workload-generator:latest' # TODO: acces over name of attribute # Set environment variables wg_containter['env'][0]['value'] = str(num_sensors) wg_containter['env'][1]['value'] = str(wl_instances) - if args.uc_id == '2': # Special configuration for uc2 + if uc_id == '2': # Special configuration for uc2 wg_containter['env'][2]['value'] = str(num_nested_groups) try: @@ -208,7 +210,7 @@ def start_workload_generator(wg_yaml): return wg_yaml -def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): +def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit): """Applies the service, service monitor, jmx config map and start the use case application. @@ -216,6 +218,11 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): :param svc_monitor_yaml: The yaml object for the service monitor. :param jmx_yaml: The yaml object for the jmx config map. :param deploy_yaml: The yaml object for the application. + :param int instances: Number of instances for use case application. + :param string uc_id: The id of the use case to execute. + :param int commit_interval_ms: The commit interval in ms. 
+ :param string memory_limit: The memory limit for the application. + :param string cpu_limit: The CPU limit for the application. :return: The Service, ServiceMonitor, JMX ConfigMap and Deployment. In case the resource already exist/error the yaml object is returned. @@ -257,15 +264,15 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): logging.error("ConfigMap creation error: %s" % e.reason) # Create deployment - deploy_yaml['spec']['replicas'] = args.instances + deploy_yaml['spec']['replicas'] = instances # TODO: acces over name of container app_container = deploy_yaml['spec']['template']['spec']['containers'][0] - app_container['image'] = 'theodolite/theodolite-uc' + args.uc_id \ + app_container['image'] = 'theodolite/theodolite-uc' + uc_id \ + '-kstreams-app:latest' # TODO: acces over name of attribute - app_container['env'][0]['value'] = str(args.commit_interval_ms) - app_container['resources']['limits']['memory'] = args.memory_limit - app_container['resources']['limits']['cpu'] = args.cpu_limit + app_container['env'][0]['value'] = str(commit_interval_ms) + app_container['resources']['limits']['memory'] = memory_limit + app_container['resources']['limits']['cpu'] = cpu_limit try: app_deploy = appsApi.create_namespaced_deployment( namespace="default", @@ -279,22 +286,31 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): return svc, svc_monitor, jmx_cm, app_deploy -def wait_execution(): - """Wait time while in execution.""" +def wait_execution(execution_minutes): + """ + Wait time while in execution. + :param int execution_minutes: The duration to wait for execution. 
+ """ print('Wait while executing') - # TODO: ask which fits better - # time.sleep(args.execution_minutes * 60) - for i in range(args.execution_minutes): + + for i in range(execution_minutes): time.sleep(60) print(f"Executed: {i+1} minutes") print('Execution finished') return -def run_evaluation(): - """Runs the evaluation function""" +def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes): + """ + Runs the evaluation function + :param string exp_id: ID of the experiment. + :param string uc_id: ID of the executed use case. + :param int dim_value: The dimension value used for execution. + :param int instances: The number of instances used for the execution. + :param int execution_minutes: How long the use case where executed. + """ print('Run evaluation function') - lag_analysis.main(args.exp_id, f'uc{args.uc_id}', args.dim_value, args.instances, args.execution_minutes) + lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes) return @@ -309,7 +325,8 @@ def delete_resource(obj, del_func): try: del_func(obj.metadata.name, 'default') except Exception as e: - logging.debug('Error deleting resource with api object, try with dict.') + logging.debug( + 'Error deleting resource with api object, try with dict.') try: del_func(obj['metadata']['name'], 'default') except Exception as e: @@ -444,7 +461,7 @@ def reset_zookeeper(): text=True) logging.debug(output) - if output.returncode == 1: # Means data not available anymore + if output.returncode == 1: # Means data not available anymore print('ZooKeeper reset was successful.') break else: @@ -495,52 +512,73 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): print('---------------------') stop_lag_exporter() -def main(): - load_variables() - print('---------------------') +def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, reset, reset_only): + """ + Main method to execute one time 
the benchmark for a given use case. + Start workload generator/application -> execute -> analyse -> stop all + :param string exp_id: The ID of the executed experiment. + :param string uc_id: Use case to execute + :param int dim_value: Dimension value for load generator. + :param int instances: Number of instances for application. + :param int partitions: Number of partitions the kafka topics should have. + :param string cpu_limit: Max CPU utilization for application. + :param string memory_limit: Max memory utilization for application. + :param int commit_interval_ms: Kafka Streams commit interval in milliseconds + :param int execution_minutes: How long to execute the benchmark. + :param boolean reset: Flag for reset of cluster before execution. + :param boolean reset_only: Flag to only reset the application. + """ wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() print('---------------------') initialize_kubernetes_api() print('---------------------') - topics = [('input', args.partitions), - ('output', args.partitions), - ('aggregation-feedback', args.partitions), + topics = [('input', partitions), + ('output', partitions), + ('aggregation-feedback', partitions), ('configuration', 1)] # Check for reset options - if args.reset_only: + if reset_only: # Only reset cluster an then end program - reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + reset_cluster(wg, app_svc, app_svc_monitor, + app_jmx, app_deploy, topics) sys.exit() - if args.reset: + if reset: # Reset cluster before execution print('Reset only mode') - reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + reset_cluster(wg, app_svc, app_svc_monitor, + app_jmx, app_deploy, topics) print('---------------------') # Register the reset operation so that is executed at the end of program - atexit.register(reset_cluster, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + atexit.register(reset_cluster, wg, app_svc, + app_svc_monitor,
app_jmx, app_deploy, topics) create_topics(topics) print('---------------------') - wg = start_workload_generator(wg) + wg = start_workload_generator(wg, dim_value, uc_id) print('---------------------') app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( app_svc, app_svc_monitor, app_jmx, - app_deploy) + app_deploy, + instances, + uc_id, + commit_interval_ms, + memory_limit, + cpu_limit) print('---------------------') - wait_execution() + wait_execution(execution_minutes) print('---------------------') - run_evaluation() + run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes) print('---------------------') # Cluster is resetted with atexit method @@ -549,4 +587,9 @@ def main(): if __name__ == '__main__': logging.basicConfig(level=logging.INFO) - main() + args = load_variables() + print('---------------------') + main(args.exp_id, args.uc_id, args.dim_value, args.instances, + args.partitions, args.cpu_limit, args.memory_limit, + args.commit_interval_ms, args.execution_minutes, args.reset, + args.reset_only)