Skip to content
Snippets Groups Projects
Commit 365cdf58 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

add commit ms as default again for kubernetes deployment and overwrite existing env variables

parent d119e748
No related branches found
No related tags found
1 merge request!59Further Configuration Options for Kafka Streams
......@@ -41,8 +41,8 @@ def initialize_kubernetes_api():
config.load_kube_config() # try using local config
except config.config_exception.ConfigException as e:
# load config from pod, if local config is not available
logging.debug('Failed loading local Kubernetes configuration,'
+ ' try from cluster')
logging.debug(
'Failed loading local Kubernetes configuration try from cluster')
logging.debug(e)
config.load_incluster_config()
......@@ -58,8 +58,7 @@ def create_topics(topics):
# Calling exec and waiting for response
print('Create topics')
for (topic, partitions) in topics:
print('Create topic ' + topic + ' with #' + str(partitions)
+ ' partitions')
print(f'Create topic {topic} with #{partitions} partitions')
exec_command = [
'/bin/sh',
'-c',
......@@ -86,7 +85,7 @@ def load_yaml(file_path):
with f:
return yaml.safe_load(f)
except Exception as e:
logging.error('Error opening file %s' % file_path)
logging.error('Error opening file %s', file_path)
logging.error(e)
......@@ -105,6 +104,15 @@ def load_yaml_files():
return wg, app_svc, app_svc_monitor, app_jmx, app_deploy
def replace_env_value(container, key, value):
"""
Special method to replace in a container with kubernetes env values
the value of a given parameter.
"""
next(filter(lambda x: x['name'] == key, container))[
'value'] = value
def start_workload_generator(wg_yaml, dim_value, uc_id):
"""Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator.
......@@ -139,22 +147,22 @@ def start_workload_generator(wg_yaml, dim_value, uc_id):
'-workload-generator:latest'
# Set environment variables
next(filter(lambda x: x['name'] == 'NUM_SENSORS', wg_containter['env']))[
'value'] = str(num_sensors)
next(filter(lambda x: x['name'] == 'INSTANCES', wg_containter['env']))[
'value'] = str(wl_instances)
replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors))
replace_env_value(wg_containter['env'], 'INSTANCES', str(wl_instances))
if uc_id == '2': # Special configuration for uc2
next(filter(lambda x: x['name'] == 'NUM_NESTED_GROUPS', wg_containter['env']))[
'value'] = str(num_nested_groups)
replace_env_value(
wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups))
try:
wg_ss = appsApi.create_namespaced_deployment(
namespace=namespace,
body=wg_yaml
)
print("Deployment '%s' created." % wg_ss.metadata.name)
print(f'Deployment {wg_ss.metadata.name} created.')
return wg_ss
except client.rest.ApiException as e:
print("Deployment creation error: %s" % e.reason)
print(f'Deployment creation error: {e.reason}')
return wg_yaml
......@@ -185,10 +193,10 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
try:
svc = coreApi.create_namespaced_service(
namespace=namespace, body=svc_yaml)
print("Service '%s' created." % svc.metadata.name)
print(f'Service {svc.metadata.name} created.')
except client.rest.ApiException as e:
svc = svc_yaml
logging.error("Service creation error: %s" % e.reason)
logging.error("Service creation error: %s", e.reason)
# Create custom object service monitor
try:
......@@ -199,31 +207,39 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
plural="servicemonitors", # CustomResourceDef of ServiceMonitor
body=svc_monitor_yaml,
)
print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name'])
print(f"ServiceMonitor '{svc_monitor['metadata']['name']}' created.")
except client.rest.ApiException as e:
svc_monitor = svc_monitor_yaml
logging.error("ServiceMonitor creation error: %s" % e.reason)
logging.error("ServiceMonitor creation error: %s", e.reason)
# Apply jmx config map for aggregation service
try:
jmx_cm = coreApi.create_namespaced_config_map(
namespace=namespace, body=jmx_yaml)
print("ConfigMap '%s' created." % jmx_cm.metadata.name)
print(f"ConfigMap '{jmx_cm.metadata.name}' created.")
except client.rest.ApiException as e:
jmx_cm = jmx_yaml
logging.error("ConfigMap creation error: %s" % e.reason)
logging.error("ConfigMap creation error: %s", e.reason)
# Create deployment
deploy_yaml['spec']['replicas'] = instances
app_container = next(filter(
lambda x: x['name'] == 'uc-application', deploy_yaml['spec']['template']['spec']['containers']))
lambda x: x['name'] == 'uc-application',
deploy_yaml['spec']['template']['spec']['containers']))
app_container['image'] = 'theodolite/theodolite-uc' + uc_id \
+ '-kstreams-app:latest'
# Set configurations environment parameters for SPE
for k,v in configurations.items():
conf = {'name': k, 'value': v}
app_container['env'].append(conf)
for k, v in configurations.items():
# check if environment variable is already definde in yaml
env = next(filter(lambda x: x['name'] == k,
app_container['env']), None)
if env is not None:
env['value'] = v # replace value
else:
# create new environment pair
conf = {'name': k, 'value': v}
app_container['env'].append(conf)
# Set resources in Kubernetes
app_container['resources']['limits']['memory'] = memory_limit
......@@ -235,10 +251,10 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml,
namespace=namespace,
body=deploy_yaml
)
print("Deployment '%s' created." % app_deploy.metadata.name)
print(f"Deployment '{app_deploy.metadata.name}' created.")
except client.rest.ApiException as e:
app_deploy = deploy_yaml
logging.error("Deployment creation error: %s" % e.reason)
logging.error("Deployment creation error: %s", e.reason)
return svc, svc_monitor, jmx_cm, app_deploy
......@@ -252,7 +268,7 @@ def wait_execution(execution_minutes):
for i in range(execution_minutes):
time.sleep(60)
print(f"Executed: {i+1} minutes")
print(f'Executed: {i+1} minutes')
print('Execution finished')
return
......@@ -267,7 +283,8 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome
:param int execution_minutes: How long the use case where executed.
"""
print('Run evaluation function')
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url, result_path)
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances,
execution_minutes, prometheus_base_url, result_path)
return
......@@ -319,7 +336,7 @@ def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
name=app_svc_monitor['metadata']['name'])
print('Resource deleted')
except Exception as e:
print("Error deleting service monitor")
print('Error deleting service monitor')
print('Delete jmx config map')
delete_resource(app_jmx, coreApi.delete_namespaced_config_map)
......@@ -372,7 +389,7 @@ def delete_topics(topics):
stderr=True, stdin=False,
stdout=True, tty=False)
if resp == '0':
print("Topics deleted")
print('Topics deleted')
break
return
......
......@@ -26,6 +26,8 @@ spec:
value: "http://my-confluent-cp-schema-registry:8081"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
- name: COMMIT_INTERVAL_MS # Set as default for the applications
value: "100"
resources:
limits:
memory: 4Gi
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment