Skip to content
Snippets Groups Projects
Commit 1cb280a8 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Make the run uc py functions parameterized and not use global var

Make the functions parameterized in order to call the main method
from another python class. Before the methods used a global
variable to access the values.
parent 32d56dd0
No related branches found
No related tags found
No related merge requests found
import argparse # parse arguments from cli
import atexit # used to clear resources at exit of program (e.g. ctrl-c)
import atexit # used to clear resources at exit of program (e.g. ctrl-c)
from kubernetes import client, config # kubernetes api
from kubernetes.stream import stream
import lag_analysis
import logging # logging
from os import path # path utilities
import subprocess # execute bash commands
import sys # for exit of program
import sys # for exit of program
import time # process sleep
import yaml # convert from file to yaml object
coreApi = None # acces kubernetes core api
appsApi = None # acces kubernetes apps api
customApi = None # acces kubernetes custom object api
args = None # CLI arguments
def load_variables():
......@@ -83,6 +82,7 @@ def load_variables():
args = parser.parse_args()
print(args)
return args
def initialize_kubernetes_api():
......@@ -159,23 +159,25 @@ def load_yaml_files():
return wg, app_svc, app_svc_monitor, app_jmx, app_deploy
def start_workload_generator(wg_yaml):
def start_workload_generator(wg_yaml, dim_value, uc_id):
"""Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator.
:param string dim_value: The dimension value the load generator should use.
:param string uc_id: Use case id for which load should be generated.
:return:
The StatefulSet created by the API or in case it already exist/error
the yaml object.
"""
print('Start workload generator')
num_sensors = args.dim_value
num_sensors = dim_value
wl_max_records = 150000
wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))
# set parameters special for uc 2
if args.uc_id == '2':
if uc_id == '2':
print('use uc2 stuff')
num_nested_groups = args.dim_value
num_nested_groups = dim_value
num_sensors = '4'
approx_num_sensors = int(num_sensors) ** num_nested_groups
wl_instances = int(
......@@ -187,13 +189,13 @@ def start_workload_generator(wg_yaml):
# TODO: acces over name of container
# Set used use case
wg_containter = wg_yaml['spec']['template']['spec']['containers'][0]
wg_containter['image'] = 'theodolite/theodolite-uc' + args.uc_id + \
wg_containter['image'] = 'theodolite/theodolite-uc' + uc_id + \
'-workload-generator:latest'
# TODO: acces over name of attribute
# Set environment variables
wg_containter['env'][0]['value'] = str(num_sensors)
wg_containter['env'][1]['value'] = str(wl_instances)
if args.uc_id == '2': # Special configuration for uc2
if uc_id == '2': # Special configuration for uc2
wg_containter['env'][2]['value'] = str(num_nested_groups)
try:
......@@ -208,7 +210,7 @@ def start_workload_generator(wg_yaml):
return wg_yaml
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit):
"""Applies the service, service monitor, jmx config map and start the
use case application.
......@@ -216,6 +218,11 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
:param svc_monitor_yaml: The yaml object for the service monitor.
:param jmx_yaml: The yaml object for the jmx config map.
:param deploy_yaml: The yaml object for the application.
:param int instances: Number of instances for use case application.
:param string uc_id: The id of the use case to execute.
:param int commit_interval_ms: The commit interval in ms.
:param string memory_limit: The memory limit for the application.
:param string cpu_limit: The CPU limit for the application.
:return:
The Service, ServiceMonitor, JMX ConfigMap and Deployment.
In case the resource already exist/error the yaml object is returned.
......@@ -257,15 +264,15 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
logging.error("ConfigMap creation error: %s" % e.reason)
# Create deployment
deploy_yaml['spec']['replicas'] = args.instances
deploy_yaml['spec']['replicas'] = instances
# TODO: acces over name of container
app_container = deploy_yaml['spec']['template']['spec']['containers'][0]
app_container['image'] = 'theodolite/theodolite-uc' + args.uc_id \
app_container['image'] = 'theodolite/theodolite-uc' + uc_id \
+ '-kstreams-app:latest'
# TODO: acces over name of attribute
app_container['env'][0]['value'] = str(args.commit_interval_ms)
app_container['resources']['limits']['memory'] = args.memory_limit
app_container['resources']['limits']['cpu'] = args.cpu_limit
app_container['env'][0]['value'] = str(commit_interval_ms)
app_container['resources']['limits']['memory'] = memory_limit
app_container['resources']['limits']['cpu'] = cpu_limit
try:
app_deploy = appsApi.create_namespaced_deployment(
namespace="default",
......@@ -279,22 +286,31 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
return svc, svc_monitor, jmx_cm, app_deploy
def wait_execution():
"""Wait time while in execution."""
def wait_execution(execution_minutes):
"""
Wait time while in execution.
:param int execution_minutes: The duration to wait for execution.
"""
print('Wait while executing')
# TODO: ask which fits better
# time.sleep(args.execution_minutes * 60)
for i in range(args.execution_minutes):
for i in range(execution_minutes):
time.sleep(60)
print(f"Executed: {i+1} minutes")
print('Execution finished')
return
def run_evaluation():
"""Runs the evaluation function"""
def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
"""
Runs the evaluation function
:param string exp_id: ID of the experiment.
:param string uc_id: ID of the executed use case.
:param int dim_value: The dimension value used for execution.
:param int instances: The number of instances used for the execution.
:param int execution_minutes: How long the use case where executed.
"""
print('Run evaluation function')
lag_analysis.main(args.exp_id, f'uc{args.uc_id}', args.dim_value, args.instances, args.execution_minutes)
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
return
......@@ -309,7 +325,8 @@ def delete_resource(obj, del_func):
try:
del_func(obj.metadata.name, 'default')
except Exception as e:
logging.debug('Error deleting resource with api object, try with dict.')
logging.debug(
'Error deleting resource with api object, try with dict.')
try:
del_func(obj['metadata']['name'], 'default')
except Exception as e:
......@@ -444,7 +461,7 @@ def reset_zookeeper():
text=True)
logging.debug(output)
if output.returncode == 1: # Means data not available anymore
if output.returncode == 1: # Means data not available anymore
print('ZooKeeper reset was successful.')
break
else:
......@@ -495,52 +512,73 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
print('---------------------')
stop_lag_exporter()
def main():
load_variables()
print('---------------------')
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, reset, reset_only):
"""
Main method to execute one time the benchmark for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
:param string exp_id: The number of executed experiment
:param string uc_id: Use case to execute
:param int dim_value: Dimension value for load generator.
:param int instances: Number of instances for application.
:param int partitions: Number of partitions the kafka topics should have.
:param string cpu_limit: Max CPU utilazation for application.
:param string memory_limit: Max memory utilazation for application.
:param int commit_interval_ms: Kafka Streams commit interval in milliseconds
:param int execution_minutes: How long to execute the benchmark.
:param boolean reset: Flag for reset of cluster before execution.
:param boolean reset_only: Flag to only reset the application.
"""
wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------')
initialize_kubernetes_api()
print('---------------------')
topics = [('input', args.partitions),
('output', args.partitions),
('aggregation-feedback', args.partitions),
topics = [('input', partitions),
('output', partitions),
('aggregation-feedback', partitions),
('configuration', 1)]
# Check for reset options
if args.reset_only:
if reset_only:
# Only reset cluster an then end program
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
reset_cluster(wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
sys.exit()
if args.reset:
if reset:
# Reset cluster before execution
print('Reset only mode')
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
reset_cluster(wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
print('---------------------')
# Register the reset operation so that is executed at the end of program
atexit.register(reset_cluster, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
atexit.register(reset_cluster, wg, app_svc,
app_svc_monitor, app_jmx, app_deploy, topics)
create_topics(topics)
print('---------------------')
wg = start_workload_generator(wg)
wg = start_workload_generator(wg, dim_value, uc_id)
print('---------------------')
app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(
app_svc,
app_svc_monitor,
app_jmx,
app_deploy)
app_deploy,
instances,
uc_id,
commit_interval_ms,
memory_limit,
cpu_limit)
print('---------------------')
wait_execution()
wait_execution(execution_minutes)
print('---------------------')
run_evaluation()
run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes)
print('---------------------')
# Cluster is resetted with atexit method
......@@ -549,4 +587,9 @@ def main():
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
main()
args = load_variables()
print('---------------------')
main(args.exp_id, args.uc_id, args.dim_value, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
args.commit_interval_ms, args.execution_minutes, args.reset,
args.reset_only)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment