Skip to content
Snippets Groups Projects
Commit b1d6976e authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Load yaml files from method, comment methods and add logging.

Apply the DRY principle and create method to load the methods with
a given path.
Further add comments to methods for better understanding and enable
the logging.
parent 2b4c1702
No related branches found
No related tags found
No related merge requests found
import argparse # parse arguments from cli
from kubernetes import client, config # kubernetes api from kubernetes import client, config # kubernetes api
from kubernetes.stream import stream from kubernetes.stream import stream
import argparse # parse arguments from cli import logging # logging
from os import path # path utilities from os import path # path utilities
import subprocess # execute bash commands import subprocess # execute bash commands
import time # process sleep import time # process sleep
...@@ -13,6 +14,7 @@ args = None # CLI arguments ...@@ -13,6 +14,7 @@ args = None # CLI arguments
def load_variables(): def load_variables():
"""Load the CLI variables given at the command line"""
global args global args
print('Load CLI variables') print('Load CLI variables')
parser = argparse.ArgumentParser(description='Run use case Programm') parser = argparse.ArgumentParser(description='Run use case Programm')
...@@ -68,6 +70,9 @@ def load_variables(): ...@@ -68,6 +70,9 @@ def load_variables():
def initialize_kubernetes_api(): def initialize_kubernetes_api():
"""Load the kubernetes config from local or the cluster and creates
needed APIs.
"""
global coreApi, appsApi, customApi global coreApi, appsApi, customApi
print('Connect to kubernetes api') print('Connect to kubernetes api')
try: try:
...@@ -82,6 +87,9 @@ def initialize_kubernetes_api(): ...@@ -82,6 +87,9 @@ def initialize_kubernetes_api():
def create_topics(topics): def create_topics(topics):
"""Create the topics needed for the use cases
:param topics: List of topics that should be created.
"""
# Calling exec and waiting for response # Calling exec and waiting for response
print('Create topics') print('Create topics')
for (topic, partitions) in topics: for (topic, partitions) in topics:
...@@ -103,80 +111,114 @@ def create_topics(topics): ...@@ -103,80 +111,114 @@ def create_topics(topics):
print(resp) print(resp)
def start_workload_generator(): def load_yaml(file_path):
"""Creates a yaml file from the file at given path.
:param file_path: The path to the file which contains the yaml.
:return: The file as a yaml object.
"""
try:
f = open(path.join(path.dirname(__file__), file_path))
with f:
return yaml.safe_load(f)
except:
print('Error opening file %s' % file_path)
def load_yaml_files():
"""Load the needed yaml files and creates objects from them.
:return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
"""
print('Load kubernetes yaml files')
wg = load_yaml('uc-workload-generator/base/workloadGenerator.yaml')
app_svc = load_yaml('uc-application/base/aggregation-service.yaml')
app_svc_monitor = load_yaml('uc-application/base/service-monitor.yaml')
app_jmx = load_yaml('uc-application/base/jmx-configmap.yaml')
app_deploy = load_yaml('uc-application/base/aggregation-deployment.yaml')
print('Kubernetes yaml files loaded')
return wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
def start_workload_generator(wg_yaml):
"""Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator.
:return: The StatefulSet created by the API
"""
print('Start workload generator') print('Start workload generator')
num_sensors = args.dim_value num_sensors = args.dim_value
wl_max_records = 150000 wl_max_records = 150000
# TODO: How is this calculation done? # TODO: How is this calculation done?
wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records)) wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))
# Customize workload generator creations
# Create statefull set wg_yaml['spec']['replicas'] = wl_instances
with open(path.join(path.dirname(__file__), "uc-workload-generator/base/workloadGenerator.yaml")) as f:
dep = yaml.safe_load(f)
dep['spec']['replicas'] = wl_instances
# TODO: acces over name of container # TODO: acces over name of container
wg_containter = dep['spec']['template']['spec']['containers'][0] wg_containter = wg_yaml['spec']['template']['spec']['containers'][0]
wg_containter['image'] = 'soerenhenning/uc' + args.uc_id + '-wg:latest' wg_containter['image'] = 'soerenhenning/uc' + args.uc_id + '-wg:latest'
# TODO: acces over name of attribute # TODO: acces over name of attribute
wg_containter['env'][1]['value'] = str(num_sensors) wg_containter['env'][1]['value'] = str(num_sensors)
wg_containter['env'][2]['value'] = str(wl_instances) wg_containter['env'][2]['value'] = str(wl_instances)
print(dep)
try: try:
resp = appsApi.create_namespaced_stateful_set( wg_ss = appsApi.create_namespaced_stateful_set(
namespace="default", namespace="default",
body=dep body=wg_yaml
) )
print("StatefulSet '%s' created." % resp.metadata.name) print("StatefulSet '%s' created." % wg_ss.metadata.name)
return wg_ss
except client.rest.ApiException as e: except client.rest.ApiException as e:
print("StatefulSet creation error: %s" % e.reason) print("StatefulSet creation error: %s" % e.reason)
return return
def start_application(): def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
print('Start use case applications') """Applies the service, service monitor, jmx config map and start the
use case application.
# Apply aggregation service :param svc_yaml: The yaml object for the service.
with open(path.join(path.dirname(__file__), "uc-application/base/aggregation-service.yaml")) as f: :param svc_monitor_yaml: The yaml object for the service monitor.
dep = yaml.safe_load(f) :param jmx_yaml: The yaml object for the jmx config map.
try: :param deploy_yaml: The yaml object for the application.
resp = coreApi.create_namespaced_service( :return:
namespace="default", body=dep) The Service, ServiceMonitor, JMX ConfigMap and Deployment.
print("Service '%s' created." % resp.metadata.name) return svc, svc_monitor, jmx_cm, app_deploy
except: """
print("Service creation error.") print('Start use case application')
svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None
# Apply jmx config map for aggregation service # Create Service
with open(path.join(path.dirname(__file__), "uc-application/base/jmx-configmap.yaml")) as f:
dep = yaml.safe_load(f)
try: try:
resp = coreApi.create_namespaced_config_map( svc = coreApi.create_namespaced_service(
namespace="default", body=dep) namespace="default", body=svc_yaml)
print("ConfigMap '%s' created." % resp.metadata.name) print("Service '%s' created." % svc.metadata.name)
except: except:
print("ConfigMap creation error.") print("Service creation error.")
# Create custom object service monitor # Create custom object service monitor
with open(path.join(path.dirname(__file__), "uc-application/base/service-monitor.yaml")) as f:
dep = yaml.safe_load(f)
try: try:
resp = customApi.create_namespaced_custom_object( svc_monitor = customApi.create_namespaced_custom_object(
group="monitoring.coreos.com", group="monitoring.coreos.com",
version="v1", version="v1",
namespace="default", namespace="default",
plural="servicemonitors", # From CustomResourceDefinition of ServiceMonitor plural="servicemonitors", # From CustomResourceDefinition of ServiceMonitor
body=dep, body=svc_monitor_yaml,
) )
print("ServiceMonitor '%s' created." % resp['metadata']['name']) print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name'])
except: except:
print("ServiceMonitor creation error") print("ServiceMonitor creation error")
# Apply jmx config map for aggregation service
try:
jmx_cm = coreApi.create_namespaced_config_map(
namespace="default", body=jmx_yaml)
print("ConfigMap '%s' created." % jmx_cm.metadata.name)
except:
print("ConfigMap creation error.")
# Create deployment # Create deployment
with open(path.join(path.dirname(__file__), "uc-application/base/aggregation-deployment.yaml")) as f: deploy_yaml['spec']['replicas'] = args.instances
dep = yaml.safe_load(f)
dep['spec']['replicas'] = args.instances
# TODO: acces over name of container # TODO: acces over name of container
app_container = dep['spec']['template']['spec']['containers'][0] app_container = deploy_yaml['spec']['template']['spec']['containers'][0]
app_container['image'] = 'soerenhenning/uc' + args.uc_id + '-app:latest' app_container['image'] = 'soerenhenning/uc' + args.uc_id + '-app:latest'
# TODO: acces over name of attribute # TODO: acces over name of attribute
app_container['env'][1]['value'] = str(args.commit_interval_ms) app_container['env'][1]['value'] = str(args.commit_interval_ms)
...@@ -185,15 +227,17 @@ def start_application(): ...@@ -185,15 +227,17 @@ def start_application():
try: try:
resp = appsApi.create_namespaced_deployment( resp = appsApi.create_namespaced_deployment(
namespace="default", namespace="default",
body=dep body=deploy_yaml
) )
print("Deployment '%s' created." % resp.metadata.name) print("Deployment '%s' created." % resp.metadata.name)
except client.rest.ApiException as e: except client.rest.ApiException as e:
print("Deployment creation error: %s" % e.reason) print("Deployment creation error: %s" % e.reason)
return
return svc, svc_monitor, jmx_cm, app_deploy
def wait_execution(): def wait_execution():
"""Wait time while in execution."""
print('Wait while executing') print('Wait while executing')
# TODO: ask which fits better # TODO: ask which fits better
# time.sleep(args.execution_minutes * 60) # time.sleep(args.execution_minutes * 60)
...@@ -205,6 +249,7 @@ def wait_execution(): ...@@ -205,6 +249,7 @@ def wait_execution():
def run_evaluation_script(): def run_evaluation_script():
"""Runs the evaluation script."""
# TODO: implement # TODO: implement
# # Run eval script # # Run eval script
# source ../.venv/bin/activate # source ../.venv/bin/activate
...@@ -214,7 +259,14 @@ def run_evaluation_script(): ...@@ -214,7 +259,14 @@ def run_evaluation_script():
return return
def stop_applications(): def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
"""Stops the applied applications and delete resources.
:param wg: The workload generator statefull set.
:param app_svc: The application service.
:param app_svc_monitor: The application service monitor.
:param app_jmx: The application jmx config map.
:param app_deploy: The application deployment.
"""
print('Stop use case application and workload generator') print('Stop use case application and workload generator')
exec_command = [ exec_command = [
'kubectl', 'kubectl',
...@@ -232,6 +284,9 @@ def stop_applications(): ...@@ -232,6 +284,9 @@ def stop_applications():
def delete_topics(topics): def delete_topics(topics):
"""Delete topics from Kafka.
:param topics: List of topics to delete.
"""
print('Delete topics from Kafka') print('Delete topics from Kafka')
num_topics_command = [ num_topics_command = [
...@@ -271,9 +326,11 @@ def delete_topics(topics): ...@@ -271,9 +326,11 @@ def delete_topics(topics):
return return
# Stop the lag exporter in order to reset it and allow smooth execution for
# next use cases
def stop_lag_exporter(): def stop_lag_exporter():
"""
Stop the lag exporter in order to reset it and allow smooth execution for
next use cases.
"""
print('Stop the lag exporter') print('Stop the lag exporter')
find_pod_command = [ find_pod_command = [
...@@ -301,18 +358,18 @@ def stop_lag_exporter(): ...@@ -301,18 +358,18 @@ def stop_lag_exporter():
def main(): def main():
load_variables() load_variables()
print('---------------------') print('---------------------')
initialize_kubernetes_api()
print('---------------------')
topics = [('input', args.partitions), topics = [('input', args.partitions),
('output', args.partitions), ('output', args.partitions),
('configuration', 1)] ('configuration', 1)]
initialize_kubernetes_api()
print('---------------------')
create_topics(topics) create_topics(topics)
print('---------------------') print('---------------------')
start_workload_generator() wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------')
wg = start_workload_generator(wg)
print('---------------------') print('---------------------')
start_application() app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(app_svc, app_svc_monitor, app_jmx, app_deploy)
print('---------------------') print('---------------------')
wait_execution() wait_execution()
print('---------------------') print('---------------------')
...@@ -324,4 +381,5 @@ def main(): ...@@ -324,4 +381,5 @@ def main():
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main() main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment