Skip to content
Snippets Groups Projects
Commit d9e307dd authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Make the run uc py functions parameterized and not use global var

Make the functions parameterized in order to call the main method
from another python class. Before the methods used a global
variable to access the values.
parent 34068526
No related branches found
No related tags found
1 merge request!42Integerate theodolite and run uc python scripts
...@@ -13,7 +13,6 @@ import yaml # convert from file to yaml object ...@@ -13,7 +13,6 @@ import yaml # convert from file to yaml object
coreApi = None # acces kubernetes core api coreApi = None # acces kubernetes core api
appsApi = None # acces kubernetes apps api appsApi = None # acces kubernetes apps api
customApi = None # acces kubernetes custom object api customApi = None # acces kubernetes custom object api
args = None # CLI arguments
def load_variables(): def load_variables():
...@@ -83,6 +82,7 @@ def load_variables(): ...@@ -83,6 +82,7 @@ def load_variables():
args = parser.parse_args() args = parser.parse_args()
print(args) print(args)
return args
def initialize_kubernetes_api(): def initialize_kubernetes_api():
...@@ -159,23 +159,25 @@ def load_yaml_files(): ...@@ -159,23 +159,25 @@ def load_yaml_files():
return wg, app_svc, app_svc_monitor, app_jmx, app_deploy return wg, app_svc, app_svc_monitor, app_jmx, app_deploy
def start_workload_generator(wg_yaml): def start_workload_generator(wg_yaml, dim_value, uc_id):
"""Starts the workload generator. """Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator. :param wg_yaml: The yaml object for the workload generator.
:param string dim_value: The dimension value the load generator should use.
:param string uc_id: Use case id for which load should be generated.
:return: :return:
The StatefulSet created by the API or in case it already exist/error The StatefulSet created by the API or in case it already exist/error
the yaml object. the yaml object.
""" """
print('Start workload generator') print('Start workload generator')
num_sensors = args.dim_value num_sensors = dim_value
wl_max_records = 150000 wl_max_records = 150000
wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records)) wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))
# set parameters special for uc 2 # set parameters special for uc 2
if args.uc_id == '2': if uc_id == '2':
print('use uc2 stuff') print('use uc2 stuff')
num_nested_groups = args.dim_value num_nested_groups = dim_value
num_sensors = '4' num_sensors = '4'
approx_num_sensors = int(num_sensors) ** num_nested_groups approx_num_sensors = int(num_sensors) ** num_nested_groups
wl_instances = int( wl_instances = int(
...@@ -187,13 +189,13 @@ def start_workload_generator(wg_yaml): ...@@ -187,13 +189,13 @@ def start_workload_generator(wg_yaml):
# TODO: acces over name of container # TODO: acces over name of container
# Set used use case # Set used use case
wg_containter = wg_yaml['spec']['template']['spec']['containers'][0] wg_containter = wg_yaml['spec']['template']['spec']['containers'][0]
wg_containter['image'] = 'theodolite/theodolite-uc' + args.uc_id + \ wg_containter['image'] = 'theodolite/theodolite-uc' + uc_id + \
'-workload-generator:latest' '-workload-generator:latest'
# TODO: acces over name of attribute # TODO: acces over name of attribute
# Set environment variables # Set environment variables
wg_containter['env'][0]['value'] = str(num_sensors) wg_containter['env'][0]['value'] = str(num_sensors)
wg_containter['env'][1]['value'] = str(wl_instances) wg_containter['env'][1]['value'] = str(wl_instances)
if args.uc_id == '2': # Special configuration for uc2 if uc_id == '2': # Special configuration for uc2
wg_containter['env'][2]['value'] = str(num_nested_groups) wg_containter['env'][2]['value'] = str(num_nested_groups)
try: try:
...@@ -208,7 +210,7 @@ def start_workload_generator(wg_yaml): ...@@ -208,7 +210,7 @@ def start_workload_generator(wg_yaml):
return wg_yaml return wg_yaml
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit):
"""Applies the service, service monitor, jmx config map and start the """Applies the service, service monitor, jmx config map and start the
use case application. use case application.
...@@ -216,6 +218,11 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): ...@@ -216,6 +218,11 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
:param svc_monitor_yaml: The yaml object for the service monitor. :param svc_monitor_yaml: The yaml object for the service monitor.
:param jmx_yaml: The yaml object for the jmx config map. :param jmx_yaml: The yaml object for the jmx config map.
:param deploy_yaml: The yaml object for the application. :param deploy_yaml: The yaml object for the application.
:param int instances: Number of instances for use case application.
:param string uc_id: The id of the use case to execute.
:param int commit_interval_ms: The commit interval in ms.
:param string memory_limit: The memory limit for the application.
:param string cpu_limit: The CPU limit for the application.
:return: :return:
The Service, ServiceMonitor, JMX ConfigMap and Deployment. The Service, ServiceMonitor, JMX ConfigMap and Deployment.
In case the resource already exist/error the yaml object is returned. In case the resource already exist/error the yaml object is returned.
...@@ -257,15 +264,15 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): ...@@ -257,15 +264,15 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
logging.error("ConfigMap creation error: %s" % e.reason) logging.error("ConfigMap creation error: %s" % e.reason)
# Create deployment # Create deployment
deploy_yaml['spec']['replicas'] = args.instances deploy_yaml['spec']['replicas'] = instances
# TODO: acces over name of container # TODO: acces over name of container
app_container = deploy_yaml['spec']['template']['spec']['containers'][0] app_container = deploy_yaml['spec']['template']['spec']['containers'][0]
app_container['image'] = 'theodolite/theodolite-uc' + args.uc_id \ app_container['image'] = 'theodolite/theodolite-uc' + uc_id \
+ '-kstreams-app:latest' + '-kstreams-app:latest'
# TODO: acces over name of attribute # TODO: acces over name of attribute
app_container['env'][0]['value'] = str(args.commit_interval_ms) app_container['env'][0]['value'] = str(commit_interval_ms)
app_container['resources']['limits']['memory'] = args.memory_limit app_container['resources']['limits']['memory'] = memory_limit
app_container['resources']['limits']['cpu'] = args.cpu_limit app_container['resources']['limits']['cpu'] = cpu_limit
try: try:
app_deploy = appsApi.create_namespaced_deployment( app_deploy = appsApi.create_namespaced_deployment(
namespace="default", namespace="default",
...@@ -279,22 +286,31 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml): ...@@ -279,22 +286,31 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
return svc, svc_monitor, jmx_cm, app_deploy return svc, svc_monitor, jmx_cm, app_deploy
def wait_execution(): def wait_execution(execution_minutes):
"""Wait time while in execution.""" """
Wait time while in execution.
:param int execution_minutes: The duration to wait for execution.
"""
print('Wait while executing') print('Wait while executing')
# TODO: ask which fits better
# time.sleep(args.execution_minutes * 60) for i in range(execution_minutes):
for i in range(args.execution_minutes):
time.sleep(60) time.sleep(60)
print(f"Executed: {i+1} minutes") print(f"Executed: {i+1} minutes")
print('Execution finished') print('Execution finished')
return return
def run_evaluation(): def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
"""Runs the evaluation function""" """
Runs the evaluation function
:param string exp_id: ID of the experiment.
:param string uc_id: ID of the executed use case.
:param int dim_value: The dimension value used for execution.
:param int instances: The number of instances used for the execution.
:param int execution_minutes: How long the use case where executed.
"""
print('Run evaluation function') print('Run evaluation function')
lag_analysis.main(args.exp_id, f'uc{args.uc_id}', args.dim_value, args.instances, args.execution_minutes) lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
return return
...@@ -309,7 +325,8 @@ def delete_resource(obj, del_func): ...@@ -309,7 +325,8 @@ def delete_resource(obj, del_func):
try: try:
del_func(obj.metadata.name, 'default') del_func(obj.metadata.name, 'default')
except Exception as e: except Exception as e:
logging.debug('Error deleting resource with api object, try with dict.') logging.debug(
'Error deleting resource with api object, try with dict.')
try: try:
del_func(obj['metadata']['name'], 'default') del_func(obj['metadata']['name'], 'default')
except Exception as e: except Exception as e:
...@@ -495,52 +512,73 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): ...@@ -495,52 +512,73 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
print('---------------------') print('---------------------')
stop_lag_exporter() stop_lag_exporter()
def main():
load_variables()
print('---------------------')
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, reset, reset_only):
"""
Main method to execute one time the benchmark for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
:param string exp_id: The number of executed experiment
:param string uc_id: Use case to execute
:param int dim_value: Dimension value for load generator.
:param int instances: Number of instances for application.
:param int partitions: Number of partitions the kafka topics should have.
:param string cpu_limit: Max CPU utilazation for application.
:param string memory_limit: Max memory utilazation for application.
:param int commit_interval_ms: Kafka Streams commit interval in milliseconds
:param int execution_minutes: How long to execute the benchmark.
:param boolean reset: Flag for reset of cluster before execution.
:param boolean reset_only: Flag to only reset the application.
"""
wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files() wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------') print('---------------------')
initialize_kubernetes_api() initialize_kubernetes_api()
print('---------------------') print('---------------------')
topics = [('input', args.partitions), topics = [('input', partitions),
('output', args.partitions), ('output', partitions),
('aggregation-feedback', args.partitions), ('aggregation-feedback', partitions),
('configuration', 1)] ('configuration', 1)]
# Check for reset options # Check for reset options
if args.reset_only: if reset_only:
# Only reset cluster an then end program # Only reset cluster an then end program
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) reset_cluster(wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
sys.exit() sys.exit()
if args.reset: if reset:
# Reset cluster before execution # Reset cluster before execution
print('Reset only mode') print('Reset only mode')
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) reset_cluster(wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
print('---------------------') print('---------------------')
# Register the reset operation so that is executed at the end of program # Register the reset operation so that is executed at the end of program
atexit.register(reset_cluster, wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics) atexit.register(reset_cluster, wg, app_svc,
app_svc_monitor, app_jmx, app_deploy, topics)
create_topics(topics) create_topics(topics)
print('---------------------') print('---------------------')
wg = start_workload_generator(wg) wg = start_workload_generator(wg, dim_value, uc_id)
print('---------------------') print('---------------------')
app_svc, app_svc_monitor, app_jmx, app_deploy = start_application( app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(
app_svc, app_svc,
app_svc_monitor, app_svc_monitor,
app_jmx, app_jmx,
app_deploy) app_deploy,
instances,
uc_id,
commit_interval_ms,
memory_limit,
cpu_limit)
print('---------------------') print('---------------------')
wait_execution() wait_execution(execution_minutes)
print('---------------------') print('---------------------')
run_evaluation() run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes)
print('---------------------') print('---------------------')
# Cluster is resetted with atexit method # Cluster is resetted with atexit method
...@@ -549,4 +587,9 @@ def main(): ...@@ -549,4 +587,9 @@ def main():
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
main() args = load_variables()
print('---------------------')
main(args.exp_id, args.uc_id, args.dim_value, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
args.commit_interval_ms, args.execution_minutes, args.reset,
args.reset_only)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment