Commit b85a8d32 authored by Lorenz Boguhn, committed by Lorenz Boguhn

Remove Python implementation

parent 0c976926
No related branches found
No related tags found
4 merge requests: !159 Re-implementation of Theodolite with Kotlin/Quarkus, !157 Update Graal Image in CI pipeline, !110 Remove-previous-Python-implementation, !83 WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing with 0 additions and 1210 deletions
FROM python:3.8
RUN mkdir /app
WORKDIR /app
ADD requirements.txt /app/
RUN pip install -r requirements.txt
COPY uc-workload-generator /app/uc-workload-generator
COPY uc-application /app/uc-application
COPY strategies /app/strategies
COPY lib /app/lib
COPY lag_analysis.py /app/
COPY run_uc.py /app/
COPY theodolite.py /app/
CMD ["python", "/app/theodolite.py"]
import sys
import os
import requests
from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import csv
import logging
def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
print("Main")
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
now = now_local - timedelta(milliseconds=time_diff_ms)
print(f"Now Local: {now_local}")
print(f"Now Used: {now}")
end = now
start = now - timedelta(minutes=execution_minutes)
#print(start.isoformat().replace('+00:00', 'Z'))
#print(end.isoformat().replace('+00:00', 'Z'))
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
# 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
# response
# print(response.request.path_url)
# response.content
results = response.json()['data']['result']
d = []
for result in results:
# print(result['metric']['topic'])
topic = result['metric']['topic']
for value in result['values']:
# print(value)
d.append({'topic': topic, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
# Do some analysis
input = df.loc[df['topic'] == "input"]
# input.plot(kind='line',x='timestamp',y='value',color='red')
# plt.show()
from sklearn.linear_model import LinearRegression
# .values converts the column into a NumPy array
X = input.iloc[:, 1].values.reshape(-1, 1)
# -1 lets NumPy infer the number of rows; the result has 1 column
Y = input.iloc[:, 2].values.reshape(-1, 1)
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
print(linear_regressor.coef_)
# print(Y_pred)
fields = [exp_id, datetime.now(), benchmark, dim_value,
instances, linear_regressor.coef_]
print(fields)
with open(f'{result_path}/results.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}"
plt.plot(X, Y)
plt.plot(X, Y_pred, color='red')
plt.savefig(f"{filename}_plot.png")
df.to_csv(f"{filename}_values.csv")
# Load total lag count
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
results = response.json()['data']['result']
d = []
for result in results:
# print(result['metric']['topic'])
group = result['metric']['group']
for value in result['values']:
# print(value)
d.append({'group': group, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
df.to_csv(f"{filename}_totallag.csv")
# Load partition count
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
results = response.json()['data']['result']
d = []
for result in results:
# print(result['metric']['topic'])
topic = result['metric']['topic']
for value in result['values']:
# print(value)
d.append({'topic': topic, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
df.to_csv(f"{filename}_partitions.csv")
# Load instances count
response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
results = response.json()['data']['result']
d = []
for result in results:
for value in result['values']:
# print(value)
d.append({'timestamp': int(value[0]), 'value': int(value[1])})
df = pd.DataFrame(d)
df.to_csv(f"{filename}_instances.csv")
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
# Load arguments
exp_id = sys.argv[1]
benchmark = sys.argv[2]
dim_value = sys.argv[3]
instances = sys.argv[4]
execution_minutes = int(sys.argv[5])
main(exp_id, benchmark, dim_value, instances, execution_minutes,
'http://localhost:9090', 'results')
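The heart of this script is the linear regression over the consumer-lag time series: the fitted slope (records per second) is what the search strategies later compare against the threshold. A minimal, self-contained sketch of that computation, using made-up sample values:
import pandas as pd
from sklearn.linear_model import LinearRegression

# hypothetical lag samples: (seconds, total consumer lag in records)
df = pd.DataFrame({'timestamp': [0, 5, 10, 15, 20],
                   'value': [0, 100, 220, 290, 410]})
X = df['timestamp'].values.reshape(-1, 1)
Y = df['value'].values.reshape(-1, 1)
linear_regressor = LinearRegression().fit(X, Y)
# slope of the fitted line in records per second, roughly 20 for this data
print(linear_regressor.coef_[0][0])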
import argparse
import os
def env_list_default(env, tf):
"""
Makes a list from a comma-separated environment variable, applying tf to each element.
"""
v = os.environ.get(env)
if v is not None:
v = [tf(s) for s in v.split(',')]
return v
def key_values_to_dict(kvs):
"""
Given a list of key-value strings of the form `Key=Value`, creates a dict from them.
"""
my_dict = {}
for kv in kvs:
k, v = kv.split("=")
my_dict[k] = v
return my_dict
def env_dict_default(env):
"""
Makes a dict from a comma-separated environment variable of `Key=Value` pairs.
"""
v = os.environ.get(env)
if v is not None:
return key_values_to_dict(v.split(','))
else:
return dict()
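For illustration, a small sketch of how these helpers behave; the environment values are hypothetical:
import os

os.environ['LOADS'] = '10000,20000,30000'
os.environ['CONFIGURATIONS'] = 'COMMIT_INTERVAL_MS=100,MEMORY_BUFFER=1024'

print(env_list_default('LOADS', int))  # [10000, 20000, 30000]
print(env_dict_default('CONFIGURATIONS'))
# {'COMMIT_INTERVAL_MS': '100', 'MEMORY_BUFFER': '1024'}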
class StoreDictKeyPair(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
self._nargs = nargs
super(StoreDictKeyPair, self).__init__(
option_strings, dest, nargs=nargs, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
my_dict = key_values_to_dict(values)
setattr(namespace, self.dest, my_dict)
def default_parser(description):
"""
Returns the default parser that can be used for theodolite.py and run_uc.py.
:param description: The description the argument parser should show.
"""
parser = argparse.ArgumentParser(description=description)
parser.add_argument('--uc',
metavar='<uc>',
default=os.environ.get('UC'),
help='[mandatory] use case number, one of 1, 2, 3 or 4')
parser.add_argument('--partitions', '-p',
metavar='<partitions>',
type=int,
default=os.environ.get('PARTITIONS', 40),
help='Number of partitions for Kafka topics')
parser.add_argument('--cpu-limit', '-cpu',
metavar='<CPU limit>',
default=os.environ.get('CPU_LIMIT', '1000m'),
help='Kubernetes CPU limit')
parser.add_argument('--memory-limit', '-mem',
metavar='<memory limit>',
default=os.environ.get('MEMORY_LIMIT', '4Gi'),
help='Kubernetes memory limit')
parser.add_argument('--duration', '-d',
metavar='<duration>',
type=int,
default=os.environ.get('DURATION', 5),
help='Duration in minutes subexperiments should be \
executed for')
parser.add_argument('--namespace',
metavar='<NS>',
default=os.environ.get('NAMESPACE', 'default'),
help='Defines the Kubernetes namespace where the applications should run')
parser.add_argument('--reset',
action="store_true",
default=os.environ.get(
'RESET', 'false').lower() == 'true',
help='Resets the environment before execution')
parser.add_argument('--reset-only',
action="store_true",
default=os.environ.get(
'RESET_ONLY', 'false').lower() == 'true',
help='Only resets the environment. Ignores all other parameters')
parser.add_argument('--prometheus',
metavar='<URL>',
default=os.environ.get(
'PROMETHEUS_BASE_URL', 'http://localhost:9090'),
help='Defines where to find the Prometheus instance')
parser.add_argument('--path',
metavar='<path>',
default=os.environ.get('RESULT_PATH', 'results'),
help='A directory path for the results')
parser.add_argument("--configurations",
metavar="KEY=VAL",
dest="configurations",
action=StoreDictKeyPair,
nargs="+",
default=env_dict_default('CONFIGURATIONS'),
help='Defines the environment variables for the UC')
return parser
def benchmark_parser(description):
"""
Parser for the overall benchmark execution
:param description: The description the argument parser should show.
"""
parser = default_parser(description)
parser.add_argument('--loads',
metavar='<load>',
type=int,
nargs='+',
default=env_list_default('LOADS', int),
help='[mandatory] Loads that should be executed')
parser.add_argument('--instances', '-i',
dest='instances_list',
metavar='<instances>',
type=int,
nargs='+',
default=env_list_default('INSTANCES', int),
help='[mandatory] List of instances used in benchmarks')
parser.add_argument('--domain-restriction',
action="store_true",
default=os.environ.get(
'DOMAIN_RESTRICTION', 'false').lower() == 'true',
help='Use domain restriction. See the README for details')
parser.add_argument('--search-strategy',
metavar='<strategy>',
default=os.environ.get('SEARCH_STRATEGY', 'default'),
help='The benchmarking search strategy. Can be set to default, linear-search or binary-search')
parser.add_argument('--threshold',
type=int,
metavar='<threshold>',
default=os.environ.get('THRESHOLD', 2000),
help='The threshold for the trend slope that the search strategies use to determine whether a load can be handled')
return parser
def execution_parser(description):
"""
Parser for executing one use case
:param description: The description the argument parser should show.
"""
parser = default_parser(description)
parser.add_argument('--exp-id',
metavar='<exp id>',
default=os.environ.get('EXP_ID'),
help='[mandatory] ID of the experiment')
parser.add_argument('--load',
metavar='<load>',
type=int,
default=os.environ.get('LOAD'),
help='[mandatory] Load that should be used for the benchmark')
parser.add_argument('--instances',
metavar='<instances>',
type=int,
default=os.environ.get('INSTANCES'),
help='[mandatory] Number of instances to be benchmarked')
return parser
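A usage sketch for the parsers defined above; the argument values are hypothetical:
parser = benchmark_parser('Theodolite benchmark execution')
args = parser.parse_args(['--uc', '1',
                          '--loads', '10000', '20000',
                          '--instances', '1', '2', '4',
                          '--search-strategy', 'binary-search'])
print(args.uc, args.loads, args.instances_list, args.search_strategy)
# 1 [10000, 20000] [1, 2, 4] binary-search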
from sklearn.linear_model import LinearRegression
import pandas as pd
import os
def compute(directory, filename, warmup_sec):
df = pd.read_csv(os.path.join(directory, filename))
input = df
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
regress = input.loc[input['sec_start'] >= warmup_sec]  # discard the warm-up phase
X = regress.iloc[:, 2].values.reshape(-1, 1)  # .values converts the column into a NumPy array
Y = regress.iloc[:, 3].values.reshape(-1, 1)  # -1 lets NumPy infer the number of rows; 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
trend_slope = linear_regressor.coef_[0][0]
return trend_slope
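A hedged usage example, assuming a total-lag CSV produced by lag_analysis.py exists under results/ (the file name here is hypothetical):
# 60 s warm-up is skipped before fitting the regression line
slope = compute('results', 'exp0_uc1_10000_4_totallag.csv', 60)
print(f'Lag trend slope: {slope} records/s')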
matplotlib==3.2.0
pandas==1.0.1
requests==2.23.0
scikit-learn==0.22.2.post1
# For run_uc.py
kubernetes==11.0.0
confuse==1.1.0
This diff is collapsed.
from dataclasses import dataclass
@dataclass
class ExperimentConfig:
""" Wrapper for the configuration of an experiment. """
use_case: str
exp_id: int
dim_values: list
replicass: list
partitions: int
cpu_limit: str
memory_limit: str
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
configurations: dict
domain_restriction_strategy: object
search_strategy: object
threshold: int
subexperiment_executor: object
subexperiment_evaluator: object
class ExperimentExecutor:
def __init__(self, config):
self.config=config
def execute(self):
self.config.domain_restriction_strategy.execute(self.config)
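A minimal wiring sketch for these classes. The module paths follow the package layout suggested by the imports in this commit (e.g. strategies.strategies.config), but since the diff view does not show file names they should be read as assumptions:
from strategies.config import ExperimentConfig, ExperimentExecutor
from strategies.strategies.domain_restriction import lower_bound_strategy
from strategies.strategies.search import binary_search_strategy
from strategies.subexperiment_execution import subexperiment_executor
from strategies.subexperiment_evaluation import subexperiment_evaluator

config = ExperimentConfig(
    use_case='1', exp_id=0, dim_values=[10000, 20000], replicass=[1, 2, 4],
    partitions=40, cpu_limit='1000m', memory_limit='4Gi', execution_minutes=5,
    prometheus_base_url='http://localhost:9090', reset=False,
    namespace='default', result_path='results', configurations={},
    domain_restriction_strategy=lower_bound_strategy,
    search_strategy=binary_search_strategy, threshold=2000,
    subexperiment_executor=subexperiment_executor,
    subexperiment_evaluator=subexperiment_evaluator)
ExperimentExecutor(config).execute()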
from dataclasses import dataclass
@dataclass
class SubexperimentConfig:
""" Wrapper for the configuration of a subexperiment """
use_case: str
exp_id: int
counter: int
dim_value: int
replicas: int
partitions: int
cpu_limit: str
memory_limit: str
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
configurations: dict
# The lower-bound domain-restriction strategy: the search for each dimension value starts at the lower bound found for the previous dimension value
def execute(config):
dim_value_index = 0
lower_bound_replicas_index = 0
subexperiment_counter = 0
while dim_value_index < len(config.dim_values) and lower_bound_replicas_index >= 0 and lower_bound_replicas_index < len(config.replicass):
lower_bound_replicas_index, subexperiment_counter = config.search_strategy.execute(
config=config,
dim_value_index=dim_value_index,
lower_replicas_bound_index=lower_bound_replicas_index,
subexperiment_counter=subexperiment_counter)
dim_value_index+=1
\ No newline at end of file
# The strategy without lower-bound restriction: the domain contains all numbers of instances
def execute(config):
dim_value_index = 0
subexperiment_counter = 0
while dim_value_index < len(config.dim_values):
_, subexperiment_counter = config.search_strategy.execute(
config=config,
dim_value_index=dim_value_index,
lower_replicas_bound_index=0,
subexperiment_counter=subexperiment_counter)
dim_value_index+=1
\ No newline at end of file
# The binary search strategy
from strategies.strategies.config import SubexperimentConfig
def binary_search(config, dim_value, lower, upper, subexperiment_counter):
if lower == upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
success = config.subexperiment_evaluator.execute(subexperiment_config,
config.threshold)
if success: # successful; the upper neighbor is assumed to have been successful as well
return (lower, subexperiment_counter+1)
else: # not successful
return (lower+1, subexperiment_counter)
elif lower+1==upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
success = config.subexperiment_evaluator.execute(subexperiment_config,
config.threshold)
if success: # minimal instances found
return (lower, subexperiment_counter)
else: # not successful; check whether upper (lower+1) instances are sufficient
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
success = config.subexperiment_evaluator.execute(subexperiment_config,
config.threshold)
if success: # minimal instances found
return (upper, subexperiment_counter)
else:
return (upper+1, subexperiment_counter)
else:
# test mid
mid=(upper+lower)//2
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
success = config.subexperiment_evaluator.execute(subexperiment_config,
config.threshold)
if success: # success -> search in (lower, mid-1)
return binary_search(config, dim_value, lower, mid-1, subexperiment_counter+1)
else: # not success -> search in (mid+1, upper)
return binary_search(config, dim_value, mid+1, upper, subexperiment_counter+1)
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
upper = len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter)
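To see the search in action, the following self-contained trace replaces the Kubernetes-backed executor and evaluator with stubs; the replica list and the assumption that 4 replicas suffice are invented for illustration, and it presumes this module and strategies.strategies.config are importable:
from types import SimpleNamespace

class StubExecutor:
    def execute(self, subexperiment_config):
        pass  # the real executor deploys the use case and waits

class StubEvaluator:
    def execute(self, subexperiment_config, threshold):
        # pretend 4 or more replicas keep the lag trend below the threshold
        return subexperiment_config.replicas >= 4

config = SimpleNamespace(
    use_case='1', exp_id=0, dim_values=[10000], replicass=[1, 2, 4, 8],
    partitions=40, cpu_limit='1000m', memory_limit='4Gi', execution_minutes=5,
    prometheus_base_url='http://localhost:9090', reset=False,
    namespace='default', result_path='results', configurations={},
    threshold=2000, subexperiment_executor=StubExecutor(),
    subexperiment_evaluator=StubEvaluator())

# prints (2, 1): replicass[2] == 4 is the minimal sufficient replica count
print(execute(config, dim_value_index=0, lower_replicas_bound_index=0,
              subexperiment_counter=0))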
# The check_all strategy
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
new_lower_replicas_bound_index = lower_replicas_bound_index
new_lower_replicas_bound_found = False
subexperiments_total = len(config.dim_values) * len(config.replicass)
while lower_replicas_bound_index < len(config.replicass):
subexperiment_counter += 1
dim_value = config.dim_values[dim_value_index]
replicas = config.replicass[lower_replicas_bound_index]
print(
f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(
config.use_case, config.exp_id, subexperiment_counter, dim_value,
replicas, config.partitions, config.cpu_limit, config.memory_limit,
config.execution_minutes, config.prometheus_base_url, config.reset,
config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
success = config.subexperiment_evaluator.execute(subexperiment_config,
config.threshold)
if success and not new_lower_replicas_bound_found:
new_lower_replicas_bound_found = True
new_lower_replicas_bound_index = lower_replicas_bound_index
lower_replicas_bound_index += 1
return (new_lower_replicas_bound_index, subexperiment_counter)
# The linear-search strategy
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
subexperiments_total=len(config.dim_values)+len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
while lower_replicas_bound_index < len(config.replicass):
subexperiment_counter+=1
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations)
config.subexperiment_executor.execute(subexperiment_config)
success = config.subexperiment_evaluator.execute(subexperiment_config,
config.threshold)
if success:
return (lower_replicas_bound_index, subexperiment_counter)
else:
lower_replicas_bound_index+=1
return (lower_replicas_bound_index, subexperiment_counter)
import lib.trend_slope_computer as trend_slope_computer
import logging
import os
WARMUP_SEC = 60
def execute(config, threshold):
"""
Check whether the trend slope of the total lag of the subexperiment stays below
the threshold.
:param config: Configuration of the subexperiment.
:param threshold: The threshold the trend slope needs to stay below.
"""
cwd = f'{os.getcwd()}/{config.result_path}'
file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv"
try:
trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC)
except Exception:
err_msg = 'Computing trend slope failed'
print(err_msg)
logging.exception(err_msg)
print('Mark this subexperiment as not successful and continue benchmark')
return False
print(f"Trend Slope: {trend_slope}")
return trend_slope < threshold
# Wrapper that makes the execution method of a subexperiment interchangeable.
import os
import run_uc
def execute(subexperiment_config):
run_uc.main(
exp_id=subexperiment_config.exp_id,
uc_id=subexperiment_config.use_case,
dim_value=int(subexperiment_config.dim_value),
instances=int(subexperiment_config.replicas),
partitions=subexperiment_config.partitions,
cpu_limit=subexperiment_config.cpu_limit,
memory_limit=subexperiment_config.memory_limit,
execution_minutes=int(subexperiment_config.execution_minutes),
prometheus_base_url=subexperiment_config.prometheus_base_url,
reset=subexperiment_config.reset,
ns=subexperiment_config.namespace,
result_path=subexperiment_config.result_path,
configurations=subexperiment_config.configurations)
.cache
\ No newline at end of file