Commit ebb8230b authored by Sören Henning

Merge branch 'feature/theodoliteKubernetes' into 'master'

Run theodolite benchmarking in Kubernetes

Closes #84

See merge request !45
parents bcdd5d63 61a852f7
Pipeline #1172 passed
*
!requirements.txt
!uc-workload-generator
!uc-application
!strategies
!lib
!theodolite.py
!run_uc.py
!lag_analysis.py
FROM python:3.8
RUN mkdir /app
WORKDIR /app
ADD requirements.txt /app/
RUN pip install -r requirements.txt
COPY uc-workload-generator /app/uc-workload-generator
COPY uc-application /app/uc-application
COPY strategies /app/strategies
COPY lib /app/lib
COPY lag_analysis.py /app/
COPY run_uc.py /app/
COPY theodolite.py /app/
CMD ["python", "/app/theodolite.py"]
import argparse
import os
def env_list_default(env, tf):
"""
Parses a comma-separated environment variable into a list, applying tf to each element, or returns None if the variable is unset.
"""
v = os.environ.get(env)
if v is not None:
v = [tf(s) for s in v.split(',')]
return v
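For illustration only (not part of the commit), this is how env_list_default behaves for a comma-separated variable; int() tolerates surrounding whitespace, so values like "1, 2" in the Job manifest further down parse cleanly:

# Hypothetical usage example:
os.environ['INSTANCES'] = '1, 2'
env_list_default('INSTANCES', int)        # -> [1, 2]
env_list_default('UNSET_VARIABLE', int)   # -> None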
def default_parser(description):
"""
@@ -8,29 +18,30 @@ def default_parser(description):
parser = argparse.ArgumentParser(description=description)
parser.add_argument('--uc',
metavar='<uc>',
default=os.environ.get('UC'),
help='[mandatory] use case number, one of 1, 2, 3 or 4')
parser.add_argument('--partitions', '-p',
-default=40,
-type=int,
metavar='<partitions>',
+type=int,
+default=os.environ.get('PARTITIONS', 40),
help='Number of partitions for Kafka topics')
parser.add_argument('--cpu-limit', '-cpu',
-default='1000m',
metavar='<CPU limit>',
+default=os.environ.get('CPU_LIMIT', '1000m'),
help='Kubernetes CPU limit')
parser.add_argument('--memory-limit', '-mem',
-default='4Gi',
metavar='<memory limit>',
+default=os.environ.get('MEMORY_LIMIT', '4Gi'),
help='Kubernetes memory limit')
parser.add_argument('--commit-ms',
-default=100,
-type=int,
metavar='<commit ms>',
+type=int,
+default=os.environ.get('COMMIT_MS', 100),
help='Kafka Streams commit interval in milliseconds')
parser.add_argument('--duration', '-d',
-default=5,
-type=int,
metavar='<duration>',
+type=int,
+default=os.environ.get('DURATION', 5),
help='Duration in minutes subexperiments should be \
executed for')
parser.add_argument('--reset',
@@ -39,6 +50,10 @@ def default_parser(description):
parser.add_argument('--reset-only',
action="store_true",
help='Only resets the environment. Ignores all other parameters')
+parser.add_argument('--prometheus',
+metavar='<URL>',
+default=os.environ.get('PROMETHEUS_BASE_URL'),
+help='Defines where to find the prometheus instance')
return parser
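An illustrative aside (not part of the commit), assuming this file becomes importable as lib.cli_parser as the imports further down suggest: every option now falls back to an environment variable, while an explicit flag still takes precedence over the environment.

# Hypothetical usage example:
import os
from lib.cli_parser import default_parser   # assumed import path after this merge

os.environ['CPU_LIMIT'] = '500m'
parser = default_parser('demo')
print(parser.parse_args([]).cpu_limit)                # 500m (taken from the environment)
print(parser.parse_args(['-cpu', '250m']).cpu_limit)  # 250m (explicit flag wins)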
def benchmark_parser(description):
@@ -49,22 +64,24 @@ def benchmark_parser(description):
parser = default_parser(description)
parser.add_argument('--loads',
-type=int,
metavar='<load>',
+type=int,
nargs='+',
+default=env_list_default('LOADS', int),
help='[mandatory] Loads that should be executed')
parser.add_argument('--instances', '-i',
dest='instances_list',
-type=int,
metavar='<instances>',
+type=int,
nargs='+',
+default=env_list_default('INSTANCES', int),
help='[mandatory] List of instances used in benchmarks')
parser.add_argument('--domain-restriction',
action="store_true",
help='To use domain restriction. For details see README')
parser.add_argument('--search-strategy',
-default='default',
metavar='<strategy>',
+default=os.environ.get('SEARCH_STRATEGY', 'default'),
help='The benchmarking search strategy. Can be set to default, linear-search or binary-search')
return parser
@@ -76,13 +93,16 @@ def execution_parser(description):
parser = default_parser(description)
parser.add_argument('--exp-id',
metavar='<exp id>',
default=os.environ.get('EXP_ID'),
help='[mandatory] ID of the experiment')
parser.add_argument('--load',
-type=int,
metavar='<load>',
+type=int,
+default=os.environ.get('LOAD'),
help='[mandatory] Load that should be used for the benchmark')
parser.add_argument('--instances',
-type=int,
metavar='<instances>',
+type=int,
+default=os.environ.get('INSTANCES'),
help='[mandatory] Number of instances to be benchmarked')
return parser
@@ -4,8 +4,8 @@ from kubernetes import client, config # kubernetes api
from kubernetes.stream import stream
import lag_analysis
import logging # logging
-from os import path # path utilities
-from strategies.cli_parser import execution_parser
+from os import path, environ # path utilities
+from lib.cli_parser import execution_parser
import subprocess # execute bash commands
import sys # for exit of program
import time # process sleep
@@ -244,7 +244,7 @@ def wait_execution(execution_minutes):
return
-def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
+def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url=None):
"""
Runs the evaluation function
:param string exp_id: ID of the experiment.
@@ -254,7 +254,12 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes):
:param int execution_minutes: How long the use case was executed.
:param string prometheus_base_url: Optional base URL of the Prometheus instance to query for the lag analysis.
"""
print('Run evaluation function')
-lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
+if prometheus_base_url is None and environ.get('PROMETHEUS_BASE_URL') is None:
+    lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
+elif prometheus_base_url is not None:
+    lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url)
+else:
+    lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, environ.get('PROMETHEUS_BASE_URL'))
return
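Put differently: an explicit prometheus_base_url argument wins, then the PROMETHEUS_BASE_URL environment variable, and only if neither is set does lag_analysis fall back to whatever default it uses internally. A compact sketch of that precedence (illustrative, not the committed code, and equivalent only when the values are unset or non-empty):

url = prometheus_base_url or environ.get('PROMETHEUS_BASE_URL')
if url is None:
    lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
else:
    lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, url)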
@@ -370,22 +375,14 @@ def reset_zookeeper():
print('Delete ZooKeeper configurations used for workload generation')
delete_zoo_data_command = [
-'kubectl',
-'exec',
-'zookeeper-client',
-'--',
-'bash',
+'/bin/sh',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall '
+ '/workload-generation'
]
check_zoo_data_command = [
-'kubectl',
-'exec',
-'zookeeper-client',
-'--',
-'bash',
+'/bin/sh',
'-c',
'zookeeper-shell my-confluent-cp-zookeeper:2181 get '
+ '/workload-generation'
@@ -394,18 +391,25 @@
# Wait for configuration deletion
while True:
# Delete Zookeeper configuration data
-output = subprocess.run(delete_zoo_data_command,
-capture_output=True,
-text=True)
-logging.debug(output.stdout)
+resp = stream(coreApi.connect_get_namespaced_pod_exec,
+"zookeeper-client",
+'default',
+command=delete_zoo_data_command,
+stderr=True, stdin=False,
+stdout=True, tty=False)
+logging.debug(resp)
# Check data is deleted
-output = subprocess.run(check_zoo_data_command,
-capture_output=True,
-text=True)
-logging.debug(output)
+client = stream(coreApi.connect_get_namespaced_pod_exec,
+"zookeeper-client",
+'default',
+command=check_zoo_data_command,
+stderr=True, stdin=False,
+stdout=True, tty=False,
+_preload_content=False) # Get client for returncode
+client.run_forever(timeout=60) # Start the client
-if output.returncode == 1: # Means data not available anymore
+if client.returncode == 1: # Means data not available anymore
print('ZooKeeper reset was successful.')
break
else:
@@ -450,7 +454,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
stop_lag_exporter()
-def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, reset, reset_only):
+def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url=None, reset=False, reset_only=False):
"""
Main method to execute the benchmark once for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
@@ -515,7 +519,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
wait_execution(execution_minutes)
print('---------------------')
-run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes)
+run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url)
print('---------------------')
# Regular cluster reset, therefore the abort exit is not needed anymore
@@ -529,5 +533,5 @@ if __name__ == '__main__':
print('---------------------')
main(args.exp_id, args.uc, args.load, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
-args.commit_ms, args.duration, args.reset,
+args.commit_ms, args.duration, args.prometheus, args.reset,
args.reset_only)
#!/usr/bin/env python
import argparse
+from lib.cli_parser import benchmark_parser
import logging # logging
import os
import sys
from strategies.config import ExperimentConfig
-from strategies.cli_parser import benchmark_parser
import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy
import strategies.strategies.search.check_all_strategy as check_all_strategy
apiVersion: batch/v1
kind: Job
metadata:
name: theodolite
spec:
template:
spec:
containers:
- name: theodolite
image: bvonheid/theodolite:latest
# imagePullPolicy: Never # Used to pull "own" local image
env:
- name: UC
value: "1"
- name: LOADS
value: "13206, 19635"
- name: INSTANCES
value: "1, 2"
- name: DURATION
value: "3"
- name: PARTITIONS
value: "30"
# - name: COMMIT_MS
# value: ""
# - name: SEARCH_STRATEGY
# value: ""
# - name: CPU_LIMIT
# value: ""
# - name: MEMORY_LIMIT
# value: ""
- name: PROMETHEUS_BASE_URL
value: "http://prometheus-operated:9090"
- name: PYTHONUNBUFFERED
value: "1"
restartPolicy: Never
backoffLimit: 4
# ---
# apiVersion: v1
# kind: ServiceAccount
# metadata:
# name: theodolite
# ---
# apiVersion: rbac.authorization.k8s.io/v1
# kind: Role
# metadata:
# name: modify-pods
# rules:
# - apiGroups: [""]
# resources:
# - pods
# verbs:
# - get
# - list
# - delete
# ---
# apiVersion: rbac.authorization.k8s.io/v1
# kind: RoleBinding
# metadata:
# name: modify-pods-to-sa
# subjects:
# - kind: ServiceAccount
# name: theodolite
# roleRef:
# kind: Role
# name: modify-pods
# apiGroup: rbac.authorization.k8s.io
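For orientation (illustrative, not part of the commit): inside the container started by the Job above, the env block feeds the parser defaults from lib/cli_parser.py, so running theodolite.py without any CLI flags is roughly equivalent to the following:

# Hypothetical check, run inside the Job's environment:
from lib.cli_parser import benchmark_parser
args = benchmark_parser('theodolite').parse_args([])
# args.uc == '1', args.loads == [13206, 19635], args.instances_list == [1, 2],
# args.duration == 3, args.partitions == 30,
# args.prometheus == 'http://prometheus-operated:9090'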