Commit 32d56dd0 authored by Björn Vonheiden

Merge branch 'feature/runUcPython' into feature/theodolitePython

parents b14fc0cd 20bac759
Showing with 995 additions and 198 deletions
...@@ -5,17 +5,12 @@ from datetime import datetime, timedelta, timezone
 import pandas as pd
 import matplotlib.pyplot as plt
 import csv
-# import logging
-exp_id = sys.argv[1]
-benchmark = sys.argv[2]
-dim_value = sys.argv[3]
-instances = sys.argv[4]
-execution_minutes = int(sys.argv[5])
-time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
-prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range'
-#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
+def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url = 'http://kube1.se.internal:32529'):
+    print("Main")
+    time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
     now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
     now = now_local - timedelta(milliseconds=time_diff_ms)
...@@ -28,13 +23,12 @@ start = now - timedelta(minutes=execution_minutes)
     #print(start.isoformat().replace('+00:00', 'Z'))
     #print(end.isoformat().replace('+00:00', 'Z'))
-response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         # 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
         'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
         'start': start.isoformat(),
         'end': end.isoformat(),
         'step': '5s'})
     # response
     # print(response.request.path_url)
     # response.content
...@@ -47,7 +41,8 @@ for result in results:
         topic = result['metric']['topic']
         for value in result['values']:
             # print(value)
             d.append({'topic': topic, 'timestamp': int(
                 value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})

     df = pd.DataFrame(d)
...@@ -60,8 +55,10 @@ input = df.loc[df['topic'] == "input"]
     from sklearn.linear_model import LinearRegression

     # values converts it into a numpy array
     X = input.iloc[:, 1].values.reshape(-1, 1)
     # -1 means that calculate the dimension of rows, but have 1 column
     Y = input.iloc[:, 2].values.reshape(-1, 1)
     linear_regressor = LinearRegression()  # create object for the class
     linear_regressor.fit(X, Y)  # perform linear regression
     Y_pred = linear_regressor.predict(X)  # make predictions
...@@ -70,7 +67,8 @@ print(linear_regressor.coef_)
     # print(Y_pred)

     fields = [exp_id, datetime.now(), benchmark, dim_value,
               instances, linear_regressor.coef_]
     print(fields)
     with open(r'results.csv', 'a') as f:
         writer = csv.writer(f)
...@@ -85,10 +83,9 @@ plt.savefig(f"{filename}_plot.png")
     df.to_csv(f"{filename}_values.csv")

     # Load total lag count
-response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
         'start': start.isoformat(),
         'end': end.isoformat(),
...@@ -103,7 +100,8 @@ for result in results:
         group = result['metric']['group']
         for value in result['values']:
             # print(value)
             d.append({'group': group, 'timestamp': int(
                 value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})

     df = pd.DataFrame(d)
...@@ -111,7 +109,7 @@ df.to_csv(f"{filename}_totallag.csv")
     # Load partition count
-response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
         'start': start.isoformat(),
         'end': end.isoformat(),
...@@ -126,16 +124,16 @@ for result in results:
         topic = result['metric']['topic']
         for value in result['values']:
             # print(value)
             d.append({'topic': topic, 'timestamp': int(
                 value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})

     df = pd.DataFrame(d)
     df.to_csv(f"{filename}_partitions.csv")

     # Load instances count
-response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
         'start': start.isoformat(),
         'end': end.isoformat(),
...@@ -153,3 +151,16 @@ for result in results:
     df = pd.DataFrame(d)
     df.to_csv(f"{filename}_instances.csv")
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO)
+
+    # Load arguments
+    exp_id = sys.argv[1]
+    benchmark = sys.argv[2]
+    dim_value = sys.argv[3]
+    instances = sys.argv[4]
+    execution_minutes = int(sys.argv[5])
+
+    main(exp_id, benchmark, dim_value, instances, execution_minutes)
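Because the argument handling now lives behind the `if __name__ == '__main__':` guard, the lag analysis can also be imported and invoked directly, which is what run_uc.py does in run_evaluation(). A minimal sketch, assuming hypothetical experiment values and a port-forwarded Prometheus instead of the default in-cluster URL:

import lag_analysis

# Hypothetical experiment parameters; only the keyword names and the default
# Prometheus base URL come from the function signature above.
lag_analysis.main(exp_id='42', benchmark='uc1', dim_value=10000, instances=4,
                  execution_minutes=5,
                  prometheus_base_url='http://localhost:9090')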
...@@ -2,3 +2,7 @@ matplotlib==3.2.0
 pandas==1.0.1
 requests==2.23.0
 scikit-learn==0.22.2.post1
+
+# For run_uc.py
+kubernetes==11.0.0
+confuse==1.1.0
import argparse # parse arguments from cli
import atexit # used to clear resources at exit of program (e.g. ctrl-c)
from kubernetes import client, config # kubernetes api
from kubernetes.stream import stream
import lag_analysis
import logging # logging
from os import path # path utilities
import subprocess # execute bash commands
import sys # for exit of program
import time # process sleep
import yaml # convert from file to yaml object
coreApi = None  # access kubernetes core api
appsApi = None  # access kubernetes apps api
customApi = None  # access kubernetes custom object api
args = None # CLI arguments
def load_variables():
    """Load the CLI variables given at the command line."""
    global args
    print('Load CLI variables')
    parser = argparse.ArgumentParser(description='Run use case program')
    parser.add_argument('--exp-id', '-id',
                        dest='exp_id',
                        default='1',
                        metavar='EXP_ID',
                        help='ID of the experiment')
    parser.add_argument('--use-case', '-uc',
                        dest='uc_id',
                        default='1',
                        metavar='UC_NUMBER',
                        help='use case number, one of 1, 2, 3 or 4')
    parser.add_argument('--dim-value', '-d',
                        dest='dim_value',
                        default=10000,
                        type=int,
                        metavar='DIM_VALUE',
                        help='Value for the workload generator to be tested')
    parser.add_argument('--instances', '-i',
                        dest='instances',
                        default=1,
                        type=int,
                        metavar='INSTANCES',
                        help='Number of instances to be benchmarked')
    parser.add_argument('--partitions', '-p',
                        dest='partitions',
                        default=40,
                        type=int,
                        metavar='PARTITIONS',
                        help='Number of partitions for Kafka topics')
    parser.add_argument('--cpu-limit', '-cpu',
                        dest='cpu_limit',
                        default='1000m',
                        metavar='CPU_LIMIT',
                        help='Kubernetes CPU limit')
    parser.add_argument('--memory-limit', '-mem',
                        dest='memory_limit',
                        default='4Gi',
                        metavar='MEMORY_LIMIT',
                        help='Kubernetes memory limit')
    parser.add_argument('--commit-interval', '-ci',
                        dest='commit_interval_ms',
                        default=100,
                        type=int,
                        metavar='KAFKA_STREAMS_COMMIT_INTERVAL_MS',
                        help='Kafka Streams commit interval in milliseconds')
    parser.add_argument('--executions-minutes', '-exm',
                        dest='execution_minutes',
                        default=5,
                        type=int,
                        metavar='EXECUTION_MINUTES',
                        help='Duration in minutes subexperiments should be \
                              executed for')
    parser.add_argument('--reset', '-res',
                        dest='reset',
                        action="store_true",
                        help='Resets the environment before execution')
    parser.add_argument('--reset-only', '-reso',
                        dest='reset_only',
                        action="store_true",
                        help='Only resets the environment. Ignores all other parameters')
    args = parser.parse_args()
    print(args)

def initialize_kubernetes_api():
    """Load the Kubernetes config from the local machine or the cluster and
    create the needed API clients.
    """
    global coreApi, appsApi, customApi
    print('Connect to kubernetes api')
    try:
        config.load_kube_config()  # try using local config
    except config.config_exception.ConfigException as e:
        # load config from pod, if local config is not available
        logging.debug('Failed loading local Kubernetes configuration,'
                      + ' try from cluster')
        logging.debug(e)
        config.load_incluster_config()

    coreApi = client.CoreV1Api()
    appsApi = client.AppsV1Api()
    customApi = client.CustomObjectsApi()

def create_topics(topics):
    """Create the topics needed for the use cases.

    :param topics: List of topics that should be created.
    """
    # Calling exec and waiting for response
    print('Create topics')
    for (topic, partitions) in topics:
        print('Create topic ' + topic + ' with #' + str(partitions)
              + ' partitions')
        exec_command = [
            '/bin/sh',
            '-c',
            f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181\
            --create --topic {topic} --partitions {partitions}\
            --replication-factor 1'
        ]
        resp = stream(coreApi.connect_get_namespaced_pod_exec,
                      "kafka-client",
                      'default',
                      command=exec_command,
                      stderr=True, stdin=False,
                      stdout=True, tty=False)
        print(resp)

def load_yaml(file_path):
    """Load a YAML object from the file at the given path.

    :param file_path: The path to the file which contains the yaml.
    :return: The file as a yaml object.
    """
    try:
        f = open(path.join(path.dirname(__file__), file_path))
        with f:
            return yaml.safe_load(f)
    except Exception as e:
        logging.error('Error opening file %s' % file_path)
        logging.error(e)


def load_yaml_files():
    """Load the needed yaml files and create objects from them.

    :return: wg, app_svc, app_svc_monitor, app_jmx, app_deploy
    """
    print('Load kubernetes yaml files')
    wg = load_yaml('uc-workload-generator/base/workloadGenerator.yaml')
    app_svc = load_yaml('uc-application/base/aggregation-service.yaml')
    app_svc_monitor = load_yaml('uc-application/base/service-monitor.yaml')
    app_jmx = load_yaml('uc-application/base/jmx-configmap.yaml')
    app_deploy = load_yaml('uc-application/base/aggregation-deployment.yaml')

    print('Kubernetes yaml files loaded')
    return wg, app_svc, app_svc_monitor, app_jmx, app_deploy

def start_workload_generator(wg_yaml):
    """Start the workload generator.

    :param wg_yaml: The yaml object for the workload generator.
    :return:
        The Deployment created by the API or, in case it already exists or an
        error occurs, the yaml object.
    """
    print('Start workload generator')

    num_sensors = args.dim_value
    wl_max_records = 150000
    wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))

    # set parameters special for uc 2
    if args.uc_id == '2':
        print('use uc2 stuff')
        num_nested_groups = args.dim_value
        num_sensors = '4'
        approx_num_sensors = int(num_sensors) ** num_nested_groups
        wl_instances = int(
            ((approx_num_sensors + wl_max_records - 1) / wl_max_records)
        )

    # Customize workload generator creations
    wg_yaml['spec']['replicas'] = wl_instances
    # TODO: access over name of container
    # Set used use case
    wg_containter = wg_yaml['spec']['template']['spec']['containers'][0]
    wg_containter['image'] = 'theodolite/theodolite-uc' + args.uc_id + \
        '-workload-generator:latest'
    # TODO: access over name of attribute
    # Set environment variables
    wg_containter['env'][0]['value'] = str(num_sensors)
    wg_containter['env'][1]['value'] = str(wl_instances)

    if args.uc_id == '2':  # Special configuration for uc2
        wg_containter['env'][2]['value'] = str(num_nested_groups)

    try:
        wg_ss = appsApi.create_namespaced_deployment(
            namespace="default",
            body=wg_yaml
        )
        print("Deployment '%s' created." % wg_ss.metadata.name)
        return wg_ss
    except client.rest.ApiException as e:
        print("Deployment creation error: %s" % e.reason)
        return wg_yaml

def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml):
    """Apply the service, service monitor and jmx config map, and start the
    use case application.

    :param svc_yaml: The yaml object for the service.
    :param svc_monitor_yaml: The yaml object for the service monitor.
    :param jmx_yaml: The yaml object for the jmx config map.
    :param deploy_yaml: The yaml object for the application.
    :return:
        The Service, ServiceMonitor, JMX ConfigMap and Deployment.
        In case a resource already exists or an error occurs, the yaml object
        is returned instead.
    """
    print('Start use case application')
    svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None

    # Create Service
    try:
        svc = coreApi.create_namespaced_service(
            namespace="default", body=svc_yaml)
        print("Service '%s' created." % svc.metadata.name)
    except client.rest.ApiException as e:
        svc = svc_yaml
        logging.error("Service creation error: %s" % e.reason)

    # Create custom object service monitor
    try:
        svc_monitor = customApi.create_namespaced_custom_object(
            group="monitoring.coreos.com",
            version="v1",
            namespace="default",
            plural="servicemonitors",  # CustomResourceDef of ServiceMonitor
            body=svc_monitor_yaml,
        )
        print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name'])
    except client.rest.ApiException as e:
        svc_monitor = svc_monitor_yaml
        logging.error("ServiceMonitor creation error: %s" % e.reason)

    # Apply jmx config map for aggregation service
    try:
        jmx_cm = coreApi.create_namespaced_config_map(
            namespace="default", body=jmx_yaml)
        print("ConfigMap '%s' created." % jmx_cm.metadata.name)
    except client.rest.ApiException as e:
        jmx_cm = jmx_yaml
        logging.error("ConfigMap creation error: %s" % e.reason)

    # Create deployment
    deploy_yaml['spec']['replicas'] = args.instances
    # TODO: access over name of container
    app_container = deploy_yaml['spec']['template']['spec']['containers'][0]
    app_container['image'] = 'theodolite/theodolite-uc' + args.uc_id \
        + '-kstreams-app:latest'
    # TODO: access over name of attribute
    app_container['env'][0]['value'] = str(args.commit_interval_ms)
    app_container['resources']['limits']['memory'] = args.memory_limit
    app_container['resources']['limits']['cpu'] = args.cpu_limit
    try:
        app_deploy = appsApi.create_namespaced_deployment(
            namespace="default",
            body=deploy_yaml
        )
        print("Deployment '%s' created." % app_deploy.metadata.name)
    except client.rest.ApiException as e:
        app_deploy = deploy_yaml
        logging.error("Deployment creation error: %s" % e.reason)

    return svc, svc_monitor, jmx_cm, app_deploy

def wait_execution():
    """Wait time while in execution."""
    print('Wait while executing')
    # TODO: ask which fits better
    # time.sleep(args.execution_minutes * 60)
    for i in range(args.execution_minutes):
        time.sleep(60)
        print(f"Executed: {i+1} minutes")
    print('Execution finished')
    return


def run_evaluation():
    """Run the evaluation function."""
    print('Run evaluation function')
    lag_analysis.main(args.exp_id, f'uc{args.uc_id}', args.dim_value,
                      args.instances, args.execution_minutes)
    return

def delete_resource(obj, del_func):
    """Helper function to delete Kubernetes resources.

    First tries to delete with the Kubernetes object.
    Then it uses the dict representation of the yaml to delete the object.

    :param obj: Either a Kubernetes resource object or the yaml as a dict.
    :param del_func: The function that needs to be executed for deletion.
    """
    try:
        del_func(obj.metadata.name, 'default')
    except Exception as e:
        logging.debug('Error deleting resource with api object, try with dict.')
        try:
            del_func(obj['metadata']['name'], 'default')
        except Exception as e:
            logging.error("Error deleting resource")
            logging.error(e)
            return
    print('Resource deleted')

def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
    """Stop the applied applications and delete their resources.

    :param wg: The workload generator deployment.
    :param app_svc: The application service.
    :param app_svc_monitor: The application service monitor.
    :param app_jmx: The application jmx config map.
    :param app_deploy: The application deployment.
    """
    print('Stop use case application and workload generator')

    print('Delete workload generator')
    delete_resource(wg, appsApi.delete_namespaced_deployment)

    print('Delete app service')
    delete_resource(app_svc, coreApi.delete_namespaced_service)

    print('Delete service monitor')
    try:
        customApi.delete_namespaced_custom_object(
            group="monitoring.coreos.com",
            version="v1",
            namespace="default",
            plural="servicemonitors",
            name=app_svc_monitor['metadata']['name'])
        print('Resource deleted')
    except Exception as e:
        print("Error deleting service monitor")

    print('Delete jmx config map')
    delete_resource(app_jmx, coreApi.delete_namespaced_config_map)

    print('Delete uc application')
    delete_resource(app_deploy, appsApi.delete_namespaced_deployment)
    return

def delete_topics(topics):
    """Delete topics from Kafka.

    :param topics: List of topics to delete.
    """
    print('Delete topics from Kafka')

    topics_delete = 'theodolite-.*|' + '|'.join([ti[0] for ti in topics])

    num_topics_command = [
        '/bin/sh',
        '-c',
        f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list \
        | sed -n -E "/^({topics_delete})\
        ( - marked for deletion)?$/p" | wc -l'
    ]

    topics_deletion_command = [
        '/bin/sh',
        '-c',
        f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete \
        --topic "{topics_delete}"'
    ]

    # Wait until the topics are deleted
    while True:
        # topic deletion, sometimes a second deletion seems to be required
        resp = stream(coreApi.connect_get_namespaced_pod_exec,
                      "kafka-client",
                      'default',
                      command=topics_deletion_command,
                      stderr=True, stdin=False,
                      stdout=True, tty=False)
        print(resp)

        print('Wait for topic deletion')
        time.sleep(2)
        resp = stream(coreApi.connect_get_namespaced_pod_exec,
                      "kafka-client",
                      'default',
                      command=num_topics_command,
                      stderr=True, stdin=False,
                      stdout=True, tty=False)
        if resp == '0':
            print("Topics deleted")
            break
    return

def reset_zookeeper():
    """Delete ZooKeeper configurations used for workload generation."""
    print('Delete ZooKeeper configurations used for workload generation')

    delete_zoo_data_command = [
        'kubectl',
        'exec',
        'zookeeper-client',
        '--',
        'bash',
        '-c',
        'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall '
        + '/workload-generation'
    ]

    check_zoo_data_command = [
        'kubectl',
        'exec',
        'zookeeper-client',
        '--',
        'bash',
        '-c',
        'zookeeper-shell my-confluent-cp-zookeeper:2181 get '
        + '/workload-generation'
    ]

    # Wait for configuration deletion
    while True:
        # Delete Zookeeper configuration data
        output = subprocess.run(delete_zoo_data_command,
                                capture_output=True,
                                text=True)
        logging.debug(output.stdout)

        # Check data is deleted
        output = subprocess.run(check_zoo_data_command,
                                capture_output=True,
                                text=True)
        logging.debug(output)

        if output.returncode == 1:  # Means data not available anymore
            print('ZooKeeper reset was successful.')
            break
        else:
            print('ZooKeeper reset was not successful. Retrying in 5s.')
            time.sleep(5)
    return

def stop_lag_exporter():
    """Stop the lag exporter in order to reset it and allow smooth execution
    for next use cases.
    """
    print('Stop the lag exporter')

    find_pod_command = [
        'kubectl',
        'get',
        'pod',
        '-l',
        'app.kubernetes.io/name=kafka-lag-exporter',
        '-o',
        'jsonpath="{.items[0].metadata.name}"'
    ]
    output = subprocess.run(find_pod_command, capture_output=True, text=True)
    lag_exporter_pod = output.stdout.replace('"', '')

    delete_pod_command = [
        'kubectl',
        'delete',
        'pod',
        lag_exporter_pod
    ]
    output = subprocess.run(delete_pod_command, capture_output=True, text=True)
    print(output)
    return

def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
    """Stop the applications, delete topics, reset zookeeper and stop the lag
    exporter.
    """
    print('Reset cluster')
    stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy)
    print('---------------------')
    delete_topics(topics)
    print('---------------------')
    reset_zookeeper()
    print('---------------------')
    stop_lag_exporter()

def main():
    load_variables()
    print('---------------------')

    wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
    print('---------------------')

    initialize_kubernetes_api()
    print('---------------------')

    topics = [('input', args.partitions),
              ('output', args.partitions),
              ('aggregation-feedback', args.partitions),
              ('configuration', 1)]

    # Check for reset options
    if args.reset_only:
        # Only reset the cluster and then end the program
        print('Reset only mode')
        reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
        sys.exit()
    if args.reset:
        # Reset cluster before execution
        reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
        print('---------------------')

    # Register the reset operation so that it is executed at the end of the program
    atexit.register(reset_cluster, wg, app_svc, app_svc_monitor, app_jmx,
                    app_deploy, topics)

    create_topics(topics)
    print('---------------------')

    wg = start_workload_generator(wg)
    print('---------------------')

    app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(
        app_svc,
        app_svc_monitor,
        app_jmx,
        app_deploy)
    print('---------------------')

    wait_execution()
    print('---------------------')

    run_evaluation()
    print('---------------------')

    # The cluster is reset by the registered atexit handler
    # reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()
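Since run_uc.py reads its configuration from the CLI flags defined in load_variables(), a single subexperiment can also be started from another Python script by setting sys.argv before calling main(). A minimal sketch with placeholder workload values (only the flag names are taken from the parser above):

import sys
import run_uc

# Placeholder subexperiment: use case 1, 20000 simulated sensors, 2 application
# instances, resetting the cluster before execution.
sys.argv = ['run_uc.py',
            '--exp-id', '1',
            '--use-case', '1',
            '--dim-value', '20000',
            '--instances', '2',
            '--executions-minutes', '5',
            '--reset']
run_uc.main()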
...@@ -29,38 +29,59 @@ NUM_SENSORS=$DIM_VALUE
 WL_MAX_RECORDS=150000
 WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))

-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uc-workload-generator/overlay/uc1-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "$NUM_SENSORS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc1-workload-generator

 # Start application
 REPLICAS=$INSTANCES
-# When not using `sed` anymore, use `kubectl apply -f uc1-application`
-kubectl apply -f uc1-application/aggregation-service.yaml
-kubectl apply -f uc1-application/jmx-configmap.yaml
-kubectl apply -f uc1-application/service-monitor.yaml
-#kubectl apply -f uc1-application/aggregation-deployment.yaml
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc1-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc1-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc1-application

 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))

 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate

-# Stop wl and app
-#kubectl delete -f uc1-workload-generator/deployment.yaml
-#sed "s/{{INSTANCES}}/1/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
-#sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc1-application/aggregation-service.yaml
-kubectl delete -f uc1-application/jmx-configmap.yaml
-kubectl delete -f uc1-application/service-monitor.yaml
-#kubectl delete -f uc1-application/aggregation-deployment.yaml
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc1-workload-generator
+kubectl delete -k uc-application/overlay/uc1-application

 # Delete topics instead of Kafka
...
...@@ -30,36 +30,63 @@ WL_MAX_RECORDS=150000
 APPROX_NUM_SENSORS=$((4**NUM_NESTED_GROUPS))
 WL_INSTANCES=$(((APPROX_NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))

-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_NESTED_GROUPS}}/$NUM_NESTED_GROUPS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc2-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uc-workload-generator/overlay/uc2-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "4"
+        - name: HIERARCHY
+          value: "full"
+        - name: NUM_NESTED_GROUPS
+          value: "$NUM_NESTED_GROUPS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc2-workload-generator

 # Start application
 REPLICAS=$INSTANCES
-# When not using `sed` anymore, use `kubectl apply -f uc2-application`
-kubectl apply -f uc2-application/aggregation-service.yaml
-kubectl apply -f uc2-application/jmx-configmap.yaml
-kubectl apply -f uc2-application/service-monitor.yaml
-#kubectl apply -f uc2-application/aggregation-deployment.yaml
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc2-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc2-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc2-application

 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))

 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate

-# Stop wl and app
-#sed "s/{{INSTANCES}}/1/g" uc2-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc2-application/aggregation-service.yaml
-kubectl delete -f uc2-application/jmx-configmap.yaml
-kubectl delete -f uc2-application/service-monitor.yaml
-#kubectl delete -f uc2-application/aggregation-deployment.yaml
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc2-workload-generator
+kubectl delete -k uc-application/overlay/uc2-application

 # Delete topics instead of Kafka
...
...@@ -29,40 +29,61 @@ NUM_SENSORS=$DIM_VALUE
 WL_MAX_RECORDS=150000
 WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))

-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc3-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uc-workload-generator/overlay/uc3-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "$NUM_SENSORS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc3-workload-generator

 # Start application
 REPLICAS=$INSTANCES
-# When not using `sed` anymore, use `kubectl apply -f uc3-application`
-kubectl apply -f uc3-application/aggregation-service.yaml
-kubectl apply -f uc3-application/jmx-configmap.yaml
-kubectl apply -f uc3-application/service-monitor.yaml
-#kubectl apply -f uc3-application/aggregation-deployment.yaml
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc3-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc3-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc3-application
+kubectl scale deployment uc3-titan-ccp-aggregation --replicas=$REPLICAS

 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))

 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate

-# Stop wl and app
-#kubectl delete -f uc3-workload-generator/deployment.yaml
-#sed "s/{{INSTANCES}}/1/g" uc3-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc3-application/aggregation-service.yaml
-kubectl delete -f uc3-application/jmx-configmap.yaml
-kubectl delete -f uc3-application/service-monitor.yaml
-#kubectl delete -f uc3-application/aggregation-deployment.yaml
-#sed "s/{{CPU_LIMIT}}/1000m/g; s/{{MEMORY_LIMIT}}/4Gi/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/100/g" uc3-application/aggregation-deployment.yaml | kubectl delete -f -
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc3-workload-generator
+kubectl delete -k uc-application/overlay/uc3-application

 # Delete topics instead of Kafka
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
...
...@@ -29,39 +29,60 @@ NUM_SENSORS=$DIM_VALUE
 WL_MAX_RECORDS=150000
 WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))

-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc4-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uuc-workload-generator/overlay/uc4-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "$NUM_SENSORS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc4-workload-generator

 # Start application
 REPLICAS=$INSTANCES
-#AGGREGATION_DURATION_DAYS=$DIM_VALUE
-# When not using `sed` anymore, use `kubectl apply -f uc4-application`
-kubectl apply -f uc4-application/aggregation-service.yaml
-kubectl apply -f uc4-application/jmx-configmap.yaml
-kubectl apply -f uc4-application/service-monitor.yaml
-#kubectl apply -f uc4-application/aggregation-deployment.yaml
-#sed "s/{{AGGREGATION_DURATION_DAYS}}/$AGGREGATION_DURATION_DAYS/g" uc4-application/aggregation-deployment.yaml | kubectl apply -f -
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc4-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc4-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc4-application
+kubectl scale deployment uc4-titan-ccp-aggregation --replicas=$REPLICAS

 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))

 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate

-# Stop wl and app
-#sed "s/{{INSTANCES}}/1/g" uc4-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc4-application/aggregation-service.yaml
-kubectl delete -f uc4-application/jmx-configmap.yaml
-kubectl delete -f uc4-application/service-monitor.yaml
-#kubectl delete -f uc4-application/aggregation-deployment.yaml
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc4-workload-generator
+kubectl delete -k uc-application/overlay/uc4-application

 # Delete topics instead of Kafka
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
...
...@@ -14,24 +14,24 @@ spec:
 spec:
   terminationGracePeriodSeconds: 0
   containers:
-  - name: uc1-application
-    image: "theodolite/theodolite-uc1-kstreams-app:latest"
+  - name: uc-application
+    image: uc-app:latest
     ports:
     - containerPort: 5555
       name: jmx
     env:
+    - name: COMMIT_INTERVAL_MS
+      value: "100"
     - name: KAFKA_BOOTSTRAP_SERVERS
       value: "my-confluent-cp-kafka:9092"
     - name: SCHEMA_REGISTRY_URL
       value: "http://my-confluent-cp-schema-registry:8081"
-    - name: COMMIT_INTERVAL_MS
-      value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
     - name: JAVA_OPTS
       value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
     resources:
       limits:
-        memory: "{{MEMORY_LIMIT}}"
-        cpu: "{{CPU_LIMIT}}"
+        memory: 4Gi
+        cpu: 1000m
   - name: prometheus-jmx-exporter
     image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
     command:
...
# uc-application/base/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

commonLabels:
  app: titan-ccp-aggregation

# Use all resources to compose them into one file
resources:
- aggregation-deployment.yaml
- aggregation-service.yaml
- service-monitor.yaml
- jmx-configmap.yaml
# uc-application/overlay/uc1-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namePrefix: uc1-

images:
- name: uc-app
  newName: theodolite/theodolite-uc1-kstreams-app
  newTag: latest

bases:
- ../../base

patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters

# uc-application/overlay/uc1-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "100"
        resources:
          limits:
            memory: 4Gi
            cpu: 1000m
# uc-application/overlay/uc2-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namePrefix: uc2-

images:
- name: uc-app
  newName: theodolite/theodolite-uc2-kstreams-app
  newTag: latest

bases:
- ../../base

patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters

# uc-application/overlay/uc2-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "100"
        resources:
          limits:
            memory: 4Gi
            cpu: 1000m
# uc-application/overlay/uc3-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namePrefix: uc3-

images:
- name: uc-app
  newName: theodolite/theodolite-uc3-kstreams-app
  newTag: latest

bases:
- ../../base

patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters

# uc-application/overlay/uc3-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "100"
        resources:
          limits:
            memory: 4Gi
            cpu: 1000m
# uc-application/overlay/uc4-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namePrefix: uc4-

images:
- name: uc-app
  newName: theodolite/theodolite-uc4-kstreams-app
  newTag: latest

bases:
- ../../base

patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters

# uc-application/overlay/uc4-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "100"
        resources:
          limits:
            memory: 4Gi
            cpu: 1000m