Commit 6c845743 authored by Sören Henning

Merge branch 'feature/theodolitePython' into 'master'

Integrate theodolite and run uc python scripts

See merge request !42
parents 3ca76330 2312a25f
1 merge request !42: Integrate theodolite and run uc python scripts
Pipeline #1121 passed
Showing with 1035 additions and 201 deletions
@@ -5,17 +5,12 @@ from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import csv
#
exp_id = sys.argv[1]
benchmark = sys.argv[2]
dim_value = sys.argv[3]
instances = sys.argv[4]
execution_minutes = int(sys.argv[5])
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
import logging
prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range'
#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url = 'http://kube1.se.internal:32529'):
print("Main")
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
now = now_local - timedelta(milliseconds=time_diff_ms)
@@ -28,13 +23,12 @@ start = now - timedelta(minutes=execution_minutes)
#print(start.isoformat().replace('+00:00', 'Z'))
#print(end.isoformat().replace('+00:00', 'Z'))
response = requests.get(prometheus_query_path, params={
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
# 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
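# Note: the PromQL query above sums the consumer group lag reported by the lag
# exporter (only values > 0) by group and topic, sampled every 5 s over the
# execution window.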
# response
# print(response.request.path_url)
# response.content
@@ -47,7 +41,8 @@ for result in results:
topic = result['metric']['topic']
for value in result['values']:
# print(value)
d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
d.append({'topic': topic, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
@@ -60,8 +55,10 @@ input = df.loc[df['topic'] == "input"]
from sklearn.linear_model import LinearRegression
X = input.iloc[:, 1].values.reshape(-1, 1) # values converts it into a numpy array
Y = input.iloc[:, 2].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
# .values converts the column into a numpy array
X = input.iloc[:, 1].values.reshape(-1, 1)
# -1 lets numpy infer the number of rows; the result has one column
Y = input.iloc[:, 2].values.reshape(-1, 1)
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
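# The regression slope (coef_) is used as the trend of the total lag over time
# and is appended to results.csv below together with the experiment parameters.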
@@ -70,7 +67,8 @@ print(linear_regressor.coef_)
# print(Y_pred)
fields=[exp_id, datetime.now(), benchmark, dim_value, instances, linear_regressor.coef_]
fields = [exp_id, datetime.now(), benchmark, dim_value,
instances, linear_regressor.coef_]
print(fields)
with open(r'results.csv', 'a') as f:
writer = csv.writer(f)
@@ -85,10 +83,9 @@ plt.savefig(f"{filename}_plot.png")
df.to_csv(f"{filename}_values.csv")
# Load total lag count
response = requests.get(prometheus_query_path, params={
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
@@ -103,7 +100,8 @@ for result in results:
group = result['metric']['group']
for value in result['values']:
# print(value)
d.append({'group': group, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
d.append({'group': group, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
@@ -111,7 +109,7 @@ df.to_csv(f"{filename}_totallag.csv")
# Load partition count
response = requests.get(prometheus_query_path, params={
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
@@ -126,16 +124,16 @@ for result in results:
topic = result['metric']['topic']
for value in result['values']:
# print(value)
d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
d.append({'topic': topic, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
df.to_csv(f"{filename}_partitions.csv")
# Load instances count
response = requests.get(prometheus_query_path, params={
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
'start': start.isoformat(),
'end': end.isoformat(),
@@ -153,3 +151,16 @@ for result in results:
df = pd.DataFrame(d)
df.to_csv(f"{filename}_instances.csv")
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
# Load arguments
exp_id = sys.argv[1]
benchmark = sys.argv[2]
dim_value = sys.argv[3]
instances = sys.argv[4]
execution_minutes = int(sys.argv[5])
main(exp_id, benchmark, dim_value, instances, execution_minutes)
@@ -2,3 +2,7 @@ matplotlib==3.2.0
pandas==1.0.1
requests==2.23.0
scikit-learn==0.22.2.post1
# For run_uc.py
kubernetes==11.0.0
confuse==1.1.0
import argparse # parse arguments from cli
import atexit # used to clear resources at exit of program (e.g. ctrl-c)
from kubernetes import client, config # kubernetes api
from kubernetes.stream import stream
import lag_analysis
import logging # logging
from os import path # path utilities
import subprocess # execute bash commands
import sys # for exit of program
import time # process sleep
import yaml # convert from file to yaml object
coreApi = None # access kubernetes core api
appsApi = None # access kubernetes apps api
customApi = None # access kubernetes custom object api
def load_variables():
"""Load the CLI variables given at the command line"""
global args
print('Load CLI variables')
parser = argparse.ArgumentParser(description='Run use case program')
parser.add_argument('--exp-id', '-id',
dest='exp_id',
default='1',
metavar='EXP_ID',
help='ID of the experiment')
parser.add_argument('--use-case', '-uc',
dest='uc_id',
default='1',
metavar='UC_NUMBER',
help='use case number, one of 1, 2, 3 or 4')
parser.add_argument('--dim-value', '-d',
dest='dim_value',
default=10000,
type=int,
metavar='DIM_VALUE',
help='Value for the workload generator to be tested')
parser.add_argument('--instances', '-i',
dest='instances',
default=1,
type=int,
metavar='INSTANCES',
help='Number of instances to be benchmarked')
parser.add_argument('--partitions', '-p',
dest='partitions',
default=40,
type=int,
metavar='PARTITIONS',
help='Number of partitions for Kafka topics')
parser.add_argument('--cpu-limit', '-cpu',
dest='cpu_limit',
default='1000m',
metavar='CPU_LIMIT',
help='Kubernetes CPU limit')
parser.add_argument('--memory-limit', '-mem',
dest='memory_limit',
default='4Gi',
metavar='MEMORY_LIMIT',
help='Kubernetes memory limit')
parser.add_argument('--commit-interval', '-ci',
dest='commit_interval_ms',
default=100,
type=int,
metavar='KAFKA_STREAMS_COMMIT_INTERVAL_MS',
help='Kafka Streams commit interval in milliseconds')
parser.add_argument('--executions-minutes', '-exm',
dest='execution_minutes',
default=5,
type=int,
metavar='EXECUTION_MINUTES',
help='Duration in minutes for which subexperiments \
should be executed')
parser.add_argument('--reset', '-res',
dest='reset',
action="store_true",
help='Resets the environment before execution')
parser.add_argument('--reset-only', '-reso',
dest='reset_only',
action="store_true",
help='Only resets the environment. Ignores all other parameters')
args = parser.parse_args()
print(args)
return args
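# Example invocation (illustrative values only; all flags are defined above):
#   python run_uc.py --exp-id 1 --use-case 1 --dim-value 10000 --instances 1 \
#       --partitions 40 --cpu-limit 1000m --memory-limit 4Gi \
#       --commit-interval 100 --executions-minutes 5 --reset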
def initialize_kubernetes_api():
"""Load the kubernetes config from local or the cluster and creates
needed APIs.
"""
global coreApi, appsApi, customApi
print('Connect to kubernetes api')
try:
config.load_kube_config() # try using local config
except config.config_exception.ConfigException as e:
# load config from pod, if local config is not available
logging.debug('Failed loading local Kubernetes configuration,'
+ ' try from cluster')
logging.debug(e)
config.load_incluster_config()
coreApi = client.CoreV1Api()
appsApi = client.AppsV1Api()
customApi = client.CustomObjectsApi()
def create_topics(topics):
"""Create the topics needed for the use cases
:param topics: List of topics that should be created.
"""
# Calling exec and waiting for response
print('Create topics')
for (topic, partitions) in topics:
print('Create topic ' + topic + ' with #' + str(partitions)
+ ' partitions')
exec_command = [
'/bin/sh',
'-c',
f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181\
--create --topic {topic} --partitions {partitions}\
--replication-factor 1'
]
resp = stream(coreApi.connect_get_namespaced_pod_exec,
"kafka-client",
'default',
command=exec_command,
stderr=True, stdin=False,
stdout=True, tty=False)
print(resp)
def load_yaml(file_path):
"""Creates a yaml file from the file at given path.
:param file_path: The path to the file which contains the yaml.
:return: The file as a yaml object.
"""
try:
f = open(path.join(path.dirname(__file__), file_path))
with f:
return yaml.safe_load(f)
except Exception as e:
logging.error('Error opening file %s' % file_path)
logging.error(e)
def load_yaml_files():
"""Load the needed yaml files and creates objects from them.
:return: wg, app_svc, app_svc_monitor ,app_jmx, app_deploy
"""
print('Load kubernetes yaml files')
wg = load_yaml('uc-workload-generator/base/workloadGenerator.yaml')
app_svc = load_yaml('uc-application/base/aggregation-service.yaml')
app_svc_monitor = load_yaml('uc-application/base/service-monitor.yaml')
app_jmx = load_yaml('uc-application/base/jmx-configmap.yaml')
app_deploy = load_yaml('uc-application/base/aggregation-deployment.yaml')
print('Kubernetes yaml files loaded')
return wg, app_svc, app_svc_monitor, app_jmx, app_deploy
def start_workload_generator(wg_yaml, dim_value, uc_id):
"""Starts the workload generator.
:param wg_yaml: The yaml object for the workload generator.
:param string dim_value: The dimension value the load generator should use.
:param string uc_id: Use case id for which load should be generated.
:return:
The Deployment created by the API, or the given yaml object if the
resource already exists or an error occurs.
"""
print('Start workload generator')
num_sensors = dim_value
wl_max_records = 150000
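# Ceiling division: one workload generator instance per wl_max_records (150,000) simulated sensors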
wl_instances = int(((num_sensors + (wl_max_records - 1)) / wl_max_records))
# Set parameters specific to uc2
if uc_id == '2':
print('use uc2 stuff')
num_nested_groups = dim_value
num_sensors = '4'
approx_num_sensors = int(num_sensors) ** num_nested_groups
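# Full hierarchy with 4 sensors per group: roughly 4^num_nested_groups sensors in total (cf. APPROX_NUM_SENSORS in run_uc2.sh)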
wl_instances = int(
((approx_num_sensors + wl_max_records - 1) / wl_max_records)
)
# Customize workload generator creations
wg_yaml['spec']['replicas'] = wl_instances
# TODO: access the container by name
# Set used use case
wg_containter = wg_yaml['spec']['template']['spec']['containers'][0]
wg_containter['image'] = 'theodolite/theodolite-uc' + uc_id + \
'-workload-generator:latest'
# TODO: access the attribute by name
# Set environment variables
wg_containter['env'][0]['value'] = str(num_sensors)
wg_containter['env'][1]['value'] = str(wl_instances)
if uc_id == '2': # Special configuration for uc2
wg_containter['env'][2]['value'] = str(num_nested_groups)
try:
wg_ss = appsApi.create_namespaced_deployment(
namespace="default",
body=wg_yaml
)
print("Deployment '%s' created." % wg_ss.metadata.name)
return wg_ss
except client.rest.ApiException as e:
print("Deployment creation error: %s" % e.reason)
return wg_yaml
def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit):
"""Applies the service, service monitor, jmx config map and start the
use case application.
:param svc_yaml: The yaml object for the service.
:param svc_monitor_yaml: The yaml object for the service monitor.
:param jmx_yaml: The yaml object for the jmx config map.
:param deploy_yaml: The yaml object for the application.
:param int instances: Number of instances for use case application.
:param string uc_id: The id of the use case to execute.
:param int commit_interval_ms: The commit interval in ms.
:param string memory_limit: The memory limit for the application.
:param string cpu_limit: The CPU limit for the application.
:return:
The Service, ServiceMonitor, JMX ConfigMap and Deployment.
If a resource already exists or an error occurs, the corresponding yaml object is returned.
return svc, svc_monitor, jmx_cm, app_deploy
"""
print('Start use case application')
svc, svc_monitor, jmx_cm, app_deploy = None, None, None, None
# Create Service
try:
svc = coreApi.create_namespaced_service(
namespace="default", body=svc_yaml)
print("Service '%s' created." % svc.metadata.name)
except client.rest.ApiException as e:
svc = svc_yaml
logging.error("Service creation error: %s" % e.reason)
# Create custom object service monitor
try:
svc_monitor = customApi.create_namespaced_custom_object(
group="monitoring.coreos.com",
version="v1",
namespace="default",
plural="servicemonitors", # CustomResourceDef of ServiceMonitor
body=svc_monitor_yaml,
)
print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name'])
except client.rest.ApiException as e:
svc_monitor = svc_monitor_yaml
logging.error("ServiceMonitor creation error: %s" % e.reason)
# Apply jmx config map for aggregation service
try:
jmx_cm = coreApi.create_namespaced_config_map(
namespace="default", body=jmx_yaml)
print("ConfigMap '%s' created." % jmx_cm.metadata.name)
except client.rest.ApiException as e:
jmx_cm = jmx_yaml
logging.error("ConfigMap creation error: %s" % e.reason)
# Create deployment
deploy_yaml['spec']['replicas'] = instances
# TODO: access the container by name
app_container = deploy_yaml['spec']['template']['spec']['containers'][0]
app_container['image'] = 'theodolite/theodolite-uc' + uc_id \
+ '-kstreams-app:latest'
# TODO: access the attribute by name
app_container['env'][0]['value'] = str(commit_interval_ms)
app_container['resources']['limits']['memory'] = memory_limit
app_container['resources']['limits']['cpu'] = cpu_limit
try:
app_deploy = appsApi.create_namespaced_deployment(
namespace="default",
body=deploy_yaml
)
print("Deployment '%s' created." % app_deploy.metadata.name)
except client.rest.ApiException as e:
app_deploy = deploy_yaml
logging.error("Deployment creation error: %s" % e.reason)
return svc, svc_monitor, jmx_cm, app_deploy
def wait_execution(execution_minutes):
"""
Wait while the benchmark is executing.
:param int execution_minutes: The duration to wait for execution.
"""
print('Wait while executing')
for i in range(execution_minutes):
time.sleep(60)
print(f"Executed: {i+1} minutes")
print('Execution finished')
return
def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
"""
Runs the evaluation function
:param string exp_id: ID of the experiment.
:param string uc_id: ID of the executed use case.
:param int dim_value: The dimension value used for execution.
:param int instances: The number of instances used for the execution.
:param int execution_minutes: How long the use case was executed (in minutes).
"""
print('Run evaluation function')
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
return
def delete_resource(obj, del_func):
"""
Helper function to delete kubernetes resources.
First tries to delete with the kubernetes object.
Then it uses the dict representation of yaml to delete the object.
:param obj: Either kubernetes resource object or yaml as a dict.
:param del_func: The function that needs to be executed for the deletion
"""
try:
del_func(obj.metadata.name, 'default')
except Exception as e:
logging.debug(
'Error deleting resource with api object, try with dict.')
try:
del_func(obj['metadata']['name'], 'default')
except Exception as e:
logging.error("Error deleting resource")
logging.error(e)
return
print('Resource deleted')
def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy):
"""Stops the applied applications and delete resources.
:param wg: The workload generator statefull set.
:param app_svc: The application service.
:param app_svc_monitor: The application service monitor.
:param app_jmx: The application jmx config map.
:param app_deploy: The application deployment.
"""
print('Stop use case application and workload generator')
print('Delete workload generator')
delete_resource(wg, appsApi.delete_namespaced_deployment)
print('Delete app service')
delete_resource(app_svc, coreApi.delete_namespaced_service)
print('Delete service monitor')
try:
customApi.delete_namespaced_custom_object(
group="monitoring.coreos.com",
version="v1",
namespace="default",
plural="servicemonitors",
name=app_svc_monitor['metadata']['name'])
print('Resource deleted')
except Exception as e:
print("Error deleting service monitor")
print('Delete jmx config map')
delete_resource(app_jmx, coreApi.delete_namespaced_config_map)
print('Delete uc application')
delete_resource(app_deploy, appsApi.delete_namespaced_deployment)
return
def delete_topics(topics):
"""Delete topics from Kafka.
:param topics: List of topics to delete.
"""
print('Delete topics from Kafka')
topics_delete = 'theodolite-.*|' + '|'.join([ti[0] for ti in topics])
num_topics_command = [
'/bin/sh',
'-c',
f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list \
| sed -n -E "/^({topics_delete})\
( - marked for deletion)?$/p" | wc -l'
]
topics_deletion_command = [
'/bin/sh',
'-c',
f'kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete \
--topic "{topics_delete}"'
]
# Wait until the topics are deleted
while True:
# topic deletion, sometimes a second deletion seems to be required
resp = stream(coreApi.connect_get_namespaced_pod_exec,
"kafka-client",
'default',
command=topics_deletion_command,
stderr=True, stdin=False,
stdout=True, tty=False)
print(resp)
print('Wait for topic deletion')
time.sleep(2)
resp = stream(coreApi.connect_get_namespaced_pod_exec,
"kafka-client",
'default',
command=num_topics_command,
stderr=True, stdin=False,
stdout=True, tty=False)
if resp == '0':
print("Topics deleted")
break
return
def reset_zookeeper():
"""Delete ZooKeeper configurations used for workload generation.
"""
print('Delete ZooKeeper configurations used for workload generation')
delete_zoo_data_command = [
'kubectl',
'exec',
'zookeeper-client',
'--',
'bash',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall '
+ '/workload-generation'
]
check_zoo_data_command = [
'kubectl',
'exec',
'zookeeper-client',
'--',
'bash',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 get '
+ '/workload-generation'
]
# Wait for configuration deletion
while True:
# Delete Zookeeper configuration data
output = subprocess.run(delete_zoo_data_command,
capture_output=True,
text=True)
logging.debug(output.stdout)
# Check data is deleted
output = subprocess.run(check_zoo_data_command,
capture_output=True,
text=True)
logging.debug(output)
if output.returncode == 1: # Means data not available anymore
print('ZooKeeper reset was successful.')
break
else:
print('ZooKeeper reset was not successful. Retrying in 5s.')
time.sleep(5)
return
def stop_lag_exporter():
"""
Stop the lag exporter in order to reset it and allow smooth execution for
next use cases.
"""
print('Stop the lag exporter')
find_pod_command = [
'kubectl',
'get',
'pod',
'-l',
'app.kubernetes.io/name=kafka-lag-exporter',
'-o',
'jsonpath="{.items[0].metadata.name}"'
]
output = subprocess.run(find_pod_command, capture_output=True, text=True)
lag_exporter_pod = output.stdout.replace('"', '')
delete_pod_command = [
'kubectl',
'delete',
'pod',
lag_exporter_pod
]
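# Deleting the pod clears the exporter's state; it is assumed to be recreated automatically by its controller (e.g. its Deployment).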
output = subprocess.run(delete_pod_command, capture_output=True, text=True)
print(output)
return
def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
"""
Stop the applications, delete topics, reset zookeeper and stop lag exporter.
"""
print('Reset cluster')
stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy)
print('---------------------')
delete_topics(topics)
print('---------------------')
reset_zookeeper()
print('---------------------')
stop_lag_exporter()
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, reset, reset_only):
"""
Main method to execute the benchmark once for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
:param string exp_id: The ID of the executed experiment.
:param string uc_id: Use case to execute
:param int dim_value: Dimension value for load generator.
:param int instances: Number of instances for application.
:param int partitions: Number of partitions the kafka topics should have.
:param string cpu_limit: Max CPU utilization for application.
:param string memory_limit: Max memory utilization for application.
:param int commit_interval_ms: Kafka Streams commit interval in milliseconds
:param int execution_minutes: How long to execute the benchmark.
:param boolean reset: Flag for reset of cluster before execution.
:param boolean reset_only: Flag to only reset the application.
"""
wg, app_svc, app_svc_monitor, app_jmx, app_deploy = load_yaml_files()
print('---------------------')
initialize_kubernetes_api()
print('---------------------')
topics = [('input', partitions),
('output', partitions),
('aggregation-feedback', partitions),
('configuration', 1)]
# Check for reset options
if reset_only:
# Only reset the cluster and then end the program
reset_cluster(wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
sys.exit()
if reset:
# Reset cluster before execution
print('Reset mode')
reset_cluster(wg, app_svc, app_svc_monitor,
app_jmx, app_deploy, topics)
print('---------------------')
# Register the reset operation so that it is executed if the program aborts
atexit.register(reset_cluster, wg, app_svc,
app_svc_monitor, app_jmx, app_deploy, topics)
create_topics(topics)
print('---------------------')
wg = start_workload_generator(wg, dim_value, uc_id)
print('---------------------')
app_svc, app_svc_monitor, app_jmx, app_deploy = start_application(
app_svc,
app_svc_monitor,
app_jmx,
app_deploy,
instances,
uc_id,
commit_interval_ms,
memory_limit,
cpu_limit)
print('---------------------')
wait_execution(execution_minutes)
print('---------------------')
run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes)
print('---------------------')
# Cluster is reset regularly here, so the atexit handler is no longer needed
reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics)
atexit.unregister(reset_cluster)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
args = load_variables()
print('---------------------')
main(args.exp_id, args.uc_id, args.dim_value, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
args.commit_interval_ms, args.execution_minutes, args.reset,
args.reset_only)
@@ -29,38 +29,59 @@ NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml)
echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
cat <<EOF >uc-workload-generator/overlay/uc1-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
replicas: $WL_INSTANCES
template:
spec:
containers:
- name: workload-generator
env:
- name: NUM_SENSORS
value: "$NUM_SENSORS"
- name: INSTANCES
value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc1-workload-generator
# Start application
REPLICAS=$INSTANCES
# When not using `sed` anymore, use `kubectl apply -f uc1-application`
kubectl apply -f uc1-application/aggregation-service.yaml
kubectl apply -f uc1-application/jmx-configmap.yaml
kubectl apply -f uc1-application/service-monitor.yaml
#kubectl apply -f uc1-application/aggregation-deployment.yaml
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc1-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
cat <<EOF >uc-application/overlay/uc1-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
replicas: $REPLICAS
template:
spec:
containers:
- name: uc-application
env:
- name: COMMIT_INTERVAL_MS
value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
resources:
limits:
memory: $MEMORY_LIMIT
cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc1-application
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop wl and app
#kubectl delete -f uc1-workload-generator/deployment.yaml
#sed "s/{{INSTANCES}}/1/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
#sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
kubectl delete -f uc1-application/aggregation-service.yaml
kubectl delete -f uc1-application/jmx-configmap.yaml
kubectl delete -f uc1-application/service-monitor.yaml
#kubectl delete -f uc1-application/aggregation-deployment.yaml
echo "$APPLICATION_YAML" | kubectl delete -f -
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc1-workload-generator
kubectl delete -k uc-application/overlay/uc1-application
# Delete topics from Kafka
......
@@ -30,36 +30,63 @@ WL_MAX_RECORDS=150000
APPROX_NUM_SENSORS=$((4**NUM_NESTED_GROUPS))
WL_INSTANCES=$(((APPROX_NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_NESTED_GROUPS}}/$NUM_NESTED_GROUPS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc2-workload-generator/deployment.yaml)
echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
cat <<EOF >uc-workload-generator/overlay/uc2-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
replicas: $WL_INSTANCES
template:
spec:
containers:
- name: workload-generator
env:
- name: NUM_SENSORS
value: "4"
- name: HIERARCHY
value: "full"
- name: NUM_NESTED_GROUPS
value: "$NUM_NESTED_GROUPS"
- name: INSTANCES
value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc2-workload-generator
# Start application
REPLICAS=$INSTANCES
# When not using `sed` anymore, use `kubectl apply -f uc2-application`
kubectl apply -f uc2-application/aggregation-service.yaml
kubectl apply -f uc2-application/jmx-configmap.yaml
kubectl apply -f uc2-application/service-monitor.yaml
#kubectl apply -f uc2-application/aggregation-deployment.yaml
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc2-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
cat <<EOF >uc-application/overlay/uc2-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
replicas: $REPLICAS
template:
spec:
containers:
- name: uc-application
env:
- name: COMMIT_INTERVAL_MS
value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
resources:
limits:
memory: $MEMORY_LIMIT
cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc2-application
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop wl and app
#sed "s/{{INSTANCES}}/1/g" uc2-workload-generator/deployment.yaml | kubectl delete -f -
echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
kubectl delete -f uc2-application/aggregation-service.yaml
kubectl delete -f uc2-application/jmx-configmap.yaml
kubectl delete -f uc2-application/service-monitor.yaml
#kubectl delete -f uc2-application/aggregation-deployment.yaml
echo "$APPLICATION_YAML" | kubectl delete -f -
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc2-workload-generator
kubectl delete -k uc-application/overlay/uc2-application
# Delete topics from Kafka
......
@@ -29,40 +29,61 @@ NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc3-workload-generator/deployment.yaml)
echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
cat <<EOF >uc-workload-generator/overlay/uc3-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
replicas: $WL_INSTANCES
template:
spec:
containers:
- name: workload-generator
env:
- name: NUM_SENSORS
value: "$NUM_SENSORS"
- name: INSTANCES
value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc3-workload-generator
# Start application
REPLICAS=$INSTANCES
# When not using `sed` anymore, use `kubectl apply -f uc3-application`
kubectl apply -f uc3-application/aggregation-service.yaml
kubectl apply -f uc3-application/jmx-configmap.yaml
kubectl apply -f uc3-application/service-monitor.yaml
#kubectl apply -f uc3-application/aggregation-deployment.yaml
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc3-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
cat <<EOF >uc-application/overlay/uc3-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
replicas: $REPLICAS
template:
spec:
containers:
- name: uc-application
env:
- name: COMMIT_INTERVAL_MS
value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
resources:
limits:
memory: $MEMORY_LIMIT
cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc3-application
kubectl scale deployment uc3-titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop wl and app
#kubectl delete -f uc3-workload-generator/deployment.yaml
#sed "s/{{INSTANCES}}/1/g" uc3-workload-generator/deployment.yaml | kubectl delete -f -
echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
kubectl delete -f uc3-application/aggregation-service.yaml
kubectl delete -f uc3-application/jmx-configmap.yaml
kubectl delete -f uc3-application/service-monitor.yaml
#kubectl delete -f uc3-application/aggregation-deployment.yaml
#sed "s/{{CPU_LIMIT}}/1000m/g; s/{{MEMORY_LIMIT}}/4Gi/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/100/g" uc3-application/aggregation-deployment.yaml | kubectl delete -f -
echo "$APPLICATION_YAML" | kubectl delete -f -
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc3-workload-generator
kubectl delete -k uc-application/overlay/uc3-application
# Delete topics from Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
......
@@ -29,39 +29,60 @@ NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc4-workload-generator/deployment.yaml)
echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
cat <<EOF >uc-workload-generator/overlay/uc4-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
replicas: $WL_INSTANCES
template:
spec:
containers:
- name: workload-generator
env:
- name: NUM_SENSORS
value: "$NUM_SENSORS"
- name: INSTANCES
value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc4-workload-generator
# Start application
REPLICAS=$INSTANCES
#AGGREGATION_DURATION_DAYS=$DIM_VALUE
# When not using `sed` anymore, use `kubectl apply -f uc4-application`
kubectl apply -f uc4-application/aggregation-service.yaml
kubectl apply -f uc4-application/jmx-configmap.yaml
kubectl apply -f uc4-application/service-monitor.yaml
#kubectl apply -f uc4-application/aggregation-deployment.yaml
#sed "s/{{AGGREGATION_DURATION_DAYS}}/$AGGREGATION_DURATION_DAYS/g" uc4-application/aggregation-deployment.yaml | kubectl apply -f -
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc4-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
cat <<EOF >uc-application/overlay/uc4-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
replicas: $REPLICAS
template:
spec:
containers:
- name: uc-application
env:
- name: COMMIT_INTERVAL_MS
value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
resources:
limits:
memory: $MEMORY_LIMIT
cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc4-application
kubectl scale deployment uc4-titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop wl and app
#sed "s/{{INSTANCES}}/1/g" uc4-workload-generator/deployment.yaml | kubectl delete -f -
echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
kubectl delete -f uc4-application/aggregation-service.yaml
kubectl delete -f uc4-application/jmx-configmap.yaml
kubectl delete -f uc4-application/service-monitor.yaml
#kubectl delete -f uc4-application/aggregation-deployment.yaml
echo "$APPLICATION_YAML" | kubectl delete -f -
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc4-workload-generator
kubectl delete -k uc-application/overlay/uc4-application
# Delete topics from Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
......
# Wrapper that makes the execution method of a subexperiment interchangeable.
import os
import run_uc
dirname = os.path.dirname(__file__)
os.chdir(dirname+"/../../")
def execute(subexperiment_config):
os.system(f"./run_uc{subexperiment_config.use_case}.sh {subexperiment_config.exp_id} {subexperiment_config.dim_value} {subexperiment_config.replicas} {subexperiment_config.partitions} {subexperiment_config.cpu_limit} {subexperiment_config.memory_limit} {subexperiment_config.kafka_streams_commit_interval_ms} {subexperiment_config.execution_minutes}")
\ No newline at end of file
run_uc.main(
exp_id=subexperiment_config.exp_id,
uc_id=subexperiment_config.use_case,
dim_value=int(subexperiment_config.dim_value),
instances=int(subexperiment_config.replicas),
partitions=subexperiment_config.partitions,
cpu_limit=subexperiment_config.cpu_limit,
memory_limit=subexperiment_config.memory_limit,
commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms,
execution_minutes=int(subexperiment_config.execution_minutes),
reset=False,
reset_only=False)
@@ -14,24 +14,24 @@ spec:
spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc1-application
image: "theodolite/theodolite-uc1-kstreams-app:latest"
- name: uc-application
image: uc-app:latest
ports:
- containerPort: 5555
name: jmx
env:
- name: COMMIT_INTERVAL_MS
value: "100"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
resources:
limits:
memory: "{{MEMORY_LIMIT}}"
cpu: "{{CPU_LIMIT}}"
memory: 4Gi
cpu: 1000m
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
......
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
commonLabels:
app: titan-ccp-aggregation
# Use all resources to compose them into one file
resources:
- aggregation-deployment.yaml
- aggregation-service.yaml
- service-monitor.yaml
- jmx-configmap.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc1-
images:
- name: uc-app
newName: theodolite/theodolite-uc1-kstreams-app
newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters
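# Usage sketch: the run scripts apply this overlay via `kubectl apply -k uc-application/overlay/uc1-application`, which composes ../../base and applies the set_paramters.yaml patch.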
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
replicas: 1
template:
spec:
containers:
- name: uc-application
env:
- name: COMMIT_INTERVAL_MS
value: "100"
resources:
limits:
memory: 4Gi
cpu: 1000m
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc2-
images:
- name: uc-app
newName: theodolite/theodolite-uc2-kstreams-app
newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
replicas: 1
template:
spec:
containers:
- name: uc-application
env:
- name: COMMIT_INTERVAL_MS
value: "100"
resources:
limits:
memory: 4Gi
cpu: 1000m
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc3-
images:
- name: uc-app
newName: theodolite/theodolite-uc3-kstreams-app
newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
replicas: 1
template:
spec:
containers:
- name: uc-application
env:
- name: COMMIT_INTERVAL_MS
value: "100"
resources:
limits:
memory: 4Gi
cpu: 1000m
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc4-
images:
- name: uc-app
newName: theodolite/theodolite-uc4-kstreams-app
newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters