Skip to content
Snippets Groups Projects
Commit a78869eb authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Load yaml files from method, comment methods and add logging.

Apply the DRY principle and create method to load the methods with
a given path.
Further add comments to methods for better understanding and enable
the logging.
parent 3b64aed6
No related branches found
No related tags found
2 merge requests!42Integerate theodolite and run uc python scripts,!24run UC as python implementation
import argparse # parse arguments from cli
from kubernetes import client, config # kubernetes api
from kubernetes.stream import stream
import argparse # parse arguments from cli
import logging # logging
from os import path # path utilities
import subprocess # execute bash commands
import time # process sleep
......@@ -13,6 +14,7 @@ args = None # CLI arguments
def load_variables():
"""Load the CLI variables given at the command line"""
global args
print('Load CLI variables')
parser = argparse.ArgumentParser(description='Run use case Programm')
......@@ -68,6 +70,9 @@ def load_variables():
def initialize_kubernetes_api():
"""Load the kubernetes config from local or the cluster and creates
needed APIs.
"""
global coreApi, appsApi, customApi
print('Connect to kubernetes api')
try:
......@@ -82,6 +87,9 @@ def initialize_kubernetes_api():
def create_topics(topics):
"""Create the topics needed for the use cases
:param topics: List of topics that should be created.
"""
# Calling exec and waiting for response
print('Create topics')
for (topic, partitions) in topics:
......@@ -103,97 +111,133 @@ def create_topics(topics):
print(resp)
def start_workload_generator():
def load_yaml(file_path):
"""Creates a yaml file from the file at given path.
:param file_path: The path to the file which contains the yaml.
:return: The file as a yaml object.
"""
try:
f = open(path.join(path.dirname(__file__), file_path))
with f:
return yaml.safe_load(f)
except:
print('Error opening file %s' % file_path)
def load_yaml_files():
"""Load the needed yaml files and creates objects from them.
:return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
"""
print('Load kubernetes yaml files')
wg = load_yaml('uc-workload-generator/base/workloadGenerator.yaml')
app_svc = load_yaml('uc-application/base/aggregation-service.yaml')
app_svc_monitor = load_yaml('uc-application/base/service-monitor.yaml')
app_jmx = load_yaml('uc-application/base/jmx-configmap.yaml')
app_deploy = load_yaml('uc-application/base/aggregation-deployment.yaml')
print('Kubernetes yaml files loaded')
return wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
def start_workload_generator(wg_yaml):
"""Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator.
:return: The StatefulSet created by the API
"""
print('Start workload generator')
num_sensors = args.dim_value
wl_max_records = 150000
# TODO: How is this calculation done?
wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))
# Customize workload generator creations
wg_yaml['spec']['replicas'] = wl_instances
# TODO: acces over name of container
wg_containter = wg_yaml['spec']['template']['spec']['containers'][0]
wg_containter['image'] = 'soerenhenning/uc' + args.uc_id + '-wg:latest'
# TODO: acces over name of attribute
wg_containter['env'][1]['value'] = str(num_sensors)
wg_containter['env'][2]['value'] = str(wl_instances)
# Create statefull set
with open(path.join(path.dirname(__file__), "uc-workload-generator/base/workloadGenerator.yaml")) as f:
dep = yaml.safe_load(f)
dep['spec']['replicas'] = wl_instances
# TODO: acces over name of container
wg_containter = dep['spec']['template']['spec']['containers'][0]
wg_containter['image'] = 'soerenhenning/uc' + args.uc_id + '-wg:latest'
# TODO: acces over name of attribute
wg_containter['env'][1]['value'] = str(num_sensors)
wg_containter['env'][2]['value'] = str(wl_instances)
print(dep)
try:
resp = appsApi.create_namespaced_stateful_set(
namespace="default",
body=dep
)
print("StatefulSet '%s' created." % resp.metadata.name)
except client.rest.ApiException as e:
print("StatefulSet creation error: %s" % e.reason)
try:
wg_ss = appsApi.create_namespaced_stateful_set(
namespace="default",
body=wg_yaml
)
print("StatefulSet '%s' created." % wg_ss.metadata.name)
return wg_ss
except client.rest.ApiException as e:
print("StatefulSet creation error: %s" % e.reason)
return
def start_application():
print('Start use case applications')
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
"""Applies the service, service monitor, jmx config map and start the
use case application.
# Apply aggregation service
with open(path.join(path.dirname(__file__), "uc-application/base/aggregation-service.yaml")) as f:
dep = yaml.safe_load(f)
try:
resp = coreApi.create_namespaced_service(
namespace="default", body=dep)
print("Service '%s' created." % resp.metadata.name)
except:
print("Service creation error.")
:param svc_yaml: The yaml object for the service.
:param svc_monitor_yaml: The yaml object for the service monitor.
:param jmx_yaml: The yaml object for the jmx config map.
:param deploy_yaml: The yaml object for the application.
:return:
The Service, ServiceMonitor, JMX ConfigMap and Deployment.
return svc, svc_monitor, jmx_cm, app_deploy
"""
print('Start use case application')
svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None
# Apply jmx config map for aggregation service
with open(path.join(path.dirname(__file__), "uc-application/base/jmx-configmap.yaml")) as f:
dep = yaml.safe_load(f)
try:
resp = coreApi.create_namespaced_config_map(
namespace="default", body=dep)
print("ConfigMap '%s' created." % resp.metadata.name)
except:
print("ConfigMap creation error.")
# Create Service
try:
svc = coreApi.create_namespaced_service(
namespace="default", body=svc_yaml)
print("Service '%s' created." % svc.metadata.name)
except:
print("Service creation error.")
# Create custom object service monitor
with open(path.join(path.dirname(__file__), "uc-application/base/service-monitor.yaml")) as f:
dep = yaml.safe_load(f)
try:
resp = customApi.create_namespaced_custom_object(
group="monitoring.coreos.com",
version="v1",
namespace="default",
plural="servicemonitors", # From CustomResourceDefinition of ServiceMonitor
body=dep,
)
print("ServiceMonitor '%s' created." % resp['metadata']['name'])
except:
print("ServiceMonitor creation error")
try:
svc_monitor = customApi.create_namespaced_custom_object(
group="monitoring.coreos.com",
version="v1",
namespace="default",
plural="servicemonitors", # From CustomResourceDefinition of ServiceMonitor
body=svc_monitor_yaml,
)
print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name'])
except:
print("ServiceMonitor creation error")
# Apply jmx config map for aggregation service
try:
jmx_cm = coreApi.create_namespaced_config_map(
namespace="default", body=jmx_yaml)
print("ConfigMap '%s' created." % jmx_cm.metadata.name)
except:
print("ConfigMap creation error.")
# Create deployment
with open(path.join(path.dirname(__file__), "uc-application/base/aggregation-deployment.yaml")) as f:
dep = yaml.safe_load(f)
dep['spec']['replicas'] = args.instances
# TODO: acces over name of container
app_container = dep['spec']['template']['spec']['containers'][0]
app_container['image'] = 'soerenhenning/uc' + args.uc_id + '-app:latest'
# TODO: acces over name of attribute
app_container['env'][1]['value'] = str(args.commit_interval_ms)
app_container['resources']['limits']['memory'] = args.memory_limit
app_container['resources']['limits']['cpu'] = args.cpu_limit
try:
resp = appsApi.create_namespaced_deployment(
namespace="default",
body=dep
)
print("Deployment '%s' created." % resp.metadata.name)
except client.rest.ApiException as e:
print("Deployment creation error: %s" % e.reason)
return
deploy_yaml['spec']['replicas'] = args.instances
# TODO: acces over name of container
app_container = deploy_yaml['spec']['template']['spec']['containers'][0]
app_container['image'] = 'soerenhenning/uc' + args.uc_id + '-app:latest'
# TODO: acces over name of attribute
app_container['env'][1]['value'] = str(args.commit_interval_ms)
app_container['resources']['limits']['memory'] = args.memory_limit
app_container['resources']['limits']['cpu'] = args.cpu_limit
try:
resp = appsApi.create_namespaced_deployment(
namespace="default",
body=deploy_yaml
)
print("Deployment '%s' created." % resp.metadata.name)
except client.rest.ApiException as e:
print("Deployment creation error: %s" % e.reason)
return svc, svc_monitor, jmx_cm, app_deploy
def wait_execution():
"""Wait time while in execution."""
print('Wait while executing')
# TODO: ask which fits better
# time.sleep(args.execution_minutes * 60)
......@@ -205,6 +249,7 @@ def wait_execution():
def run_evaluation_script():
"""Runs the evaluation script."""
# TODO: implement
# # Run eval script
# source ../.venv/bin/activate
......@@ -214,7 +259,14 @@ def run_evaluation_script():
return
def stop_applications():
def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
"""Stops the applied applications and delete resources.
:param wg: The workload generator statefull set.
:param app_svc: The application service.
:param app_svc_monitor: The application service monitor.
:param app_jmx: The application jmx config map.
:param app_deploy: The application deployment.
"""
print('Stop use case application and workload generator')
exec_command = [
'kubectl',
......@@ -232,6 +284,9 @@ def stop_applications():
def delete_topics(topics):
"""Delete topics from Kafka.
:param topics: List of topics to delete.
"""
print('Delete topics from Kafka')
num_topics_command = [
......@@ -271,9 +326,11 @@ def delete_topics(topics):
return
# Stop the lag exporter in order to reset it and allow smooth execution for
# next use cases
def stop_lag_exporter():
"""
Stop the lag exporter in order to reset it and allow smooth execution for
next use cases.
"""
print('Stop the lag exporter')
find_pod_command = [
......@@ -301,18 +358,18 @@ def stop_lag_exporter():
def main():
load_variables()
print('---------------------')
initialize_kubernetes_api()
print('---------------------')
topics = [('input', args.partitions),
('output', args.partitions),
('configuration', 1)]
initialize_kubernetes_api()
print('---------------------')
create_topics(topics)
print('---------------------')
start_workload_generator()
wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------')
wg = start_workload_generator(wg)
print('---------------------')
start_application()
app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(app_svc, app_svc_monitor, app_jmx, app_deploy)
print('---------------------')
wait_execution()
print('---------------------')
......@@ -324,4 +381,5 @@ def main():
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment