From 1cb280a87d51d62d9580a9be3ead3d62c2231f12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Wed, 23 Sep 2020 10:00:13 +0200 Subject: [PATCH] Make the run uc py functions parameterized and not use global var Make the functions parameterized in order to call the main method from another python class. Before the methods used a global variable to access the values. --- execution/run_uc.py | 125 +++++++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 41 deletions(-) diff --git a/execution/run_uc.py b/execution/run_uc.py index 3c490afcd..dac9ff8e8 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -1,19 +1,18 @@ import argparse # parse arguments from cli -import atexit # used to clear resources at exit of program (e.g. ctrl-c) +import atexit # used to clear resources at exit of program (e.g. ctrl-c) from kubernetes import client, config # kubernetes api from kubernetes.stream import stream import lag_analysis import logging # logging from os import path # path utilities import subprocess # execute bash commands -import sys # for exit of program +import sys # for exit of program import time # process sleep import yaml # convert from file to yaml object coreApi = None # acces kubernetes core api appsApi = None # acces kubernetes apps api customApi = None # acces kubernetes custom object api -args = None # CLI arguments def load_variables(): @@ -83,6 +82,7 @@ def load_variables(): args = parser.parse_args() print(args) + return args def initialize_kubernetes_api(): @@ -159,23 +159,25 @@ def load_yaml_files(): return wg, app_svc, app_svc_monitor, app_jmx, app_deploy -def start_workload_generator(wg_yaml): +def start_workload_generator(wg_yaml, dim_value, uc_id): """Starts the workload generator. :param wg_yaml: The yaml object for the workload generator. + :param string dim_value: The dimension value the load generator should use. + :param string uc_id: Use case id for which load should be generated. :return: The StatefulSet created by the API or in case it already exist/error the yaml object. """ print('Start workload generator') - num_sensors = args.dim_value + num_sensors = dim_value wl_max_records = 150000 wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records)) # set parameters special for uc 2 - if args.uc_id == '2': + if uc_id == '2': print('use uc2 stuff') - num_nested_groups = args.dim_value + num_nested_groups = dim_value num_sensors = '4' approx_num_sensors = int(num_sensors) ** num_nested_groups wl_instances = int( @@ -187,13 +189,13 @@ def start_workload_generator(wg_yaml): # TODO: acces over name of container # Set used use case wg_containter = wg_yaml['spec']['template']['spec']['containers'][0] - wg_containter['image'] = 'theodolite/theodolite-uc' + args.uc_id + \ + wg_containter['image'] = 'theodolite/theodolite-uc' + uc_id + \ '-workload-generator:latest' # TODO: acces over name of attribute # Set environment variables wg_containter['env'][0]['value'] = str(num_sensors) wg_containter['env'][1]['value'] = str(wl_instances) - if args.uc_id == '2': # Special configuration for uc2 + if uc_id == '2': # Special configuration for uc2 wg_containter['env'][2]['value'] = str(num_nested_groups) try: @@ -208,7 +210,7 @@ def start_workload_generator(wg_yaml): return wg_yaml -def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): +def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit): """Applies the service, service monitor, jmx config map and start the use case application. @@ -216,6 +218,11 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): :param svc_monitor_yaml: The yaml object for the service monitor. :param jmx_yaml: The yaml object for the jmx config map. :param deploy_yaml: The yaml object for the application. + :param int instances: Number of instances for use case application. + :param string uc_id: The id of the use case to execute. + :param int commit_interval_ms: The commit interval in ms. + :param string memory_limit: The memory limit for the application. + :param string cpu_limit: The CPU limit for the application. :return: The Service, ServiceMonitor, JMX ConfigMap and Deployment. In case the resource already exist/error the yaml object is returned. @@ -257,15 +264,15 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): logging.error("ConfigMap creation error: %s" % e.reason) # Create deployment - deploy_yaml['spec']['replicas'] = args.instances + deploy_yaml['spec']['replicas'] = instances # TODO: acces over name of container app_container = deploy_yaml['spec']['template']['spec']['containers'][0] - app_container['image'] = 'theodolite/theodolite-uc' + args.uc_id \ + app_container['image'] = 'theodolite/theodolite-uc' + uc_id \ + '-kstreams-app:latest' # TODO: acces over name of attribute - app_container['env'][0]['value'] = str(args.commit_interval_ms) - app_container['resources']['limits']['memory'] = args.memory_limit - app_container['resources']['limits']['cpu'] = args.cpu_limit + app_container['env'][0]['value'] = str(commit_interval_ms) + app_container['resources']['limits']['memory'] = memory_limit + app_container['resources']['limits']['cpu'] = cpu_limit try: app_deploy = appsApi.create_namespaced_deployment( namespace="default", @@ -279,22 +286,31 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): return svc, svc_monitor, jmx_cm, app_deploy -def wait_execution(): - """Wait time while in execution.""" +def wait_execution(execution_minutes): + """ + Wait time while in execution. + :param int execution_minutes: The duration to wait for execution. + """ print('Wait while executing') - # TODO: ask which fits better - # time.sleep(args.execution_minutes * 60) - for i in range(args.execution_minutes): + + for i in range(execution_minutes): time.sleep(60) print(f"Executed: {i+1} minutes") print('Execution finished') return -def run_evaluation(): - """Runs the evaluation function""" +def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes): + """ + Runs the evaluation function + :param string exp_id: ID of the experiment. + :param string uc_id: ID of the executed use case. + :param int dim_value: The dimension value used for execution. + :param int instances: The number of instances used for the execution. + :param int execution_minutes: How long the use case where executed. + """ print('Run evaluation function') - lag_analysis.main(args.exp_id, f'uc{args.uc_id}', args.dim_value, args.instances, args.execution_minutes) + lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes) return @@ -309,7 +325,8 @@ def delete_resource(obj, del_func): try: del_func(obj.metadata.name, 'default') except Exception as e: - logging.debug('Error deleting resource with api object, try with dict.') + logging.debug( + 'Error deleting resource with api object, try with dict.') try: del_func(obj['metadata']['name'], 'default') except Exception as e: @@ -444,7 +461,7 @@ def reset_zookeeper(): text=True) logging.debug(output) - if output.returncode == 1: # Means data not available anymore + if output.returncode == 1: # Means data not available anymore print('ZooKeeper reset was successful.') break else: @@ -495,52 +512,73 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): print('---------------------') stop_lag_exporter() -def main(): - load_variables() - print('---------------------') +def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, reset, reset_only): + """ + Main method to execute one time the benchmark for a given use case. + Start workload generator/application -> execute -> analyse -> stop all + :param string exp_id: The number of executed experiment + :param string uc_id: Use case to execute + :param int dim_value: Dimension value for load generator. + :param int instances: Number of instances for application. + :param int partitions: Number of partitions the kafka topics should have. + :param string cpu_limit: Max CPU utilazation for application. + :param string memory_limit: Max memory utilazation for application. + :param int commit_interval_ms: Kafka Streams commit interval in milliseconds + :param int execution_minutes: How long to execute the benchmark. + :param boolean reset: Flag for reset of cluster before execution. + :param boolean reset_only: Flag to only reset the application. + """ wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() print('---------------------') initialize_kubernetes_api() print('---------------------') - topics = [('input', args.partitions), - ('output', args.partitions), - ('aggregation-feedback', args.partitions), + topics = [('input', partitions), + ('output', partitions), + ('aggregation-feedback', partitions), ('configuration', 1)] # Check for reset options - if args.reset_only: + if reset_only: # Only reset cluster an then end program - reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + reset_cluster(wg, app_svc, app_svc_monitor, + app_jmx, app_deploy, topics) sys.exit() - if args.reset: + if reset: # Reset cluster before execution print('Reset only mode') - reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + reset_cluster(wg, app_svc, app_svc_monitor, + app_jmx, app_deploy, topics) print('---------------------') # Register the reset operation so that is executed at the end of program - atexit.register(reset_cluster, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + atexit.register(reset_cluster, wg, app_svc, + app_svc_monitor, app_jmx, app_deploy, topics) create_topics(topics) print('---------------------') - wg = start_workload_generator(wg) + wg = start_workload_generator(wg, dim_value, uc_id) print('---------------------') app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( app_svc, app_svc_monitor, app_jmx, - app_deploy) + app_deploy, + instances, + uc_id, + commit_interval_ms, + memory_limit, + cpu_limit) print('---------------------') - wait_execution() + wait_execution(execution_minutes) print('---------------------') - run_evaluation() + run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes) print('---------------------') # Cluster is resetted with atexit method @@ -549,4 +587,9 @@ def main(): if __name__ == '__main__': logging.basicConfig(level=logging.INFO) - main() + args = load_variables() + print('---------------------') + main(args.exp_id, args.uc_id, args.dim_value, args.instances, + args.partitions, args.cpu_limit, args.memory_limit, + args.commit_interval_ms, args.execution_minutes, args.reset, + args.reset_only) -- GitLab