Skip to content
Snippets Groups Projects

run UC as python implementation

Closed Björn Vonheiden requested to merge stu202077/theodolite:feature/runUcPython into master
All threads resolved!
+ 49
22
@@ -77,8 +77,11 @@ def initialize_kubernetes_api():
print('Connect to kubernetes api')
try:
config.load_kube_config() # try using local config
except config.config_exception.ConfigException:
except config.config_exception.ConfigException as e:
# load config from pod, if local config is not available
logging.debug('Failed loading local Kubernetes configuration,'
+ ' try from cluster')
logging.debug(e)
config.load_incluster_config()
coreApi = client.CoreV1Api()
@@ -120,8 +123,9 @@ def load_yaml(file_path):
f = open(path.join(path.dirname(__file__), file_path))
with f:
return yaml.safe_load(f)
except:
print('Error opening file %s' % file_path)
except Exception as e:
logging.error('Error opening file %s' % file_path)
logging.error(e)
def load_yaml_files():
@@ -150,18 +154,31 @@ def start_workload_generator(wg_yaml):
num_sensors = args.dim_value
wl_max_records = 150000
# TODO: How is this calculation done?
wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))
# set parameters special for uc 2
if args.uc_id == '2':
print('use uc2 stuff')
num_nested_groups = args.dim_value
num_sensors = '4'
approx_num_sensors = int(num_sensors) ** num_nested_groups
wl_instances = int(
((approx_num_sensors + wl_max_records - 1) / wl_max_records)
)
# Customize workload generator creations
wg_yaml['spec']['replicas'] = wl_instances
# TODO: acces over name of container
# Set used use case
wg_containter = wg_yaml['spec']['template']['spec']['containers'][0]
wg_containter['image'] = 'theodolite/theodolite-uc' + args.uc_id + \
'-workload-generator:latest'
# TODO: acces over name of attribute
wg_containter['env'][1]['value'] = str(num_sensors)
wg_containter['env'][2]['value'] = str(wl_instances)
# Set environment variables
wg_containter['env'][0]['value'] = str(num_sensors)
wg_containter['env'][1]['value'] = str(wl_instances)
if args.uc_id == '2': # Special configuration for uc2
wg_containter['env'][2]['value'] = str(num_nested_groups)
try:
wg_ss = appsApi.create_namespaced_deployment(
@@ -198,7 +215,7 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
print("Service '%s' created." % svc.metadata.name)
except client.rest.ApiException as e:
svc = svc_yaml
print("Service creation error: %s" % e.reason)
logging.error("Service creation error: %s" % e.reason)
# Create custom object service monitor
try:
@@ -212,7 +229,7 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name'])
except client.rest.ApiException as e:
svc_monitor = svc_monitor_yaml
print("ServiceMonitor creation error: %s" % e.reason)
logging.error("ServiceMonitor creation error: %s" % e.reason)
# Apply jmx config map for aggregation service
try:
@@ -221,7 +238,7 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
print("ConfigMap '%s' created." % jmx_cm.metadata.name)
except client.rest.ApiException as e:
jmx_cm = jmx_yaml
print("ConfigMap creation error: %s" % e.reason)
logging.error("ConfigMap creation error: %s" % e.reason)
# Create deployment
deploy_yaml['spec']['replicas'] = args.instances
@@ -241,7 +258,7 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
print("Deployment '%s' created." % app_deploy.metadata.name)
except client.rest.ApiException as e:
app_deploy = deploy_yaml
print("Deployment creation error: %s" % e.reason)
logging.error("Deployment creation error: %s" % e.reason)
return svc, svc_monitor, jmx_cm, app_deploy
@@ -277,7 +294,7 @@ def delete_resource(obj, del_func):
try:
del_func(obj['metadata']['name'], 'default')
except Exception as e:
print("Error deleting resource")
logging.error("Error deleting resource")
logging.error(e)
return
print('Resource deleted')
@@ -366,6 +383,7 @@ def delete_topics(topics):
break
return
def reset_zookeeper():
"""Delete ZooKeeper configurations used for workload generation.
"""
@@ -378,7 +396,8 @@ def reset_zookeeper():
'--',
'bash',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation'
'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall '
+ '/workload-generation'
]
check_zoo_data_command = [
@@ -388,26 +407,33 @@ def reset_zookeeper():
'--',
'bash',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 ls /'
# "| awk -F[\]\[] '{print $2}'"
'zookeeper-shell my-confluent-cp-zookeeper:2181 get '
+ '/workload-generation'
]
output = subprocess.run(delete_zoo_data_command, capture_output=True, text=True)
logging.info(output.stdout)
# Wait for configuration deletion
while True:
output = subprocess.run(check_zoo_data_command, capture_output=True, text=True)
# Delete Zookeeper configuration data
output = subprocess.run(delete_zoo_data_command,
capture_output=True,
text=True)
logging.debug(output.stdout)
# Check data is deleted
output = subprocess.run(check_zoo_data_command,
capture_output=True,
text=True)
logging.debug(output)
if 'workload-generation' in output.stdout:
if output.returncode == 1: # Means data not available anymore
print('ZooKeeper reset was successful.')
break
else:
print('ZooKeeper reset was not successful. Retrying in 5s.')
time.sleep(5)
else:
logging.info('ZooKeeper reset was successful.')
break
return
def stop_lag_exporter():
"""
Stop the lag exporter in order to reset it and allow smooth execution for
@@ -442,6 +468,7 @@ def stop_lag_exporter():
# def stop():
#
def main():
load_variables()
print('---------------------')
Loading