diff --git a/execution/run_uc.py b/execution/run_uc.py index 9bbb2876447438c1c3ac676091b11f6baa990622..7f5580f20863482281095aa1fda220393c91e8b0 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -94,6 +94,7 @@ def load_yaml_files(): :return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy """ print('Load kubernetes yaml files') + wg_svc = load_yaml('uc-workload-generator/load-generator-service.yaml') wg = load_yaml('uc-workload-generator/workloadGenerator.yaml') app_svc = load_yaml('uc-application/aggregation-service.yaml') app_svc_monitor = load_yaml('uc-application/service-monitor.yaml') @@ -101,7 +102,7 @@ def load_yaml_files(): app_deploy = load_yaml('uc-application/aggregation-deployment.yaml') print('Kubernetes yaml files loaded') - return wg, app_svc, app_svc_monitor, app_jmx, app_deploy + return wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy def replace_env_value(container, key, value): @@ -113,8 +114,9 @@ def replace_env_value(container, key, value): 'value'] = value -def start_workload_generator(wg_yaml, dim_value, uc_id): +def start_workload_generator(svc_yaml, wg_yaml, dim_value, uc_id): """Starts the workload generator. + :param wg_yaml: The yaml object for the workload generator service. :param wg_yaml: The yaml object for the workload generator. :param string dim_value: The dimension value the load generator should use. :param string uc_id: Use case id for which load should be generated. @@ -123,7 +125,18 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): the yaml object. """ print('Start workload generator') + svc, wg_deploy = None, None + # Create Service + try: + svc = coreApi.create_namespaced_service( + namespace=namespace, body=svc_yaml) + print(f'Service {svc.metadata.name} created.') + except client.rest.ApiException as e: + svc = svc_yaml + logging.error("Service creation error: %s", e.reason) + + # Create Deployment num_sensors = dim_value wl_max_records = 150000 wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records @@ -147,22 +160,22 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): # Set environment variables replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors)) - replace_env_value(wg_containter['env'], 'INSTANCES', str(wl_instances)) if uc_id == '2': # Special configuration for uc2 replace_env_value( wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups)) try: - wg_ss = appsApi.create_namespaced_deployment( + wg_deploy = appsApi.create_namespaced_deployment( namespace=namespace, body=wg_yaml ) - print(f'Deployment {wg_ss.metadata.name} created.') - return wg_ss + print(f'Deployment {wg_deploy.metadata.name} created.') except client.rest.ApiException as e: print(f'Deployment creation error: {e.reason}') - return wg_yaml + wg_deploy = wg_yaml + + return svc, wg_deploy def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, @@ -317,19 +330,23 @@ def delete_resource(obj, del_func): print('Resource deleted') -def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy): +def stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy): """Stops the applied applications and delete resources. - :param wg: The workload generator statefull set. + :param wg: The load generator service. + :param wg: The load generator deployment. :param app_svc: The application service. :param app_svc_monitor: The application service monitor. :param app_jmx: The application jmx config map. :param app_deploy: The application deployment. """ - print('Stop use case application and workload generator') + print('Stop use case application and load generator') - print('Delete workload generator') + print('Delete load generator deployment') delete_resource(wg, appsApi.delete_namespaced_deployment) + print('Delete load generator service') + delete_resource(wg_svc, coreApi.delete_namespaced_service) + print('Delete app service') delete_resource(app_svc, coreApi.delete_namespaced_service) @@ -492,12 +509,12 @@ def stop_lag_exporter(): return -def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): +def reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): """ Stop the applications, delete topics, reset zookeeper and stop lag exporter. """ print('Reset cluster') - stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy) + stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy) print('---------------------') delete_topics(topics) print('---------------------') @@ -524,7 +541,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi """ global namespace namespace = ns - wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() + wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() print('---------------------') initialize_kubernetes_api() @@ -538,24 +555,24 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi # Check for reset options if reset_only: # Only reset cluster an then end program - reset_cluster(wg, app_svc, app_svc_monitor, + reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) sys.exit() if reset: # Reset cluster before execution print('Reset only mode') - reset_cluster(wg, app_svc, app_svc_monitor, + reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) print('---------------------') # Register the reset operation so that is executed at the abort of program - atexit.register(reset_cluster, wg, app_svc, + atexit.register(reset_cluster, wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) create_topics(topics) print('---------------------') - wg = start_workload_generator(wg, dim_value, uc_id) + wg_svc, wg = start_workload_generator(wg_svc, wg, dim_value, uc_id) print('---------------------') app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( @@ -578,7 +595,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi print('---------------------') # Reset cluster regular, therefore abort exit not needed anymore - reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) + reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) atexit.unregister(reset_cluster) diff --git a/execution/uc-workload-generator/load-generator-service.yaml b/execution/uc-workload-generator/load-generator-service.yaml new file mode 100644 index 0000000000000000000000000000000000000000..c1299e373009dee5fa4cc87093ebc684c7f2e333 --- /dev/null +++ b/execution/uc-workload-generator/load-generator-service.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Service +metadata: + name: titan-ccp-load-generator + labels: + app: titan-ccp-load-generator +spec: + type: ClusterIP + clusterIP: None + selector: + app: titan-ccp-load-generator + ports: + - name: coordination + port: 5701 + targetPort: 5701 + protocol: TCP diff --git a/execution/uc-workload-generator/workloadGenerator.yaml b/execution/uc-workload-generator/workloadGenerator.yaml index 794468b18dc74ca09872577b5b3c115605bd4620..146e285f66d4c0e1a88d613e4ac2d5571234fad6 100644 --- a/execution/uc-workload-generator/workloadGenerator.yaml +++ b/execution/uc-workload-generator/workloadGenerator.yaml @@ -16,23 +16,22 @@ spec: containers: - name: workload-generator image: workload-generator:latest + ports: + - containerPort: 5701 + name: coordination env: # Order need to be preserved for run_uc.py - name: NUM_SENSORS value: "25000" - - name: INSTANCES - value: "1" - name: NUM_NESTED_GROUPS value: "5" - - name: ZK_HOST - value: "my-confluent-cp-zookeeper" - - name: ZK_PORT - value: "2181" + - name: KUBERNETES_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: KUBERNETES_DNS_NAME + value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: SCHEMA_REGISTRY_URL value: "http://my-confluent-cp-schema-registry:8081" - - name: POD_NAME - valueFrom: - fieldRef: - fieldPath: metadata.name