# run_uc.py
import argparse # parse arguments from cli
import atexit # used to clear resources at exit of program (e.g. ctrl-c)
from kubernetes import client, config # kubernetes api
from kubernetes.stream import stream
import lag_analysis
import logging # logging
from os import path # path utilities
from lib.cli_parser import execution_parser
import subprocess # execute bash commands
import sys # for exit of program
import time # process sleep
import yaml # convert from file to yaml object
coreApi = None  # access kubernetes core api
appsApi = None  # access kubernetes apps api
customApi = None  # access kubernetes custom object api
def load_variables():
"""Load the CLI variables given at the command line"""
print('Load CLI variables')
    parser = execution_parser(description='Run use case program')
args = parser.parse_args()
print(args)
    if args.exp_id is None or args.uc is None or args.load is None \
            or args.instances is None:
        print('The options --exp-id, --uc, --load and --instances are mandatory.')
        print('At least one of them is not set!')
sys.exit(1)
return args
def initialize_kubernetes_api():
"""Load the kubernetes config from local or the cluster and creates
needed APIs.
"""
global coreApi, appsApi, customApi
print('Connect to kubernetes api')
try:
config.load_kube_config() # try using local config
except config.config_exception.ConfigException as e:
# load config from pod, if local config is not available
logging.debug('Failed loading local Kubernetes configuration,'
+ ' try from cluster')
logging.debug(e)
config.load_incluster_config()
coreApi = client.CoreV1Api()
appsApi = client.AppsV1Api()
customApi = client.CustomObjectsApi()
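# Usage sketch: initialize_kubernetes_api() must be called before any other
# function in this module touches the cluster, e.g. (hypothetical values):
#   initialize_kubernetes_api()
#   create_topics([('input', 40)])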
def create_topics(topics):
"""Create the topics needed for the use cases
:param topics: List of topics that should be created.
"""
# Calling exec and waiting for response
print('Create topics')
for (topic, partitions) in topics:
print('Create topic ' + topic + ' with #' + str(partitions)
+ ' partitions')
exec_command = [
'/bin/sh',
'-c',
            'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 '
            f'--create --topic {topic} --partitions {partitions} '
            '--replication-factor 1'
]
resp = stream(coreApi.connect_get_namespaced_pod_exec,
"kafka-client",
'default',
command=exec_command,
stderr=True, stdin=False,
stdout=True, tty=False)
print(resp)
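# Example (hypothetical values): create the four benchmark topics used by
# main(), with 40 partitions each except the single-partition
# 'configuration' topic:
#   create_topics([('input', 40), ('output', 40),
#                  ('aggregation-feedback', 40), ('configuration', 1)])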
def load_yaml(file_path):
"""Creates a yaml file from the file at given path.
:param file_path: The path to the file which contains the yaml.
:return: The file as a yaml object.
"""
    try:
        with open(path.join(path.dirname(__file__), file_path)) as f:
            return yaml.safe_load(f)
except Exception as e:
logging.error('Error opening file %s' % file_path)
logging.error(e)
def load_yaml_files():
"""Load the needed yaml files and creates objects from them.
:return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
"""
print('Load kubernetes yaml files')
wg = load_yaml('uc-workload-generator/base/workloadGenerator.yaml')
app_svc = load_yaml('uc-application/base/aggregation-service.yaml')
app_svc_monitor = load_yaml('uc-application/base/service-monitor.yaml')
app_jmx = load_yaml('uc-application/base/jmx-configmap.yaml')
app_deploy = load_yaml('uc-application/base/aggregation-deployment.yaml')
print('Kubernetes yaml files loaded')
return wg, app_svc, app_svc_monitor, app_jmx, app_deploy
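# Note: load_yaml() resolves these paths relative to this file's directory,
# so the script can be started from any working directory.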
def start_workload_generator(wg_yaml, dim_value, uc_id):
"""Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator.
:param string dim_value: The dimension value the load generator should use.
:param string uc_id: Use case id for which load should be generated.
:return:
        The Deployment created by the API or, in case it already exists or
        an error occurs, the original yaml object.
"""
print('Start workload generator')
num_sensors = dim_value
wl_max_records = 150000
    wl_instances = (num_sensors + wl_max_records - 1) // wl_max_records
    # Set special parameters for uc2
    if uc_id == '2':
        print('Use uc2 specific configuration')
        num_nested_groups = dim_value
        num_sensors = 4
        # The number of sensors grows exponentially with the nesting depth
        approx_num_sensors = num_sensors ** num_nested_groups
        wl_instances = (approx_num_sensors + wl_max_records - 1) \
            // wl_max_records
# Customize workload generator creations
wg_yaml['spec']['replicas'] = wl_instances
    # TODO: access the container by name
    # Set used use case
    wg_container = wg_yaml['spec']['template']['spec']['containers'][0]
    wg_container['image'] = 'theodolite/theodolite-uc' + uc_id + \
        '-workload-generator:latest'
    # TODO: access the attribute by name
    # Set environment variables
    wg_container['env'][0]['value'] = str(num_sensors)
    wg_container['env'][1]['value'] = str(wl_instances)
    if uc_id == '2':  # Special configuration for uc2
        wg_container['env'][2]['value'] = str(num_nested_groups)
    try:
        wg_deploy = appsApi.create_namespaced_deployment(
            namespace="default",
            body=wg_yaml
        )
        print("Deployment '%s' created." % wg_deploy.metadata.name)
        return wg_deploy
    except client.rest.ApiException as e:
        logging.error("Deployment creation error: %s" % e.reason)
return wg_yaml
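# Worked example (hypothetical numbers): with dim_value = 600000 sensors and
# wl_max_records = 150000 records per generator instance, the ceiling
# division above yields 600000 / 150000 = 4 generator replicas. For uc2, the
# approximate sensor count instead grows exponentially as
# 4 ** num_nested_groups.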
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
                      instances, uc_id, commit_interval_ms, memory_limit,
                      cpu_limit):
"""Applies the service, service monitor, jmx config map and start the
use case application.
:param svc_yaml: The yaml object for the service.
:param svc_monitor_yaml: The yaml object for the service monitor.
:param jmx_yaml: The yaml object for the jmx config map.
:param deploy_yaml: The yaml object for the application.
:param int instances: Number of instances for use case application.
:param string uc_id: The id of the use case to execute.
:param int commit_interval_ms: The commit interval in ms.
:param string memory_limit: The memory limit for the application.
:param string cpu_limit: The CPU limit for the application.
    :return:
        The Service, ServiceMonitor, JMX ConfigMap and Deployment.
        In case a resource already exists or an error occurs, the
        corresponding yaml object is returned instead.
"""
print('Start use case application')
svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None
# Create Service
try:
svc = coreApi.create_namespaced_service(
namespace="default", body=svc_yaml)
print("Service '%s' created." % svc.metadata.name)
except client.rest.ApiException as e:
svc = svc_yaml
logging.error("Service creation error: %s" % e.reason)
# Create custom object service monitor
try:
svc_monitor = customApi.create_namespaced_custom_object(
group="monitoring.coreos.com",
version="v1",
namespace="default",
plural="servicemonitors", # CustomResourceDef of ServiceMonitor
body=svc_monitor_yaml,
)
print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name'])
except client.rest.ApiException as e:
svc_monitor = svc_monitor_yaml
logging.error("ServiceMonitor creation error: %s" % e.reason)
# Apply jmx config map for aggregation service
try:
jmx_cm = coreApi.create_namespaced_config_map(
namespace="default", body=jmx_yaml)
print("ConfigMap '%s' created." % jmx_cm.metadata.name)
except client.rest.ApiException as e:
jmx_cm = jmx_yaml
logging.error("ConfigMap creation error: %s" % e.reason)
# Create deployment
deploy_yaml['spec']['replicas'] = instances
    # TODO: access the container by name
app_container = deploy_yaml['spec']['template']['spec']['containers'][0]
app_container['image'] = 'theodolite/theodolite-uc' + uc_id \
+ '-kstreams-app:latest'
    # TODO: access the attribute by name
app_container['env'][0]['value'] = str(commit_interval_ms)
app_container['resources']['limits']['memory'] = memory_limit
app_container['resources']['limits']['cpu'] = cpu_limit
try:
app_deploy = appsApi.create_namespaced_deployment(
namespace="default",
body=deploy_yaml
)
print("Deployment '%s' created." % app_deploy.metadata.name)
except client.rest.ApiException as e:
app_deploy = deploy_yaml
logging.error("Deployment creation error: %s" % e.reason)
return svc, svc_monitor, jmx_cm, app_deploy
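# Example call (hypothetical values), after initialize_kubernetes_api() and
# load_yaml_files():
#   start_application(app_svc, app_svc_monitor, app_jmx, app_deploy,
#                     instances=4, uc_id='1', commit_interval_ms=100,
#                     memory_limit='4Gi', cpu_limit='1000m')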
def wait_execution(execution_minutes):
"""
Wait time while in execution.
    :param int execution_minutes: The duration of the execution in minutes.
"""
print('Wait while executing')
for i in range(execution_minutes):
time.sleep(60)
print(f"Executed: {i+1} minutes")
print('Execution finished')
return
def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
"""
Runs the evaluation function
:param string exp_id: ID of the experiment.
:param string uc_id: ID of the executed use case.
:param int dim_value: The dimension value used for execution.
:param int instances: The number of instances used for the execution.
    :param int execution_minutes: How long the use case was executed.
"""
print('Run evaluation function')
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
return
def delete_resource(obj, del_func):
"""
    Helper function to delete kubernetes resources.
    First tries to delete with the kubernetes object.
    Then it uses the dict representation of the yaml to delete the object.
    :param obj: Either a kubernetes resource object or yaml as a dict.
    :param del_func: The function that needs to be executed for deletion.
"""
try:
del_func(obj.metadata.name, 'default')
except Exception as e:
logging.debug(
'Error deleting resource with api object, try with dict.')
try:
del_func(obj['metadata']['name'], 'default')
except Exception as e:
logging.error("Error deleting resource")
logging.error(e)
return
print('Resource deleted')
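# Example: delete_resource() accepts both the API object returned by a
# successful create call and the plain yaml dict used as a fallback, so
#   delete_resource(app_deploy, appsApi.delete_namespaced_deployment)
# works regardless of how the deployment was created.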
def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
"""Stops the applied applications and delete resources.
    :param wg: The workload generator deployment.
:param app_svc: The application service.
:param app_svc_monitor: The application service monitor.
:param app_jmx: The application jmx config map.
:param app_deploy: The application deployment.
"""
print('Stop use case application and workload generator')
print('Delete workload generator')
delete_resource(wg, appsApi.delete_namespaced_deployment)
print('Delete app service')
delete_resource(app_svc, coreApi.delete_namespaced_service)
print('Delete service monitor')
try:
customApi.delete_namespaced_custom_object(
group="monitoring.coreos.com",
version="v1",
namespace="default",
plural="servicemonitors",
name=app_svc_monitor['metadata']['name'])
print('Resource deleted')
    except Exception as e:
        logging.error("Error deleting service monitor")
        logging.error(e)
print('Delete jmx config map')
delete_resource(app_jmx, coreApi.delete_namespaced_config_map)
print('Delete uc application')
delete_resource(app_deploy, appsApi.delete_namespaced_deployment)
return
def delete_topics(topics):
"""Delete topics from Kafka.
:param topics: List of topics to delete.
"""
print('Delete topics from Kafka')
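    # Build a regex that matches the created topics as well as all
    # 'theodolite-...' topics, e.g. internal topics created by Kafka Streams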
topics_delete = 'theodolite-.*|' + '|'.join([ti[0] for ti in topics])
num_topics_command = [
'/bin/sh',
'-c',
        'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list '
        f'| sed -n -E "/^({topics_delete})( - marked for deletion)?$/p" '
        '| wc -l'
]
topics_deletion_command = [
'/bin/sh',
'-c',
        'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete '
        f'--topic "{topics_delete}"'
]
    # Wait until the topics are deleted
while True:
# topic deletion, sometimes a second deletion seems to be required
resp = stream(coreApi.connect_get_namespaced_pod_exec,
"kafka-client",
'default',
command=topics_deletion_command,
stderr=True, stdin=False,
stdout=True, tty=False)
print(resp)
print('Wait for topic deletion')
time.sleep(2)
resp = stream(coreApi.connect_get_namespaced_pod_exec,
"kafka-client",
'default',
command=num_topics_command,
stderr=True, stdin=False,
stdout=True, tty=False)
        if resp.strip() == '0':
print("Topics deleted")
break
return
def reset_zookeeper():
"""Delete ZooKeeper configurations used for workload generation.
"""
print('Delete ZooKeeper configurations used for workload generation')
delete_zoo_data_command = [
'kubectl',
'exec',
'zookeeper-client',
'--',
'bash',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall '
+ '/workload-generation'
]
check_zoo_data_command = [
'kubectl',
'exec',
'zookeeper-client',
'--',
'bash',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 get '
+ '/workload-generation'
]
# Wait for configuration deletion
while True:
# Delete Zookeeper configuration data
output = subprocess.run(delete_zoo_data_command,
capture_output=True,
text=True)
logging.debug(output.stdout)
# Check data is deleted
output = subprocess.run(check_zoo_data_command,
capture_output=True,
text=True)
logging.debug(output)
if output.returncode == 1: # Means data not available anymore
print('ZooKeeper reset was successful.')
break
else:
print('ZooKeeper reset was not successful. Retrying in 5s.')
time.sleep(5)
return
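# Note: the returncode check above relies on 'kubectl exec' propagating the
# exit code of zookeeper-shell, which is non-zero when the node no longer
# exists.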
def stop_lag_exporter():
"""
    Stop the lag exporter in order to reset it and allow smooth execution of
    the next use cases.
"""
print('Stop the lag exporter')
    try:
        # Find the lag exporter pod via its label
        pod_list = coreApi.list_namespaced_pod(
            namespace='default',
            label_selector='app.kubernetes.io/name=kafka-lag-exporter')
        lag_exporter_pod = pod_list.items[0].metadata.name
        # Delete the lag exporter pod
        coreApi.delete_namespaced_pod(
            name=lag_exporter_pod, namespace='default')
        print('Deleted lag exporter pod: ' + lag_exporter_pod)
    except client.rest.ApiException as e:
        logging.error('Exception while stopping lag exporter')
        logging.error(e)
    return
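# Note: only the pod is deleted here. Assuming the lag exporter is managed by
# a Deployment, Kubernetes recreates the pod automatically, which effectively
# restarts the exporter with a fresh state.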
def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
"""
Stop the applications, delete topics, reset zookeeper and stop lag exporter.
"""
print('Reset cluster')
stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy)
print('---------------------')
delete_topics(topics)
print('---------------------')
reset_zookeeper()
print('---------------------')
stop_lag_exporter()
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit,
         memory_limit, commit_interval_ms, execution_minutes, reset,
         reset_only):
"""
    Main method to execute the benchmark once for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
    :param string exp_id: The ID of the experiment.
:param string uc_id: Use case to execute
:param int dim_value: Dimension value for load generator.
:param int instances: Number of instances for application.
:param int partitions: Number of partitions the kafka topics should have.
    :param string cpu_limit: Max CPU utilization for the application.
    :param string memory_limit: Max memory utilization for the application.
    :param int commit_interval_ms: Kafka Streams commit interval in
        milliseconds.
:param int execution_minutes: How long to execute the benchmark.
:param boolean reset: Flag for reset of cluster before execution.
:param boolean reset_only: Flag to only reset the application.
"""
wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------')
initialize_kubernetes_api()
print('---------------------')
topics = [('input', partitions),
('output', partitions),
('aggregation-feedback', partitions),
('configuration', 1)]
    # Check for reset options
    if reset_only:
        # Only reset the cluster and then end the program
        print('Reset only mode')
        reset_cluster(wg, app_svc, app_svc_monitor,
                      app_jmx, app_deploy, topics)
        sys.exit()
    if reset:
        # Reset cluster before execution
        print('Reset cluster before execution')
        reset_cluster(wg, app_svc, app_svc_monitor,
                      app_jmx, app_deploy, topics)
        print('---------------------')
    # Register the reset operation so that it is executed if the program aborts
atexit.register(reset_cluster, wg, app_svc,
app_svc_monitor, app_jmx, app_deploy, topics)
create_topics(topics)
print('---------------------')
wg = start_workload_generator(wg, dim_value, uc_id)
print('---------------------')
app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(
app_svc,
app_svc_monitor,
app_jmx,
app_deploy,
instances,
uc_id,
commit_interval_ms,
memory_limit,
cpu_limit)
print('---------------------')
wait_execution(execution_minutes)
print('---------------------')
run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes)
print('---------------------')
    # Regular cluster reset; the abort handler is not needed anymore
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
atexit.unregister(reset_cluster)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
args = load_variables()
print('---------------------')
main(args.exp_id, args.uc, args.load, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
args.commit_ms, args.duration, args.reset,
args.reset_only)
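# Example invocation (hypothetical values; flag names for partitions, limits,
# commit interval and duration are assumed from lib.cli_parser):
#   python run_uc.py --exp-id 1 --uc 1 --load 100000 --instances 4 \
#       --partitions 40 --cpu-limit 1000m --memory-limit 4Gi \
#       --commit-ms 100 --duration 5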