Skip to content
Snippets Groups Projects
Commit 8bbb8d8d authored by Sören Henning's avatar Sören Henning
Browse files

Adjust execution scripts and K82 resources

parent 1a75b629
No related branches found
No related tags found
No related merge requests found
...@@ -94,6 +94,7 @@ def load_yaml_files(): ...@@ -94,6 +94,7 @@ def load_yaml_files():
:return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy :return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
""" """
print('Load kubernetes yaml files') print('Load kubernetes yaml files')
wg_svc = load_yaml('uc-workload-generator/load-generator-service.yaml')
wg = load_yaml('uc-workload-generator/workloadGenerator.yaml') wg = load_yaml('uc-workload-generator/workloadGenerator.yaml')
app_svc = load_yaml('uc-application/aggregation-service.yaml') app_svc = load_yaml('uc-application/aggregation-service.yaml')
app_svc_monitor = load_yaml('uc-application/service-monitor.yaml') app_svc_monitor = load_yaml('uc-application/service-monitor.yaml')
...@@ -101,7 +102,7 @@ def load_yaml_files(): ...@@ -101,7 +102,7 @@ def load_yaml_files():
app_deploy = load_yaml('uc-application/aggregation-deployment.yaml') app_deploy = load_yaml('uc-application/aggregation-deployment.yaml')
print('Kubernetes yaml files loaded') print('Kubernetes yaml files loaded')
return wg, app_svc, app_svc_monitor, app_jmx, app_deploy return wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy
def replace_env_value(container, key, value): def replace_env_value(container, key, value):
...@@ -113,8 +114,9 @@ def replace_env_value(container, key, value): ...@@ -113,8 +114,9 @@ def replace_env_value(container, key, value):
'value'] = value 'value'] = value
def start_workload_generator(wg_yaml, dim_value, uc_id): def start_workload_generator(svc_yaml, wg_yaml, dim_value, uc_id):
"""Starts the workload generator. """Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator service.
:param wg_yaml: The yaml object for the workload generator. :param wg_yaml: The yaml object for the workload generator.
:param string dim_value: The dimension value the load generator should use. :param string dim_value: The dimension value the load generator should use.
:param string uc_id: Use case id for which load should be generated. :param string uc_id: Use case id for which load should be generated.
...@@ -123,7 +125,18 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): ...@@ -123,7 +125,18 @@ def start_workload_generator(wg_yaml, dim_value, uc_id):
the yaml object. the yaml object.
""" """
print('Start workload generator') print('Start workload generator')
svc, wg_deploy = None, None
# Create Service
try:
svc = coreApi.create_namespaced_service(
namespace=namespace, body=svc_yaml)
print(f'Service {svc.metadata.name} created.')
except client.rest.ApiException as e:
svc = svc_yaml
logging.error("Service creation error: %s", e.reason)
# Create Deployment
num_sensors = dim_value num_sensors = dim_value
wl_max_records = 150000 wl_max_records = 150000
wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records
...@@ -147,22 +160,22 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): ...@@ -147,22 +160,22 @@ def start_workload_generator(wg_yaml, dim_value, uc_id):
# Set environment variables # Set environment variables
replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors)) replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors))
replace_env_value(wg_containter['env'], 'INSTANCES', str(wl_instances))
if uc_id == '2': # Special configuration for uc2 if uc_id == '2': # Special configuration for uc2
replace_env_value( replace_env_value(
wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups)) wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups))
try: try:
wg_ss = appsApi.create_namespaced_deployment( wg_deploy = appsApi.create_namespaced_deployment(
namespace=namespace, namespace=namespace,
body=wg_yaml body=wg_yaml
) )
print(f'Deployment {wg_ss.metadata.name} created.') print(f'Deployment {wg_deploy.metadata.name} created.')
return wg_ss
except client.rest.ApiException as e: except client.rest.ApiException as e:
print(f'Deployment creation error: {e.reason}') print(f'Deployment creation error: {e.reason}')
return wg_yaml wg_deploy = wg_yaml
return svc, wg_deploy
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
...@@ -317,19 +330,23 @@ def delete_resource(obj, del_func): ...@@ -317,19 +330,23 @@ def delete_resource(obj, del_func):
print('Resource deleted') print('Resource deleted')
def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy): def stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
"""Stops the applied applications and delete resources. """Stops the applied applications and delete resources.
:param wg: The workload generator statefull set. :param wg: The load generator service.
:param wg: The load generator deployment.
:param app_svc: The application service. :param app_svc: The application service.
:param app_svc_monitor: The application service monitor. :param app_svc_monitor: The application service monitor.
:param app_jmx: The application jmx config map. :param app_jmx: The application jmx config map.
:param app_deploy: The application deployment. :param app_deploy: The application deployment.
""" """
print('Stop use case application and workload generator') print('Stop use case application and load generator')
print('Delete workload generator') print('Delete load generator deployment')
delete_resource(wg, appsApi.delete_namespaced_deployment) delete_resource(wg, appsApi.delete_namespaced_deployment)
print('Delete load generator service')
delete_resource(wg_svc, coreApi.delete_namespaced_service)
print('Delete app service') print('Delete app service')
delete_resource(app_svc, coreApi.delete_namespaced_service) delete_resource(app_svc, coreApi.delete_namespaced_service)
...@@ -492,12 +509,12 @@ def stop_lag_exporter(): ...@@ -492,12 +509,12 @@ def stop_lag_exporter():
return return
def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): def reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
""" """
Stop the applications, delete topics, reset zookeeper and stop lag exporter. Stop the applications, delete topics, reset zookeeper and stop lag exporter.
""" """
print('Reset cluster') print('Reset cluster')
stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy) stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy)
print('---------------------') print('---------------------')
delete_topics(topics) delete_topics(topics)
print('---------------------') print('---------------------')
...@@ -524,7 +541,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi ...@@ -524,7 +541,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
""" """
global namespace global namespace
namespace = ns namespace = ns
wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------') print('---------------------')
initialize_kubernetes_api() initialize_kubernetes_api()
...@@ -538,24 +555,24 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi ...@@ -538,24 +555,24 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
# Check for reset options # Check for reset options
if reset_only: if reset_only:
# Only reset cluster an then end program # Only reset cluster an then end program
reset_cluster(wg, app_svc, app_svc_monitor, reset_cluster(wg_svc, wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics) app_jmx, app_deploy, topics)
sys.exit() sys.exit()
if reset: if reset:
# Reset cluster before execution # Reset cluster before execution
print('Reset only mode') print('Reset only mode')
reset_cluster(wg, app_svc, app_svc_monitor, reset_cluster(wg_svc, wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics) app_jmx, app_deploy, topics)
print('---------------------') print('---------------------')
# Register the reset operation so that is executed at the abort of program # Register the reset operation so that is executed at the abort of program
atexit.register(reset_cluster, wg, app_svc, atexit.register(reset_cluster, wg_svc, wg, app_svc,
app_svc_monitor, app_jmx, app_deploy, topics) app_svc_monitor, app_jmx, app_deploy, topics)
create_topics(topics) create_topics(topics)
print('---------------------') print('---------------------')
wg = start_workload_generator(wg, dim_value, uc_id) wg_svc, wg = start_workload_generator(wg_svc, wg, dim_value, uc_id)
print('---------------------') print('---------------------')
app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(
...@@ -578,7 +595,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi ...@@ -578,7 +595,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
print('---------------------') print('---------------------')
# Reset cluster regular, therefore abort exit not needed anymore # Reset cluster regular, therefore abort exit not needed anymore
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
atexit.unregister(reset_cluster) atexit.unregister(reset_cluster)
......
apiVersion: v1
kind: Service
metadata:
name: titan-ccp-load-generator
labels:
app: titan-ccp-load-generator
spec:
type: ClusterIP
clusterIP: None
selector:
app: titan-ccp-load-generator
ports:
- name: coordination
port: 5701
targetPort: 5701
protocol: TCP
...@@ -16,23 +16,22 @@ spec: ...@@ -16,23 +16,22 @@ spec:
containers: containers:
- name: workload-generator - name: workload-generator
image: workload-generator:latest image: workload-generator:latest
ports:
- containerPort: 5701
name: coordination
env: env:
# Order need to be preserved for run_uc.py # Order need to be preserved for run_uc.py
- name: NUM_SENSORS - name: NUM_SENSORS
value: "25000" value: "25000"
- name: INSTANCES
value: "1"
- name: NUM_NESTED_GROUPS - name: NUM_NESTED_GROUPS
value: "5" value: "5"
- name: ZK_HOST - name: KUBERNETES_NAMESPACE
value: "my-confluent-cp-zookeeper" valueFrom:
- name: ZK_PORT fieldRef:
value: "2181" fieldPath: metadata.namespace
- name: KUBERNETES_DNS_NAME
value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL - name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081" value: "http://my-confluent-cp-schema-registry:8081"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment