Skip to content
Snippets Groups Projects
Commit cbc9ab75 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'stu200776/spesb-46-add-more-detailed-benchmarking-strategies'

parents 96450c4b c04b83c5
No related branches found
No related tags found
1 merge request!39Add Support for Benchmarking Strategies
Showing
with 456 additions and 10 deletions
......@@ -30,3 +30,6 @@ tmp/
# Python Venv
.venv
# Python cache files
*.pyc
......@@ -150,17 +150,37 @@ Depending on your setup, some additional adjustments may be necessary:
## Execution
The `./theodolite.sh` is the entrypoint for all benchmark executions. Is has to be called as follows:
Please note that a **Python 3.7** installation is required for executing Theodolite.
The `./theodolite.py` is the entrypoint for all benchmark executions. Is has to be called as follows:
```sh
./theodolite.sh <use-case> <wl-values> <instances> <partitions> <cpu-limit> <memory-limit> <commit-interval> <duration>
./theodolite.sh <use-case> <wl-values> <instances> <partitions> <cpu-limit> <memory-limit> <commit-interval> <duration> <domain-restriction> <search-strategy>
```
* `<use-case>`: Stream processing use case to be benchmarked. Has to be one of `1`, `2`, `3` or `4`.
* `<wl-values>`: Values for the workload generator to be tested, separated by commas and quoted. For example `"100000, 200000, 300000"`.
* `<instances>`: Numbers of instances to be benchmarked, separated by commas and quoted. For example `"1, 2, 3, 4"`.
* `<wl-values>`: Values for the workload generator to be tested, separated by commas and sorted in ascending order. For example `100000,200000,300000`.
* `<instances>`: Numbers of instances to be benchmarked, separated by commas and sorted in ascending order. For example `1,2,3,4`.
* `<partitions>`: Number of partitions for Kafka topics. Optional. Default `40`.
* `<cpu-limit>`: Kubernetes CPU limit. Optional. Default `1000m`.
* `<memory-limit>`: Kubernetes memory limit. Optional. Default `4Gi`.
* `<commit-interval>`: Kafka Streams' commit interval in milliseconds. Optional. Default `100`.
* `<duration>`: Duration in minutes subexperiments should be executed for. Optional. Default `5`.
* `<domain-restriction>`: The domain restriction: `domain-restriction` to use domain restriction `no-domain-restriction` to not use domain restriction. Default `no-domain-restriction`. For more details see Section _Domain Restriction_.
* `<search-strategy>`: The benchmarking search strategy. Can be set to `check-all`, `linear-search` or `binary-search`. Default `check-all`. For more details see Section _Benchmarking Search Strategies_.
### Domain Restriction
For dimension value, we have a domain of the amounts of instances. As a consequence, for each dimension value the maximum number of lag experiments is equal to the size of the domain. How the domain is determined is defined by the following domain restriction strategies.
* `no-domain-restriction`: For each dimension value, the domain of instances is equal to the set of all amounts of instances.
* `domain-restriction`: For each dimension value, the domain is computed as follows:
* If the dimension value is the smallest dimension value the domain of the amounts of instances is equal to the set of all amounts of instances.
* If the dimension value is not the smallest dimension value and N is the amount of minimal amount of instances that was suitable for the last smaller dimension value the domain for this dimension value contains all amounts of instances greater than, or equal to N.
### Benchmarking Search Strategies
There are the following benchmarking strategies:
* `check-all`: For each dimension value, execute one lag experiment for all amounts of instances within the current domain.
* `linear-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is from the lowest number of instances to the highest amount of instances and the execution for each dimension value is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful.
* `binary-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is in a binary-search-like manner. The execution is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful.
......@@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import csv
#
exp_id = sys.argv[1]
benchmark = sys.argv[2]
......@@ -14,6 +13,8 @@ instances = sys.argv[4]
execution_minutes = int(sys.argv[5])
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range'
#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
......@@ -27,7 +28,7 @@ start = now - timedelta(minutes=execution_minutes)
#print(start.isoformat().replace('+00:00', 'Z'))
#print(end.isoformat().replace('+00:00', 'Z'))
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
response = requests.get(prometheus_query_path, params={
#'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
......@@ -87,7 +88,7 @@ df.to_csv(f"{filename}_values.csv")
# Load total lag count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
response = requests.get(prometheus_query_path, params={
'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
......@@ -108,10 +109,9 @@ df = pd.DataFrame(d)
df.to_csv(f"{filename}_totallag.csv")
# Load partition count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
response = requests.get(prometheus_query_path, params={
'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
......@@ -135,7 +135,7 @@ df.to_csv(f"{filename}_partitions.csv")
# Load instances count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
response = requests.get(prometheus_query_path, params={
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
'start': start.isoformat(),
'end': end.isoformat(),
......
from sklearn.linear_model import LinearRegression
import pandas as pd
import os
def compute(directory, filename, warmup_sec, threshold):
df = pd.read_csv(os.path.join(directory, filename))
input = df
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array
Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
trend_slope = linear_regressor.coef_[0][0]
return trend_slope
\ No newline at end of file
from dataclasses import dataclass
@dataclass
class ExperimentConfig:
""" Wrapper for the configuration of an experiment. """
use_case: str
exp_id: int
dim_values: list
replicass: list
partitions: int
cpu_limit: str
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
domain_restriction_strategy: object
search_strategy: object
subexperiment_executor: object
subexperiment_evaluator: object
\ No newline at end of file
class ExperimentExecutor:
def __init__(self, config):
self.config=config
def execute(self):
self.config.domain_restriction_strategy.execute(self.config)
from dataclasses import dataclass
@dataclass
class SubexperimentConfig:
""" Wrapper for the configuration of a subexperiment """
use_case: str
exp_id: int
counter: int
dim_value: int
replicas: int
partitions: int
cpu_limit: str
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
\ No newline at end of file
# The lower bound strategy
def execute(config):
dim_value_index = 0
lower_bound_replicas_index = 0
subexperiment_counter = 0
while dim_value_index < len(config.dim_values) and lower_bound_replicas_index >= 0 and lower_bound_replicas_index < len(config.replicass):
lower_bound_replicas_index, subexperiment_counter = config.search_strategy.execute(
config=config,
dim_value_index=dim_value_index,
lower_replicas_bound_index=lower_bound_replicas_index,
subexperiment_counter=subexperiment_counter)
dim_value_index+=1
\ No newline at end of file
# The strategy where the domain contains all amounts of instances
def execute(config):
dim_value_index = 0
subexperiment_counter = 0
while dim_value_index < len(config.dim_values):
_, subexperiment_counter = config.search_strategy.execute(
config=config,
dim_value_index=dim_value_index,
lower_replicas_bound_index=0,
subexperiment_counter=subexperiment_counter)
dim_value_index+=1
\ No newline at end of file
# The binary search strategy
import os
from strategies.strategies.config import SubexperimentConfig
def binary_search(config, dim_value, lower, upper, subexperiment_counter):
if lower == upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # successful, the upper neighbor is assumed to also has been successful
return (lower, subexperiment_counter+1)
else: # not successful
return (lower+1, subexperiment_counter)
elif lower+1==upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # minimal instances found
return (lower, subexperiment_counter)
else: # not successful, check if lower+1 instances are sufficient
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # minimal instances found
return (upper, subexperiment_counter)
else:
return (upper+1, subexperiment_counter)
else:
# test mid
mid=(upper+lower)//2
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # success -> search in (lower, mid-1)
return binary_search(config, dim_value, lower, mid-1, subexperiment_counter+1)
else: # not success -> search in (mid+1, upper)
return binary_search(config, dim_value, mid+1, upper, subexperiment_counter+1)
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
upper = len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter)
# The check_all strategy
import os
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
new_lower_replicas_bound_index=lower_replicas_bound_index
new_lower_replicas_bound_found=False
subexperiments_total=len(config.dim_values)*len(config.replicass)
while lower_replicas_bound_index < len(config.replicass):
subexperiment_counter+=1
dim_value=config.dim_values[dim_value_index]
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config) == 1
if result == 1 and not new_lower_replicas_bound_found:
new_lower_replicas_bound_found = True
new_lower_replicas_bound_index = lower_replicas_bound_index
lower_replicas_bound_index+=1
return (new_lower_replicas_bound_index, subexperiment_counter)
# The linear-search strategy
import os
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
subexperiments_total=len(config.dim_values)+len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
while lower_replicas_bound_index < len(config.replicass):
subexperiment_counter+=1
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1:
return (lower_replicas_bound_index, subexperiment_counter)
else:
lower_replicas_bound_index+=1
return (lower_replicas_bound_index, subexperiment_counter)
\ No newline at end of file
import os
import sys
import os
import lib.trend_slope_computer as trend_slope_computer
THRESHOLD = 2000
WARMUP_SEC = 60
def execute(config):
cwd = os.getcwd()
file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv"
trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD)
print(f"Trend Slope: {trend_slope}")
success = 0 if trend_slope > THRESHOLD else 1
return success
# Wrapper that makes the execution method of a subexperiment interchangable.
import os
dirname = os.path.dirname(__file__)
os.chdir(dirname+"/../../")
def execute(subexperiment_config):
os.system(f"./run_uc{subexperiment_config.use_case}.sh {subexperiment_config.exp_id} {subexperiment_config.dim_value} {subexperiment_config.replicas} {subexperiment_config.partitions} {subexperiment_config.cpu_limit} {subexperiment_config.memory_limit} {subexperiment_config.kafka_streams_commit_interval_ms} {subexperiment_config.execution_minutes}")
\ No newline at end of file
.cache
\ No newline at end of file
import pprint
from strategies.config import ExperimentConfig
import strategies.strategies.search.binary_search_strategy as binary_search_strategy
import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
class Object(object):
pass
pp = pprint.PrettyPrinter(indent=4)
dim_values = [0, 1, 2, 3, 4, 5, 6]
replicass = [0, 1, 2, 3, 4, 5, 6]
# True means the experiment was successful
# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as common known arrays from 0 - 6 respectively.
# this means the first row starts with (0,0), the second row with (1, 0) etc.
successful = [
[ True , True , True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, False, True , True , True , True ],
[ False, False, False, False, True , True , True ],
[ False, False, False, False, False, False, True ],
[ False, False, False, False, False, False, False ]
]
expected_order = [
(0,3), # workload dim 0
(0,1),
(0,0),
(1,3), # workload dim 1
(1,1),
(1,2),
(2,4), # workload dim 2
(2,2),
(3,4), # workload dim 3
(3,2),
(3,3),
(4,4), # workload dim 4
(4,3),
(5,5), # workload dim 5
(5,6),
(6,6) # workload dim 6
]
last_experiment = (0, 0)
experiment_counter = -1
subexperiment_executor = Object()
def subexperiment_executor_executor(config):
global experiment_counter, last_experiment, pp
print("Simulate subexperiment with config:")
pp.pprint(config)
last_experiment = (config.dim_value, config.replicas)
experiment_counter += 1
print("Simulation complete")
subexperiment_executor.execute = subexperiment_executor_executor
# returns True if the experiment was successful
subexperiment_evaluator = Object()
def subexperiment_evaluator_execute(i):
print("Evaluating last experiment. Index was:")
global expected_order, experiment_counter, last_experiment, successful
pp.pprint(last_experiment)
print("Index was expected to be:")
pp.pprint(expected_order[experiment_counter])
assert expected_order[experiment_counter] == last_experiment
print("Index was as expected. Evaluation finished.")
return 1 if successful[last_experiment[0]][last_experiment[1]] else 0
subexperiment_evaluator.execute = subexperiment_evaluator_execute
def test_binary_search_strategy():
# declare parameters
uc="test-uc"
partitions=40
cpu_limit="1000m"
memory_limit="4Gi"
kafka_streams_commit_interval_ms=100
execution_minutes=5
# execute
experiment_config = ExperimentConfig(
use_case=uc,
dim_values=dim_values,
replicass=replicass,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
execution_minutes=execution_minutes,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=binary_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
executor = ExperimentExecutor(experiment_config)
executor.execute()
\ No newline at end of file
import pprint
from strategies.config import ExperimentConfig
import strategies.strategies.search.check_all_strategy as check_all_strategy
import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
class Object(object):
pass
pp = pprint.PrettyPrinter(indent=4)
dim_values = [0, 1, 2, 3, 4, 5, 6]
replicass = [0, 1, 2, 3, 4, 5, 6]
# True means the experiment was successful
# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively.
# this means the first row starts with (0,0), the second row with (1, 0) etc.
successful = [
[ True , True , True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, False, True , True , True , True ],
[ False, False, False, False, True , True , True ],
[ False, False, False, False, False, False, True ],
[ False, False, False, False, False, False, False ]
]
# the expected order of executed experiments
expected_order = [
(0,0), # workload dim 0
(0,1),
(0,2),
(0,3),
(0,4),
(0,5),
(0,6),
(1,0), # workload dim 1
(1,1),
(1,2),
(1,3),
(1,4),
(1,5),
(1,6),
(2,2), # workload dim 2
(2,3),
(2,4),
(2,5),
(2,6),
(3,2), # workload dim 3
(3,3),
(3,4),
(3,5),
(3,6),
(4,3), # workload dim 4
(4,4),
(4,5),
(4,6),
(5,4), # workload dim 3
(5,5),
(5,6),
(6,6) # workload dim 6
]
last_experiment = (0, 0)
experiment_counter = -1
subexperiment_executor = Object()
def subexperiment_executor_executor(config):
global experiment_counter, last_experiment, pp
print("Simulate subexperiment with config:")
pp.pprint(config)
last_experiment = (config.dim_value, config.replicas)
experiment_counter += 1
print("Simulation complete")
subexperiment_executor.execute = subexperiment_executor_executor
# returns True if the experiment was successful
subexperiment_evaluator = Object()
def subexperiment_evaluator_execute(i):
print("Evaluating last experiment. Index was:")
global expected_order, experiment_counter, last_experiment, successful
pp.pprint(expected_order[experiment_counter])
assert expected_order[experiment_counter] == last_experiment
print("Index was as expected. Evaluation finished.")
return 1 if successful[last_experiment[0]][last_experiment[1]] else 0
subexperiment_evaluator.execute = subexperiment_evaluator_execute
def test_linear_search_strategy():
# declare parameters
uc="test-uc"
partitions=40
cpu_limit="1000m"
memory_limit="4Gi"
kafka_streams_commit_interval_ms=100
execution_minutes=5
# execute
experiment_config = ExperimentConfig(
use_case=uc,
dim_values=dim_values,
replicass=replicass,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
execution_minutes=execution_minutes,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=check_all_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
executor = ExperimentExecutor(experiment_config)
executor.execute()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment