Skip to content
Snippets Groups Projects
Commit cbe8a6f0 authored by Sören Henning's avatar Sören Henning
Browse files

Adjust execution scripts and K82 resources

parent 76ae2ace
No related branches found
No related tags found
2 merge requests!86Zookeeper free workload generator,!84Gitlab CI for Theodolite-Kotlin-Quarkus
Pipeline #2021 canceled
This commit is part of merge request !86. Comments created here will be created in the context of that merge request.
......@@ -94,6 +94,7 @@ def load_yaml_files():
:return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
"""
print('Load kubernetes yaml files')
wg_svc = load_yaml('uc-workload-generator/load-generator-service.yaml')
wg = load_yaml('uc-workload-generator/workloadGenerator.yaml')
app_svc = load_yaml('uc-application/aggregation-service.yaml')
app_svc_monitor = load_yaml('uc-application/service-monitor.yaml')
......@@ -101,7 +102,7 @@ def load_yaml_files():
app_deploy = load_yaml('uc-application/aggregation-deployment.yaml')
print('Kubernetes yaml files loaded')
return wg, app_svc, app_svc_monitor, app_jmx, app_deploy
return wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy
def replace_env_value(container, key, value):
......@@ -113,8 +114,9 @@ def replace_env_value(container, key, value):
'value'] = value
def start_workload_generator(wg_yaml, dim_value, uc_id):
def start_workload_generator(svc_yaml, wg_yaml, dim_value, uc_id):
"""Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator service.
:param wg_yaml: The yaml object for the workload generator.
:param string dim_value: The dimension value the load generator should use.
:param string uc_id: Use case id for which load should be generated.
......@@ -123,7 +125,18 @@ def start_workload_generator(wg_yaml, dim_value, uc_id):
the yaml object.
"""
print('Start workload generator')
svc, wg_deploy = None, None
# Create Service
try:
svc = coreApi.create_namespaced_service(
namespace=namespace, body=svc_yaml)
print(f'Service {svc.metadata.name} created.')
except client.rest.ApiException as e:
svc = svc_yaml
logging.error("Service creation error: %s", e.reason)
# Create Deployment
num_sensors = dim_value
wl_max_records = 150000
wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records
......@@ -147,22 +160,22 @@ def start_workload_generator(wg_yaml, dim_value, uc_id):
# Set environment variables
replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors))
replace_env_value(wg_containter['env'], 'INSTANCES', str(wl_instances))
if uc_id == '2': # Special configuration for uc2
replace_env_value(
wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups))
try:
wg_ss = appsApi.create_namespaced_deployment(
wg_deploy = appsApi.create_namespaced_deployment(
namespace=namespace,
body=wg_yaml
)
print(f'Deployment {wg_ss.metadata.name} created.')
return wg_ss
print(f'Deployment {wg_deploy.metadata.name} created.')
except client.rest.ApiException as e:
print(f'Deployment creation error: {e.reason}')
return wg_yaml
wg_deploy = wg_yaml
return svc, wg_deploy
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
......@@ -317,19 +330,23 @@ def delete_resource(obj, del_func):
print('Resource deleted')
def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
def stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
"""Stops the applied applications and delete resources.
:param wg: The workload generator statefull set.
:param wg: The load generator service.
:param wg: The load generator deployment.
:param app_svc: The application service.
:param app_svc_monitor: The application service monitor.
:param app_jmx: The application jmx config map.
:param app_deploy: The application deployment.
"""
print('Stop use case application and workload generator')
print('Stop use case application and load generator')
print('Delete workload generator')
print('Delete load generator deployment')
delete_resource(wg, appsApi.delete_namespaced_deployment)
print('Delete load generator service')
delete_resource(wg_svc, coreApi.delete_namespaced_service)
print('Delete app service')
delete_resource(app_svc, coreApi.delete_namespaced_service)
......@@ -492,12 +509,12 @@ def stop_lag_exporter():
return
def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
def reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
"""
Stop the applications, delete topics, reset zookeeper and stop lag exporter.
"""
print('Reset cluster')
stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy)
stop_applications(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy)
print('---------------------')
delete_topics(topics)
print('---------------------')
......@@ -524,7 +541,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
"""
global namespace
namespace = ns
wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------')
initialize_kubernetes_api()
......@@ -538,24 +555,24 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
# Check for reset options
if reset_only:
# Only reset cluster an then end program
reset_cluster(wg, app_svc, app_svc_monitor,
reset_cluster(wg_svc, wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
sys.exit()
if reset:
# Reset cluster before execution
print('Reset only mode')
reset_cluster(wg, app_svc, app_svc_monitor,
reset_cluster(wg_svc, wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
print('---------------------')
# Register the reset operation so that is executed at the abort of program
atexit.register(reset_cluster, wg, app_svc,
atexit.register(reset_cluster, wg_svc, wg, app_svc,
app_svc_monitor, app_jmx, app_deploy, topics)
create_topics(topics)
print('---------------------')
wg = start_workload_generator(wg, dim_value, uc_id)
wg_svc, wg = start_workload_generator(wg_svc, wg, dim_value, uc_id)
print('---------------------')
app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(
......@@ -578,7 +595,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
print('---------------------')
# Reset cluster regular, therefore abort exit not needed anymore
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
reset_cluster(wg_svc, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
atexit.unregister(reset_cluster)
......
apiVersion: v1
kind: Service
metadata:
name: titan-ccp-load-generator
labels:
app: titan-ccp-load-generator
spec:
type: ClusterIP
clusterIP: None
selector:
app: titan-ccp-load-generator
ports:
- name: coordination
port: 5701
targetPort: 5701
protocol: TCP
......@@ -16,23 +16,22 @@ spec:
containers:
- name: workload-generator
image: workload-generator:latest
ports:
- containerPort: 5701
name: coordination
env:
# Order need to be preserved for run_uc.py
- name: NUM_SENSORS
value: "25000"
- name: INSTANCES
value: "1"
- name: NUM_NESTED_GROUPS
value: "5"
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KUBERNETES_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: KUBERNETES_DNS_NAME
value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment