Skip to content
Snippets Groups Projects
Commit fbee7101 authored by Sören Henning
Browse files

Merge branch '160-Remove-previous-Python-implementation' into 'theodolite-kotlin'

Remove-previous-Python-implementation

See merge request !110
parents ed30aca0 c8145cd3
No related branches found
No related tags found
4 merge requests: !159 Re-implementation of Theodolite with Kotlin/Quarkus, !157 Update Graal Image in CI pipeline, !110 Remove-previous-Python-implementation, !83 WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #3785 passed
Showing
with 0 additions and 1210 deletions
# Image that runs the Python-based Theodolite benchmark (theodolite.py).
FROM python:3.8
# WORKDIR creates /app when it does not exist, so no separate mkdir is needed.
WORKDIR /app
# Copy the requirements on their own first so the dependency layer is reused
# as long as requirements.txt does not change. COPY is preferred over ADD for
# plain local files because it has no implicit extraction/URL behaviour.
COPY requirements.txt /app/
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt
COPY uc-workload-generator /app/uc-workload-generator
COPY uc-application /app/uc-application
COPY strategies /app/strategies
COPY lib /app/lib
COPY lag_analysis.py /app/
COPY run_uc.py /app/
COPY theodolite.py /app/
CMD ["python", "/app/theodolite.py"]
import sys
import os
import requests
from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import csv
import logging
def _query_range(prometheus_base_url, query, start, end):
    """
    Runs a Prometheus range query with a fixed step of 5 seconds.

    :param prometheus_base_url: Base URL of the Prometheus instance.
    :param query: The PromQL expression to evaluate.
    :param start: Start of the queried range (datetime).
    :param end: End of the queried range (datetime).
    :return: The list found under ``data.result`` in the JSON response.
    """
    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
        'query': query,
        'start': start.isoformat(),
        'end': end.isoformat(),
        'step': '5s'})
    return response.json()['data']['result']


def _to_rows(results, label=None):
    """
    Flattens Prometheus result series into one dict per sample.

    :param results: Result series as returned by ``_query_range``.
    :param label: Optional metric label whose value is copied into every row
        (as the first column). If None, rows only hold timestamp and value.
    :return: List of dicts with the keys ``[label,] timestamp, value``.
    """
    rows = []
    for result in results:
        prefix = {} if label is None else {label: result['metric'][label]}
        for value in result['values']:
            rows.append({
                **prefix,
                'timestamp': int(value[0]),
                # Prometheus reports missing samples as the string 'NaN'.
                'value': int(value[1]) if value[1] != 'NaN' else 0})
    return rows


def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
    """
    Queries consumer lag metrics from Prometheus for the last
    ``execution_minutes`` minutes, fits a linear regression on the lag of the
    topic "input" and stores the results below ``result_path``.

    Written files (``<prefix>`` is
    ``<result_path>/exp<exp_id>_<benchmark>_<dim_value>_<instances>``):
    ``<result_path>/results.csv`` (one appended row per call),
    ``<prefix>_plot.png``, ``<prefix>_values.csv``, ``<prefix>_totallag.csv``,
    ``<prefix>_partitions.csv`` and ``<prefix>_instances.csv``.

    The environment variable CLOCK_DIFF_MS (milliseconds, default 0) is
    subtracted from the local time before computing the query range.

    :param exp_id: ID of the experiment (used in file names and results.csv).
    :param benchmark: Name of the benchmark (used in file names and results.csv).
    :param dim_value: Dimension value (used in file names and results.csv).
    :param instances: Number of instances (used in file names and results.csv).
    :param execution_minutes: Length of the queried time range in minutes.
    :param prometheus_base_url: Base URL of the Prometheus instance.
    :param result_path: Directory the result files are written to.
    """
    print("Main")
    time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))

    now_local = datetime.now(timezone.utc).replace(microsecond=0)
    now = now_local - timedelta(milliseconds=time_diff_ms)
    print(f"Now Local: {now_local}")
    print(f"Now Used: {now}")

    end = now
    start = now - timedelta(minutes=execution_minutes)

    # Load lag per consumer group and topic
    results = _query_range(
        prometheus_base_url,
        "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
        start, end)
    df = pd.DataFrame(_to_rows(results, 'topic'))

    # Fit a linear trend to the lag of the input topic
    input_lag = df.loc[df['topic'] == "input"]

    from sklearn.linear_model import LinearRegression

    # Column 1 is the timestamp, column 2 the value; reshape(-1, 1) yields
    # a single-column 2D array as expected by scikit-learn.
    X = input_lag.iloc[:, 1].values.reshape(-1, 1)
    Y = input_lag.iloc[:, 2].values.reshape(-1, 1)
    linear_regressor = LinearRegression()
    linear_regressor.fit(X, Y)
    Y_pred = linear_regressor.predict(X)

    print(linear_regressor.coef_)

    fields = [exp_id, datetime.now(), benchmark, dim_value,
              instances, linear_regressor.coef_]
    print(fields)
    # newline='' is what the csv module expects for files handed to a writer.
    with open(f'{result_path}/results.csv', 'a', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(fields)

    filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}"

    plt.plot(X, Y)
    plt.plot(X, Y_pred, color='red')
    plt.savefig(f"{filename}_plot.png")
    # Close the figure so that later calls in the same process start with an
    # empty plot instead of drawing on top of this one.
    plt.close()

    df.to_csv(f"{filename}_values.csv")

    # Load total lag count
    results = _query_range(
        prometheus_base_url,
        "sum by(group)(kafka_consumergroup_group_lag > 0)",
        start, end)
    pd.DataFrame(_to_rows(results, 'group')).to_csv(f"{filename}_totallag.csv")

    # Load partition count
    results = _query_range(
        prometheus_base_url,
        "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
        start, end)
    pd.DataFrame(_to_rows(results, 'topic')).to_csv(f"{filename}_partitions.csv")

    # Load instances count
    results = _query_range(
        prometheus_base_url,
        "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
        start, end)
    pd.DataFrame(_to_rows(results)).to_csv(f"{filename}_instances.csv")
