diff --git a/execution/run_uc.py b/execution/run_uc.py index 7e4de01b6ed6e45471a230c62fba3cb07bb8e5aa..00687af55e3fa9edba26de7be6afcde3a619adf8 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -1,6 +1,7 @@ +import argparse # parse arguments from cli from kubernetes import client, config # kubernetes api from kubernetes.stream import stream -import argparse # parse arguments from cli +import logging # logging from os import path # path utilities import subprocess # execute bash commands import time # process sleep @@ -13,6 +14,7 @@ args = None # CLI arguments def load_variables(): + """Load the CLI variables given at the command line""" global args print('Load CLI variables') parser = argparse.ArgumentParser(description='Run use case Programm') @@ -68,6 +70,9 @@ def load_variables(): def initialize_kubernetes_api(): + """Load the kubernetes config from local or the cluster and creates + needed APIs. + """ global coreApi, appsApi, customApi print('Connect to kubernetes api') try: @@ -82,6 +87,9 @@ def initialize_kubernetes_api(): def create_topics(topics): + """Create the topics needed for the use cases + :param topics: List of topics that should be created. + """ # Calling exec and waiting for response print('Create topics') for (topic, partitions) in topics: @@ -103,97 +111,133 @@ def create_topics(topics): print(resp) -def start_workload_generator(): +def load_yaml(file_path): + """Creates a yaml file from the file at given path. + :param file_path: The path to the file which contains the yaml. + :return: The file as a yaml object. + """ + try: + f = open(path.join(path.dirname(__file__), file_path)) + with f: + return yaml.safe_load(f) + except: + print('Error opening file %s' % file_path) + + +def load_yaml_files(): + """Load the needed yaml files and creates objects from them. + :return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy + """ + print('Load kubernetes yaml files') + wg = load_yaml('uc-workload-generator/base/workloadGenerator.yaml') + app_svc = load_yaml('uc-application/base/aggregation-service.yaml') + app_svc_monitor = load_yaml('uc-application/base/service-monitor.yaml') + app_jmx = load_yaml('uc-application/base/jmx-configmap.yaml') + app_deploy = load_yaml('uc-application/base/aggregation-deployment.yaml') + + print('Kubernetes yaml files loaded') + return wg, app_svc, app_svc_monitor ,app_jmx, app_deploy + + +def start_workload_generator(wg_yaml): + """Starts the workload generator. + :param wg_yaml: The yaml object for the workload generator. + :return: The StatefulSet created by the API + """ print('Start workload generator') num_sensors = args.dim_value wl_max_records = 150000 # TODO: How is this calculation done? wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records)) + # Customize workload generator creations + wg_yaml['spec']['replicas'] = wl_instances + # TODO: acces over name of container + wg_containter = wg_yaml['spec']['template']['spec']['containers'][0] + wg_containter['image'] = 'soerenhenning/uc' + args.uc_id + '-wg:latest' + # TODO: acces over name of attribute + wg_containter['env'][1]['value'] = str(num_sensors) + wg_containter['env'][2]['value'] = str(wl_instances) - # Create statefull set - with open(path.join(path.dirname(__file__), "uc-workload-generator/base/workloadGenerator.yaml")) as f: - dep = yaml.safe_load(f) - dep['spec']['replicas'] = wl_instances - # TODO: acces over name of container - wg_containter = dep['spec']['template']['spec']['containers'][0] - wg_containter['image'] = 'soerenhenning/uc' + args.uc_id + '-wg:latest' - # TODO: acces over name of attribute - wg_containter['env'][1]['value'] = str(num_sensors) - wg_containter['env'][2]['value'] = str(wl_instances) - print(dep) - try: - resp = appsApi.create_namespaced_stateful_set( - namespace="default", - body=dep - ) - print("StatefulSet '%s' created." % resp.metadata.name) - except client.rest.ApiException as e: - print("StatefulSet creation error: %s" % e.reason) + try: + wg_ss = appsApi.create_namespaced_stateful_set( + namespace="default", + body=wg_yaml + ) + print("StatefulSet '%s' created." % wg_ss.metadata.name) + return wg_ss + except client.rest.ApiException as e: + print("StatefulSet creation error: %s" % e.reason) return -def start_application(): - print('Start use case applications') +def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): + """Applies the service, service monitor, jmx config map and start the + use case application. - # Apply aggregation service - with open(path.join(path.dirname(__file__), "uc-application/base/aggregation-service.yaml")) as f: - dep = yaml.safe_load(f) - try: - resp = coreApi.create_namespaced_service( - namespace="default", body=dep) - print("Service '%s' created." % resp.metadata.name) - except: - print("Service creation error.") + :param svc_yaml: The yaml object for the service. + :param svc_monitor_yaml: The yaml object for the service monitor. + :param jmx_yaml: The yaml object for the jmx config map. + :param deploy_yaml: The yaml object for the application. + :return: + The Service, ServiceMonitor, JMX ConfigMap and Deployment. + return svc, svc_monitor, jmx_cm, app_deploy + """ + print('Start use case application') + svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None - # Apply jmx config map for aggregation service - with open(path.join(path.dirname(__file__), "uc-application/base/jmx-configmap.yaml")) as f: - dep = yaml.safe_load(f) - try: - resp = coreApi.create_namespaced_config_map( - namespace="default", body=dep) - print("ConfigMap '%s' created." % resp.metadata.name) - except: - print("ConfigMap creation error.") + # Create Service + try: + svc = coreApi.create_namespaced_service( + namespace="default", body=svc_yaml) + print("Service '%s' created." % svc.metadata.name) + except: + print("Service creation error.") # Create custom object service monitor - with open(path.join(path.dirname(__file__), "uc-application/base/service-monitor.yaml")) as f: - dep = yaml.safe_load(f) - try: - resp = customApi.create_namespaced_custom_object( - group="monitoring.coreos.com", - version="v1", - namespace="default", - plural="servicemonitors", # From CustomResourceDefinition of ServiceMonitor - body=dep, - ) - print("ServiceMonitor '%s' created." % resp['metadata']['name']) - except: - print("ServiceMonitor creation error") + try: + svc_monitor = customApi.create_namespaced_custom_object( + group="monitoring.coreos.com", + version="v1", + namespace="default", + plural="servicemonitors", # From CustomResourceDefinition of ServiceMonitor + body=svc_monitor_yaml, + ) + print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name']) + except: + print("ServiceMonitor creation error") + + # Apply jmx config map for aggregation service + try: + jmx_cm = coreApi.create_namespaced_config_map( + namespace="default", body=jmx_yaml) + print("ConfigMap '%s' created." % jmx_cm.metadata.name) + except: + print("ConfigMap creation error.") # Create deployment - with open(path.join(path.dirname(__file__), "uc-application/base/aggregation-deployment.yaml")) as f: - dep = yaml.safe_load(f) - dep['spec']['replicas'] = args.instances - # TODO: acces over name of container - app_container = dep['spec']['template']['spec']['containers'][0] - app_container['image'] = 'soerenhenning/uc' + args.uc_id + '-app:latest' - # TODO: acces over name of attribute - app_container['env'][1]['value'] = str(args.commit_interval_ms) - app_container['resources']['limits']['memory'] = args.memory_limit - app_container['resources']['limits']['cpu'] = args.cpu_limit - try: - resp = appsApi.create_namespaced_deployment( - namespace="default", - body=dep - ) - print("Deployment '%s' created." % resp.metadata.name) - except client.rest.ApiException as e: - print("Deployment creation error: %s" % e.reason) - return + deploy_yaml['spec']['replicas'] = args.instances + # TODO: acces over name of container + app_container = deploy_yaml['spec']['template']['spec']['containers'][0] + app_container['image'] = 'soerenhenning/uc' + args.uc_id + '-app:latest' + # TODO: acces over name of attribute + app_container['env'][1]['value'] = str(args.commit_interval_ms) + app_container['resources']['limits']['memory'] = args.memory_limit + app_container['resources']['limits']['cpu'] = args.cpu_limit + try: + resp = appsApi.create_namespaced_deployment( + namespace="default", + body=deploy_yaml + ) + print("Deployment '%s' created." % resp.metadata.name) + except client.rest.ApiException as e: + print("Deployment creation error: %s" % e.reason) + + return svc, svc_monitor, jmx_cm, app_deploy def wait_execution(): + """Wait time while in execution.""" print('Wait while executing') # TODO: ask which fits better # time.sleep(args.execution_minutes * 60) @@ -205,6 +249,7 @@ def wait_execution(): def run_evaluation_script(): + """Runs the evaluation script.""" # TODO: implement # # Run eval script # source ../.venv/bin/activate @@ -214,7 +259,14 @@ def run_evaluation_script(): return -def stop_applications(): +def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy): + """Stops the applied applications and delete resources. + :param wg: The workload generator statefull set. + :param app_svc: The application service. + :param app_svc_monitor: The application service monitor. + :param app_jmx: The application jmx config map. + :param app_deploy: The application deployment. + """ print('Stop use case application and workload generator') exec_command = [ 'kubectl', @@ -232,6 +284,9 @@ def stop_applications(): def delete_topics(topics): + """Delete topics from Kafka. + :param topics: List of topics to delete. + """ print('Delete topics from Kafka') num_topics_command = [ @@ -271,9 +326,11 @@ def delete_topics(topics): return -# Stop the lag exporter in order to reset it and allow smooth execution for -# next use cases def stop_lag_exporter(): + """ + Stop the lag exporter in order to reset it and allow smooth execution for + next use cases. + """ print('Stop the lag exporter') find_pod_command = [ @@ -301,18 +358,18 @@ def stop_lag_exporter(): def main(): load_variables() print('---------------------') - + initialize_kubernetes_api() + print('---------------------') topics = [('input', args.partitions), ('output', args.partitions), ('configuration', 1)] - - initialize_kubernetes_api() - print('---------------------') create_topics(topics) print('---------------------') - start_workload_generator() + wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() + print('---------------------') + wg = start_workload_generator(wg) print('---------------------') - start_application() + app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(app_svc, app_svc_monitor, app_jmx, app_deploy) print('---------------------') wait_execution() print('---------------------') @@ -324,4 +381,5 @@ def main(): if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) main()