diff --git a/execution/run_uc.py b/execution/run_uc.py index 05e7fb6944f87d7e8845a69d385c411cf14b77cf..da23cb14e54fdb56cfdc5f1eacae7789654a4eaf 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -41,8 +41,8 @@ def initialize_kubernetes_api(): config.load_kube_config() # try using local config except config.config_exception.ConfigException as e: # load config from pod, if local config is not available - logging.debug('Failed loading local Kubernetes configuration,' - + ' try from cluster') + logging.debug( + 'Failed loading local Kubernetes configuration try from cluster') logging.debug(e) config.load_incluster_config() @@ -58,8 +58,7 @@ def create_topics(topics): # Calling exec and waiting for response print('Create topics') for (topic, partitions) in topics: - print('Create topic ' + topic + ' with #' + str(partitions) - + ' partitions') + print(f'Create topic {topic} with #{partitions} partitions') exec_command = [ '/bin/sh', '-c', @@ -86,7 +85,7 @@ def load_yaml(file_path): with f: return yaml.safe_load(f) except Exception as e: - logging.error('Error opening file %s' % file_path) + logging.error('Error opening file %s', file_path) logging.error(e) @@ -105,6 +104,15 @@ def load_yaml_files(): return wg, app_svc, app_svc_monitor, app_jmx, app_deploy +def replace_env_value(container, key, value): + """ + Special method to replace in a container with kubernetes env values + the value of a given parameter. + """ + next(filter(lambda x: x['name'] == key, container))[ + 'value'] = value + + def start_workload_generator(wg_yaml, dim_value, uc_id): """Starts the workload generator. :param wg_yaml: The yaml object for the workload generator. @@ -139,22 +147,22 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): '-workload-generator:latest' # Set environment variables - next(filter(lambda x: x['name'] == 'NUM_SENSORS', wg_containter['env']))[ - 'value'] = str(num_sensors) - next(filter(lambda x: x['name'] == 'INSTANCES', wg_containter['env']))[ - 'value'] = str(wl_instances) + replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors)) + replace_env_value(wg_containter['env'], 'INSTANCES', str(wl_instances)) + if uc_id == '2': # Special configuration for uc2 - next(filter(lambda x: x['name'] == 'NUM_NESTED_GROUPS', wg_containter['env']))[ - 'value'] = str(num_nested_groups) + replace_env_value( + wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups)) + try: wg_ss = appsApi.create_namespaced_deployment( namespace=namespace, body=wg_yaml ) - print("Deployment '%s' created." % wg_ss.metadata.name) + print(f'Deployment {wg_ss.metadata.name} created.') return wg_ss except client.rest.ApiException as e: - print("Deployment creation error: %s" % e.reason) + print(f'Deployment creation error: {e.reason}') return wg_yaml @@ -185,10 +193,10 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, try: svc = coreApi.create_namespaced_service( namespace=namespace, body=svc_yaml) - print("Service '%s' created." % svc.metadata.name) + print(f'Service {svc.metadata.name} created.') except client.rest.ApiException as e: svc = svc_yaml - logging.error("Service creation error: %s" % e.reason) + logging.error("Service creation error: %s", e.reason) # Create custom object service monitor try: @@ -199,31 +207,39 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, plural="servicemonitors", # CustomResourceDef of ServiceMonitor body=svc_monitor_yaml, ) - print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name']) + print(f"ServiceMonitor '{svc_monitor['metadata']['name']}' created.") except client.rest.ApiException as e: svc_monitor = svc_monitor_yaml - logging.error("ServiceMonitor creation error: %s" % e.reason) + logging.error("ServiceMonitor creation error: %s", e.reason) # Apply jmx config map for aggregation service try: jmx_cm = coreApi.create_namespaced_config_map( namespace=namespace, body=jmx_yaml) - print("ConfigMap '%s' created." % jmx_cm.metadata.name) + print(f"ConfigMap '{jmx_cm.metadata.name}' created.") except client.rest.ApiException as e: jmx_cm = jmx_yaml - logging.error("ConfigMap creation error: %s" % e.reason) + logging.error("ConfigMap creation error: %s", e.reason) # Create deployment deploy_yaml['spec']['replicas'] = instances app_container = next(filter( - lambda x: x['name'] == 'uc-application', deploy_yaml['spec']['template']['spec']['containers'])) + lambda x: x['name'] == 'uc-application', + deploy_yaml['spec']['template']['spec']['containers'])) app_container['image'] = 'theodolite/theodolite-uc' + uc_id \ + '-kstreams-app:latest' # Set configurations environment parameters for SPE - for k,v in configurations.items(): - conf = {'name': k, 'value': v} - app_container['env'].append(conf) + for k, v in configurations.items(): + # check if environment variable is already definde in yaml + env = next(filter(lambda x: x['name'] == k, + app_container['env']), None) + if env is not None: + env['value'] = v # replace value + else: + # create new environment pair + conf = {'name': k, 'value': v} + app_container['env'].append(conf) # Set resources in Kubernetes app_container['resources']['limits']['memory'] = memory_limit @@ -235,10 +251,10 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, namespace=namespace, body=deploy_yaml ) - print("Deployment '%s' created." % app_deploy.metadata.name) + print(f"Deployment '{app_deploy.metadata.name}' created.") except client.rest.ApiException as e: app_deploy = deploy_yaml - logging.error("Deployment creation error: %s" % e.reason) + logging.error("Deployment creation error: %s", e.reason) return svc, svc_monitor, jmx_cm, app_deploy @@ -252,7 +268,7 @@ def wait_execution(execution_minutes): for i in range(execution_minutes): time.sleep(60) - print(f"Executed: {i+1} minutes") + print(f'Executed: {i+1} minutes') print('Execution finished') return @@ -267,7 +283,8 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome :param int execution_minutes: How long the use case where executed. """ print('Run evaluation function') - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url, result_path) + lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, + execution_minutes, prometheus_base_url, result_path) return @@ -319,7 +336,7 @@ def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy): name=app_svc_monitor['metadata']['name']) print('Resource deleted') except Exception as e: - print("Error deleting service monitor") + print('Error deleting service monitor') print('Delete jmx config map') delete_resource(app_jmx, coreApi.delete_namespaced_config_map) @@ -372,7 +389,7 @@ def delete_topics(topics): stderr=True, stdin=False, stdout=True, tty=False) if resp == '0': - print("Topics deleted") + print('Topics deleted') break return diff --git a/execution/uc-application/base/aggregation-deployment.yaml b/execution/uc-application/base/aggregation-deployment.yaml index 12bf9e718e81d90f308c7c57012ae0f558a7a7b6..07732ca1dd1e6b2b06f098dfb10a53d38e8d5cae 100644 --- a/execution/uc-application/base/aggregation-deployment.yaml +++ b/execution/uc-application/base/aggregation-deployment.yaml @@ -26,6 +26,8 @@ spec: value: "http://my-confluent-cp-schema-registry:8081" - name: JAVA_OPTS value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555" + - name: COMMIT_INTERVAL_MS # Set as default for the applications + value: "100" resources: limits: memory: 4Gi