if __name__ == '__main__':
    # Script usage:
    #   <exp_id> <benchmark> <dim_value> <instances> <execution_minutes>
    logging.basicConfig(level=logging.INFO)

    # Load arguments
    exp_id, benchmark, dim_value, instances = (
        sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
    execution_minutes = int(sys.argv[5])

    # Prometheus URL and result directory are fixed for standalone runs.
    main(exp_id, benchmark, dim_value, instances, execution_minutes,
         'http://localhost:9090', 'results')
import argparse
import os
def env_list_default(env, tf):
    """
    Builds a list from a comma-separated environment variable.

    :param env: Name of the environment variable to read.
    :param tf: Callable applied to every comma-separated item.
    :return: List of the converted items, or None if the variable is not set.
    """
    raw = os.environ.get(env)
    if raw is None:
        return None
    return list(map(tf, raw.split(',')))
def key_values_to_dict(kvs):
    """
    Given a list with key values in form `Key=Value` it creates a dict from it.

    Only the first `=` of an entry separates key and value, so values may
    themselves contain `=` (e.g. `JAVA_OPTS=-Dfoo=bar`).

    :param kvs: Iterable of strings in the form `Key=Value`.
    :return: Dict mapping each key to its (string) value. Later entries
        overwrite earlier ones with the same key.
    :raises ValueError: If an entry contains no `=`.
    """
    my_dict = {}
    for kv in kvs:
        # maxsplit=1 keeps any further '=' as part of the value
        k, v = kv.split("=", 1)
        my_dict[k] = v
    return my_dict
def env_dict_default(env):
    """
    Makes a dict from an environment string.

    The variable is expected to hold comma-separated `Key=Value` pairs.

    :param env: Name of the environment variable to read.
    :return: Dict built from the pairs; an empty dict if the variable is
        not set or set to an empty string.
    """
    v = os.environ.get(env)
    if not v:
        # Unset or empty: an empty string would otherwise be split into ['']
        # and fail to parse as a `Key=Value` pair.
        return dict()
    return key_values_to_dict(v.split(','))
class StoreDictKeyPair(argparse.Action):
    """
    argparse action that converts the given `Key=Value` arguments into a dict
    and stores that dict on the namespace.
    """

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        super().__init__(option_strings, dest, nargs=nargs, **kwargs)
        self._nargs = nargs

    def __call__(self, parser, namespace, values, option_string=None):
        # Replace whatever was stored before (e.g. the default) by the
        # dict built from the command line values.
        setattr(namespace, self.dest, key_values_to_dict(values))
def default_parser(description):
    """
    Creates the argument parser with the options common to theodolite and
    run_uc. Every option falls back to an environment variable, and for most
    of them to a hard-coded default if that variable is not set.

    :param description: The description the argument parser should show.
    :return: The configured argparse.ArgumentParser.
    """
    env = os.environ.get

    def env_flag(name):
        # A switch is pre-enabled only if the variable equals 'true'
        # (case-insensitive).
        return env(name, 'false').lower() == 'true'

    parser = argparse.ArgumentParser(description=description)
    add = parser.add_argument

    add('--uc',
        metavar='<uc>',
        default=env('UC'),
        help='[mandatory] use case number, one of 1, 2, 3 or 4')
    add('--partitions', '-p',
        metavar='<partitions>',
        type=int,
        default=env('PARTITIONS', 40),
        help='Number of partitions for Kafka topics')
    add('--cpu-limit', '-cpu',
        metavar='<CPU limit>',
        default=env('CPU_LIMIT', '1000m'),
        help='Kubernetes CPU limit')
    add('--memory-limit', '-mem',
        metavar='<memory limit>',
        default=env('MEMORY_LIMIT', '4Gi'),
        help='Kubernetes memory limit')
    add('--duration', '-d',
        metavar='<duration>',
        type=int,
        default=env('DURATION', 5),
        help='Duration in minutes subexperiments should be executed for')
    add('--namespace',
        metavar='<NS>',
        default=env('NAMESPACE', 'default'),
        help='Defines the Kubernetes where the applications should run')
    add('--reset',
        action="store_true",
        default=env_flag('RESET'),
        help='Resets the environment before execution')
    add('--reset-only',
        action="store_true",
        default=env_flag('RESET_ONLY'),
        help='Only resets the environment. Ignores all other parameters')
    add('--prometheus',
        metavar='<URL>',
        default=env('PROMETHEUS_BASE_URL', 'http://localhost:9090'),
        help='Defines where to find the prometheus instance')
    add('--path',
        metavar='<path>',
        default=env('RESULT_PATH', 'results'),
        help='A directory path for the results')
    add("--configurations",
        metavar="KEY=VAL",
        dest="configurations",
        action=StoreDictKeyPair,
        nargs="+",
        default=env_dict_default('CONFIGURATIONS'),
        help='Defines the environment variables for the UC')
    return parser
def benchmark_parser(description):
    """
    Creates the parser for the overall benchmark execution: the default
    parser extended by the load/instance lists and the strategy options.

    :param description: The description the argument parser should show.
    :return: The configured argparse.ArgumentParser.
    """
    env = os.environ.get
    parser = default_parser(description)
    add = parser.add_argument

    add('--loads',
        metavar='<load>',
        type=int,
        nargs='+',
        default=env_list_default('LOADS', int),
        help='[mandatory] Loads that should be executed')
    add('--instances', '-i',
        dest='instances_list',
        metavar='<instances>',
        type=int,
        nargs='+',
        default=env_list_default('INSTANCES', int),
        help='[mandatory] List of instances used in benchmarks')
    add('--domain-restriction',
        action="store_true",
        default=env('DOMAIN_RESTRICTION', 'false').lower() == 'true',
        help='To use domain restriction. For details see README')
    add('--search-strategy',
        metavar='<strategy>',
        default=env('SEARCH_STRATEGY', 'default'),
        help='The benchmarking search strategy. Can be set to default, linear-search or binary-search')
    add('--threshold',
        type=int,
        metavar='<threshold>',
        default=env('THRESHOLD', 2000),
        help='The threshold for the trend slop that the search strategies use to determine that a load could be handled')
    return parser
def execution_parser(description):
    """
    Creates the parser for executing a single use case: the default parser
    extended by the experiment ID, the load and the number of instances.

    :param description: The description the argument parser should show.
    :return: The configured argparse.ArgumentParser.
    """
    env = os.environ.get
    parser = default_parser(description)
    add = parser.add_argument

    add('--exp-id',
        metavar='<exp id>',
        default=env('EXP_ID'),
        help='[mandatory] ID of the experiment')
    add('--load',
        metavar='<load>',
        type=int,
        default=env('LOAD'),
        help='[mandatory] Load that should be used for benchmakr')
    add('--instances',
        metavar='<instances>',
        type=int,
        default=env('INSTANCES'),
        help='[mandatory] Numbers of instances to be benchmarked')
    return parser
from sklearn.linear_model import LinearRegression
import pandas as pd
import os
def compute(directory, filename, warmup_sec):
    """
    Computes the slope of a linear regression over the values of a CSV file,
    ignoring an initial warm-up period.

    The CSV must have a ``timestamp`` column. The regression uses the columns
    at positions 2 (x) and 3 (y) of the loaded frame.
    NOTE(review): presumably these are ``timestamp`` and ``value`` of a CSV
    written with a leading index column -- confirm against the producer.

    :param directory: Directory that contains the CSV file.
    :param filename: Name of the CSV file.
    :param warmup_sec: Rows less than this many timestamp units after the
        first row are excluded from the regression.
    :return: The slope (first coefficient) of the fitted line.
    """
    df = pd.read_csv(os.path.join(directory, filename))

    # Offset of every row relative to the first row's timestamp
    df['sec_start'] = df['timestamp'] - df['timestamp'].iloc[0]

    # Drop the warm-up phase
    regress = df.loc[df['sec_start'] >= warmup_sec]

    # reshape(-1, 1): as many rows as needed, exactly one column
    X = regress.iloc[:, 2].values.reshape(-1, 1)
    Y = regress.iloc[:, 3].values.reshape(-1, 1)

    linear_regressor = LinearRegression()
    linear_regressor.fit(X, Y)

    return linear_regressor.coef_[0][0]
matplotlib==3.2.0
pandas==1.0.1
requests==2.23.0
scikit-learn==0.22.2.post1
# For run_uc.py
kubernetes==11.0.0
confuse==1.1.0
This diff is collapsed.
from dataclasses import dataclass
@dataclass
class ExperimentConfig:
    """ Bundles all settings of one experiment into a single object. """
    # Identification of the experiment
    use_case: str
    exp_id: int

    # Search space: the search strategies index into both lists
    dim_values: list
    replicass: list

    # Settings handed on to every subexperiment
    partitions: int
    cpu_limit: str
    memory_limit: str
    execution_minutes: int
    prometheus_base_url: str
    reset: bool
    namespace: str
    result_path: str
    configurations: dict

    # Pluggable behaviour; each object is used through its execute(...)
    domain_restriction_strategy: object
    search_strategy: object
    # Passed to the subexperiment evaluator together with the subexperiment
    threshold: int
    subexperiment_executor: object
    subexperiment_evaluator: object
class ExperimentExecutor:
    """ Runs an experiment by delegating to its domain restriction strategy. """

    def __init__(self, config):
        # The configuration must provide `domain_restriction_strategy`.
        self.config = config

    def execute(self):
        """ Hands the stored configuration to its domain restriction strategy. """
        strategy = self.config.domain_restriction_strategy
        strategy.execute(self.config)
from dataclasses import dataclass
@dataclass
class SubexperimentConfig:
    """ Bundles all settings of one subexperiment into a single object. """
    # Identification of the subexperiment
    use_case: str
    exp_id: int
    counter: int

    # The concrete point of the search space that is executed
    dim_value: int
    replicas: int

    # Execution settings
    partitions: int
    cpu_limit: str
    memory_limit: str
    execution_minutes: int
    prometheus_base_url: str
    reset: bool
    namespace: str
    result_path: str
    configurations: dict
# The lower bound strategy
def execute(config):
    """
    Runs the search strategy once per dimension value. The replicas index
    returned for one dimension value is used as the lower bound for the next
    one; the run stops as soon as that index leaves the replicas list.

    :param config: The experiment configuration; provides dim_values,
        replicass and search_strategy.
    """
    replicas_index = 0
    counter = 0
    for dim_value_index, _ in enumerate(config.dim_values):
        if not 0 <= replicas_index < len(config.replicass):
            # No valid lower bound left for the remaining dimension values
            break
        replicas_index, counter = config.search_strategy.execute(
            config=config,
            dim_value_index=dim_value_index,
            lower_replicas_bound_index=replicas_index,
            subexperiment_counter=counter)
\ No newline at end of file
# The strategy where the domain contains all amounts of instances
def execute(config):
    """
    Runs the search strategy once per dimension value, always starting at
    the first entry of the replicas list (no domain restriction).

    :param config: The experiment configuration; provides dim_values and
        search_strategy.
    """
    counter = 0
    for dim_value_index, _ in enumerate(config.dim_values):
        # The returned replicas index is deliberately ignored
        _, counter = config.search_strategy.execute(
            config=config,
            dim_value_index=dim_value_index,
            lower_replicas_bound_index=0,
            subexperiment_counter=counter)
\ No newline at end of file
# The binary search strategy
import os
from strategies.strategies.config import SubexperimentConfig
def _run_subexperiment(config, dim_value, replicas_index, subexperiment_counter):
    """
    Executes and evaluates one subexperiment.

    :param config: The experiment configuration.
    :param dim_value: The dimension value to run.
    :param replicas_index: Index into config.replicass of the replicas to run.
    :param subexperiment_counter: Counter stored in the subexperiment config.
    :return: The result of the subexperiment evaluator.
    """
    replicas = config.replicass[replicas_index]
    print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {replicas}")
    subexperiment_config = SubexperimentConfig(
        config.use_case, config.exp_id, subexperiment_counter, dim_value,
        replicas, config.partitions, config.cpu_limit, config.memory_limit,
        config.execution_minutes, config.prometheus_base_url, config.reset,
        config.namespace, config.result_path, config.configurations)
    config.subexperiment_executor.execute(subexperiment_config)
    return config.subexperiment_evaluator.execute(subexperiment_config,
                                                  config.threshold)


def binary_search(config, dim_value, lower, upper, subexperiment_counter):
    """
    Binary search over config.replicass[lower..upper] for the smallest
    replicas entry whose subexperiment is evaluated as successful.

    :param config: The experiment configuration.
    :param dim_value: The dimension value to run the subexperiments with.
    :param lower: Lowest index into config.replicass to consider.
    :param upper: Highest index into config.replicass to consider.
    :param subexperiment_counter: Counter of the next subexperiment.
    :return: Tuple (index, subexperiment_counter). The index is one past the
        searched range if no subexperiment in the range was successful.
    """
    # NOTE(review): the counter is only advanced on some paths (mid probes and
    # a successful single-element probe). This mirrors the previous behaviour
    # exactly; confirm whether it is intended.
    if lower == upper:
        if _run_subexperiment(config, dim_value, lower, subexperiment_counter):
            # successful, the upper neighbor is assumed to be successful too
            return (lower, subexperiment_counter+1)
        # not successful
        return (lower+1, subexperiment_counter)

    if lower+1 == upper:
        if _run_subexperiment(config, dim_value, lower, subexperiment_counter):
            # minimal instances found
            return (lower, subexperiment_counter)
        # not successful, check if the upper entry is sufficient
        if _run_subexperiment(config, dim_value, upper, subexperiment_counter):
            # minimal instances found
            return (upper, subexperiment_counter)
        return (upper+1, subexperiment_counter)

    # More than two candidates left: probe the middle one
    mid = (upper+lower)//2
    if _run_subexperiment(config, dim_value, mid, subexperiment_counter):
        # success -> search in (lower, mid-1)
        return binary_search(config, dim_value, lower, mid-1, subexperiment_counter+1)
    # not success -> search in (mid+1, upper)
    return binary_search(config, dim_value, mid+1, upper, subexperiment_counter+1)
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
    """
    Entry point of the binary search strategy: searches the replicas list
    from the given lower bound index up to its last entry.

    :param config: The experiment configuration.
    :param dim_value_index: Index into config.dim_values of the value to run.
    :param lower_replicas_bound_index: Index into config.replicass to start at.
    :param subexperiment_counter: Counter of the next subexperiment.
    :return: Whatever binary_search returns for that range.
    """
    last_index = len(config.replicass)-1
    return binary_search(
        config,
        config.dim_values[dim_value_index],
        lower_replicas_bound_index,
        last_index,
        subexperiment_counter)
# The check_all strategy
import os
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
    """
    Runs a subexperiment for every replicas entry from the given lower bound
    index up to the end of the list, regardless of the individual results.

    :param config: The experiment configuration.
    :param dim_value_index: Index into config.dim_values of the value to run.
    :param lower_replicas_bound_index: Index into config.replicass to start at.
    :param subexperiment_counter: Counter of the last executed subexperiment.
    :return: Tuple (index, subexperiment_counter) where index is that of the
        first successful replicas entry, or the given lower bound index if
        no subexperiment was successful.
    """
    subexperiments_total = len(config.dim_values) * len(config.replicass)
    first_success_index = None

    replicas_index = lower_replicas_bound_index
    while replicas_index < len(config.replicass):
        subexperiment_counter += 1
        dim_value = config.dim_values[dim_value_index]
        replicas = config.replicass[replicas_index]
        print(
            f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")

        subexperiment_config = SubexperimentConfig(
            config.use_case, config.exp_id, subexperiment_counter, dim_value,
            replicas, config.partitions, config.cpu_limit, config.memory_limit,
            config.execution_minutes, config.prometheus_base_url, config.reset,
            config.namespace, config.result_path, config.configurations)

        config.subexperiment_executor.execute(subexperiment_config)
        success = config.subexperiment_evaluator.execute(subexperiment_config,
                                                         config.threshold)

        # Only the first success determines the new lower bound
        if success and first_success_index is None:
            first_success_index = replicas_index
        replicas_index += 1

    if first_success_index is None:
        return (lower_replicas_bound_index, subexperiment_counter)
    return (first_success_index, subexperiment_counter)
# The linear-search strategy
import os
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
    """
    Runs subexperiments for increasing replicas entries, starting at the
    given lower bound index, and stops at the first successful one.

    :param config: The experiment configuration.
    :param dim_value_index: Index into config.dim_values of the value to run.
    :param lower_replicas_bound_index: Index into config.replicass to start at.
    :param subexperiment_counter: Counter of the last executed subexperiment.
    :return: Tuple (index, subexperiment_counter) where index is that of the
        first successful replicas entry, or the first index past the tried
        entries if none was successful.
    """
    subexperiments_total = len(config.dim_values)+len(config.replicass)-1
    dim_value = config.dim_values[dim_value_index]

    replicas_index = lower_replicas_bound_index
    while replicas_index < len(config.replicass):
        subexperiment_counter += 1
        replicas = config.replicass[replicas_index]
        print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")

        subexperiment_config = SubexperimentConfig(
            config.use_case, config.exp_id, subexperiment_counter, dim_value,
            replicas, config.partitions, config.cpu_limit, config.memory_limit,
            config.execution_minutes, config.prometheus_base_url, config.reset,
            config.namespace, config.result_path, config.configurations)

        config.subexperiment_executor.execute(subexperiment_config)
        if config.subexperiment_evaluator.execute(subexperiment_config,
                                                  config.threshold):
            # First sufficient replicas entry found
            break
        replicas_index += 1

    return (replicas_index, subexperiment_counter)
import lib.trend_slope_computer as trend_slope_computer
import logging
import os
# Passed as warm-up period to the trend slope computation.
WARMUP_SEC = 60


def execute(config, threshold):
    """
    Check the trend slope of the totallag of the subexperiment if it comes below
    the threshold.

    The totallag CSV is looked up in config.result_path; a relative path is
    resolved against the current working directory, an absolute path is used
    as is.

    :param config: Configuration of the subexperiment.
    :param threshold: The threshold the trendslope need to come below.
    :return: True if the trend slope is below the threshold. False if it is
        not, or if the trend slope could not be computed.
    """
    # os.path.join keeps an absolute result_path intact instead of appending
    # it to the working directory.
    result_dir = os.path.join(os.getcwd(), config.result_path)
    file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv"

    try:
        trend_slope = trend_slope_computer.compute(result_dir, file, WARMUP_SEC)
    except Exception:
        # Deliberate best effort: a failed evaluation must not abort the
        # whole benchmark, so it only counts as an unsuccessful subexperiment.
        err_msg = 'Computing trend slope failed'
        print(err_msg)
        logging.exception(err_msg)
        print('Mark this subexperiment as not successful and continue benchmark')
        return False

    print(f"Trend Slope: {trend_slope}")

    return trend_slope < threshold
# Wrapper that makes the execution method of a subexperiment interchangable.
import os
import run_uc
def execute(subexperiment_config):
    """
    Executes a subexperiment by forwarding its configuration to run_uc.main.

    dim_value, replicas and execution_minutes are converted to int; all
    other settings are passed on unchanged.

    :param subexperiment_config: Configuration of the subexperiment.
    """
    cfg = subexperiment_config
    arguments = dict(
        exp_id=cfg.exp_id,
        uc_id=cfg.use_case,
        dim_value=int(cfg.dim_value),
        instances=int(cfg.replicas),
        partitions=cfg.partitions,
        cpu_limit=cfg.cpu_limit,
        memory_limit=cfg.memory_limit,
        execution_minutes=int(cfg.execution_minutes),
        prometheus_base_url=cfg.prometheus_base_url,
        reset=cfg.reset,
        ns=cfg.namespace,
        result_path=cfg.result_path,
        configurations=cfg.configurations)
    run_uc.main(**arguments)
.cache
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment