Skip to content
Snippets Groups Projects
Commit 55ce9ecb authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Introduce new version of benchmarking strategies

parent da35a89c
No related branches found
No related tags found
1 merge request!39Add Support for Benchmarking Strategies
Showing
with 455 additions and 119 deletions
File added
File added
from sklearn.linear_model import LinearRegression
import pandas as pd
import os
def compute(directory, filename, warmup_sec, threshold):
df = pd.read_csv(os.path.join(directory, filename))
input = df
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array
Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
trend_slope = linear_regressor.coef_[0][0]
#print(linear_regressor.coef_)
return trend_slope
\ No newline at end of file
...@@ -5,12 +5,13 @@ class ExperimentConfig: ...@@ -5,12 +5,13 @@ class ExperimentConfig:
""" Wrapper for the configuration of an experiment. """ """ Wrapper for the configuration of an experiment. """
use_case: str use_case: str
dim_values: list dim_values: list
replicas: list replicass: list
partitions: int partitions: int
cpu_limit: str cpu_limit: str
memory_limit: str memory_limit: str
kafka_streams_commit_interval_ms: int kafka_streams_commit_interval_ms: int
execution_minutes: int execution_minutes: int
benchmarking_strategy: object domain_restriction_strategy: object
search_strategy: object
subexperiment_executor: object subexperiment_executor: object
subexperiment_evaluator: object subexperiment_evaluator: object
\ No newline at end of file
...@@ -3,4 +3,4 @@ class ExperimentExecutor: ...@@ -3,4 +3,4 @@ class ExperimentExecutor:
self.config=config self.config=config
def execute(self): def execute(self):
self.config.benchmarking_strategy.execute(self.config) self.config.domain_restriction_strategy.execute(self.config)
import os
from .config import SubexperimentConfig
def searchTransition(config, replica_index, lower, upper, subexperiment_counter):
if lower==upper:
print(f"Run subexperiment {subexperiment_counter} with config {config.dim_values[lower]} {config.replicas[replica_index]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[lower], config.replicas[replica_index], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute()
if result==1: # successful, the upper neighbor must be not successful
return lower+1
else: # not successful
return lower
elif lower+1 == upper:
print(f"Run subexperiment {subexperiment_counter} with config {config.dim_values[lower]} {config.replicas[replica_index]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[lower], config.replicas[replica_index], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute()
if result==1: # successful, the upper neighbor must be not successful
print(f"Run subexperiment {subexperiment_counter} with config {config.dim_values[upper]} {config.replicas[replica_index]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[upper], config.replicas[replica_index], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute()
if result == 1:
return upper+1
else:
return upper
else: # not successful
return lower
else:
# test mid
mid=(upper+lower)//2
print(f"Run subexperiment {subexperiment_counter} with config {config.dim_values[mid]} {config.replicas[replica_index]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[mid], config.replicas[replica_index], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute()
if result == 1: # success -> search in (mid+1, upper)
return searchTransition(config, replica_index, mid+1, upper, subexperiment_counter+1)
else: # not success -> search in (lower, mid-1)
return searchTransition(config, replica_index, lower, mid-1, subexperiment_counter+1)
def execute(config):
subexperiment_counter=0
lower = 0
upper = len(config.dim_values)-1
j = 0
while j < len(config.replicas) and lower < len(config.dim_values):
lower = searchTransition(config, j, lower, upper, subexperiment_counter+1)
j+=1
# Contains the default strategy that executes a subexperiment for all combinations of instances and dimension values.
import os
from .config import SubexperimentConfig
def execute(config):
subexperiment_counter=0
subexperiments_total=len(config.dim_values)*len(config.replicas)
for dim_value in config.dim_values:
for replica in config.replicas:
subexperiment_counter+=1
print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {dim_value} and {replica} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, replica, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
# strats:
# default: not end subexp when suitable instances found, increasing the number of instances with each iteration
# H1 end subexp when suitable instances found, increasing the number of instances with each iteration.
# H2 start with suitable instances from experiments for the last lower dim value -> outer loop over dim values, tracking suitable instances
# H3 end subexp when suitable instances found, using binary search.
# 3 Suchstrategien:
# 1. H0 (Teste alle)
# 2. H1 (Lineare Suche)
# 3. H2 (Binäre Suche)
# Optional: Einschränkung des Suchraumes:
# 1. Keine Einschränkung
# 2. Untere Schranke
# Fragen: Erkläre vorgehen, verstehen wir dasselbe?
# Zeithorizont?
# linear regressor besprechen im lag_analysis.py skript
# useful combinations
# Zum Report:
# Gegenüberstellung Suchstrategie & keine Suchstrategie (im Idealfall wdh. von Experimenten)
def execute(config):
dim_value_index = 0
lower_bound_replicas_index = 0
subexperiment_counter = 0
while dim_value_index < len(config.dim_values) and lower_bound_replicas_index >= 0 and lower_bound_replicas_index < len(config.replicass):
lower_bound_replicas_index, subexperiment_counter = config.search_strategy.execute(
config=config,
dim_value_index=dim_value_index,
lower_replicas_bound_index=lower_bound_replicas_index,
subexperiment_counter=subexperiment_counter)
dim_value_index+=1
\ No newline at end of file
def execute(config):
dim_value_index = 0
subexperiment_counter = 0
while dim_value_index < len(config.dim_values):
_, subexperiment_counter = config.search_strategy.execute(
config=config,
dim_value_index=dim_value_index,
lower_replicas_bound_index=0,
subexperiment_counter=subexperiment_counter)
dim_value_index+=1
\ No newline at end of file
import os
from strategies.strategies.config import SubexperimentConfig
def binary_search(config, dim_value, lower, upper, subexperiment_counter):
if lower == upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # successful, the upper neighbor is assumed to also has been successful
return (lower, subexperiment_counter+1)
else: # not successful
return (lower+1, subexperiment_counter)
elif lower+1==upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # minimal instances found
return (lower, subexperiment_counter)
else: # not successful, check if lower+1 instances are sufficient
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # minimal instances found
return (upper, subexperiment_counter)
else:
return (upper+1, subexperiment_counter)
else:
# test mid
mid=(upper+lower)//2
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # success -> search in (lower, mid-1)
return binary_search(config, dim_value, lower, mid-1, subexperiment_counter+1)
else: # not success -> search in (mid+1, upper)
return binary_search(config, dim_value, mid+1, upper, subexperiment_counter+1)
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
upper = len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter)
# Runs subexperiments for all numbers of replicas with an index > lower_replicas_bound_index
import os
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
new_lower_replicas_bound_index=lower_replicas_bound_index
new_lower_replicas_bound_found=False
subexperiments_total=len(config.dim_values)*len(config.replicass)
while lower_replicas_bound_index < len(config.replicass):
subexperiment_counter+=1
dim_value=config.dim_values[dim_value_index]
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config) == 1
if result == 1 and not new_lower_replicas_bound_found:
new_lower_replicas_bound_found = True
new_lower_replicas_bound_index = lower_replicas_bound_index
lower_replicas_bound_index+=1
return (new_lower_replicas_bound_index, subexperiment_counter)
# Perform linear search to determine the lowest number of replicas which is suitable for the dimension value with an index > lower_replicas_bound_index
import os
from strategies.strategies.config import SubexperimentConfig
def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_counter):
subexperiments_total=len(config.dim_values)+len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
while lower_replicas_bound_index < len(config.replicass):
subexperiment_counter+=1
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1:
return (lower_replicas_bound_index, subexperiment_counter)
else:
lower_replicas_bound_index+=1
return (lower_replicas_bound_index, subexperiment_counter)
\ No newline at end of file
# Contains the default strategy that executes a subexperiment for all combinations of instances and dimension values.
import os
from .config import SubexperimentConfig
def execute(config):
subexperiment_counter=0
subexperiments_total=len(config.dim_values)*len(config.replicas)
i=0
j=0
while i < len(config.replicas) and j < len(config.dim_values):
subexperiment_counter+=1
print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {config.dim_values[j]} and {config.replicas[i]} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[j], config.replicas[i], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute()
if result == 1:
j+=1
else:
i+=1
print(f"Executed {subexperiment_counter} experiments in total.")
\ No newline at end of file
def execute(): def execute(config):
return return
\ No newline at end of file
import os import os
import sys
import os
import lib.trend_slope_computer as trend_slope_computer
THRESHOLD = 2000
WARMUP_SEC = 60
def execute(config):
cwd = os.getcwd()
file = f"exp{config.counter}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv"
trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD)
def execute(): print(f"Trend Slope: {trend_slope}")
with open("last_exp_result.txt", "r") as file: success = 0 if trend_slope > THRESHOLD else 1
result = file.read() return success
return int(result) == 1
import pprint import pprint
from strategies.config import ExperimentConfig from strategies.config import ExperimentConfig
import strategies.strategies.binary_search_strategy as binary_search_strategy import strategies.strategies.search.binary_search_strategy as binary_search_strategy
import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
from strategies.experiment_execution import ExperimentExecutor from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
...@@ -11,10 +12,10 @@ class Object(object): ...@@ -11,10 +12,10 @@ class Object(object):
pp = pprint.PrettyPrinter(indent=4) pp = pprint.PrettyPrinter(indent=4)
dim_values = [0, 1, 2, 3, 4, 5, 6] dim_values = [0, 1, 2, 3, 4, 5, 6]
replicas = [0, 1, 2, 3, 4, 5, 6] replicass = [0, 1, 2, 3, 4, 5, 6]
# True means the experiment was successful # True means the experiment was successful
# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. # the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as common known arrays from 0 - 6 respectively.
# this means the first row starts with (0,0), the second row with (1, 0) etc. # this means the first row starts with (0,0), the second row with (1, 0) etc.
successful = [ successful = [
[ True , True , True , True , True , True , True ], [ True , True , True , True , True , True , True ],
...@@ -27,21 +28,22 @@ successful = [ ...@@ -27,21 +28,22 @@ successful = [
] ]
expected_order = [ expected_order = [
(3,0), # interval (0, 6) (0,3), # workload dim 0
(1,0), (0,1),
(0,0), (0,0),
(3,1), # interval (0, 6) (1,3), # workload dim 1
(1,1), (1,1),
(3,2), # interval (0, 6)
(1,2), (1,2),
(2,4), # workload dim 2
(2,2), (2,2),
(4,3), # interval (3, 6) (3,4), # workload dim 3
(3,2),
(3,3), (3,3),
(5,4), # interval (4, 6) (4,4), # workload dim 4
(4,4), (4,3),
(5,5), # interval (5,6) (5,5), # workload dim 5
(5,6), # interval (5,6) (5,6),
(6,6) (6,6) # workload dim 6
] ]
last_experiment = (0, 0) last_experiment = (0, 0)
...@@ -63,7 +65,7 @@ subexperiment_executor.execute = subexperiment_executor_executor ...@@ -63,7 +65,7 @@ subexperiment_executor.execute = subexperiment_executor_executor
subexperiment_evaluator = Object() subexperiment_evaluator = Object()
def subexperiment_evaluator_execute(): def subexperiment_evaluator_execute(i):
print("Evaluating last experiment. Index was:") print("Evaluating last experiment. Index was:")
global expected_order, experiment_counter, last_experiment, successful global expected_order, experiment_counter, last_experiment, successful
pp.pprint(last_experiment) pp.pprint(last_experiment)
...@@ -85,6 +87,18 @@ def test_binary_search_strategy(): ...@@ -85,6 +87,18 @@ def test_binary_search_strategy():
execution_minutes=5 execution_minutes=5
# execute # execute
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, binary_search_strategy, subexperiment_executor, subexperiment_evaluator) experiment_config = ExperimentConfig(
use_case=uc,
dim_values=dim_values,
replicass=replicass,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
execution_minutes=execution_minutes,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=binary_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
executor = ExperimentExecutor(experiment_config) executor = ExperimentExecutor(experiment_config)
executor.execute() executor.execute()
\ No newline at end of file
import pprint
from strategies.config import ExperimentConfig
import strategies.strategies.search.check_all_strategy as check_all_strategy
import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
class Object(object):
pass
pp = pprint.PrettyPrinter(indent=4)
dim_values = [0, 1, 2, 3, 4, 5, 6]
replicass = [0, 1, 2, 3, 4, 5, 6]
# True means the experiment was successful
# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively.
# this means the first row starts with (0,0), the second row with (1, 0) etc.
successful = [
[ True , True , True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, False, True , True , True , True ],
[ False, False, False, False, True , True , True ],
[ False, False, False, False, False, False, True ],
[ False, False, False, False, False, False, False ]
]
# the expected order of executed experiments
expected_order = [
(0,0), # workload dim 0
(0,1),
(0,2),
(0,3),
(0,4),
(0,5),
(0,6),
(1,0), # workload dim 1
(1,1),
(1,2),
(1,3),
(1,4),
(1,5),
(1,6),
(2,2), # workload dim 2
(2,3),
(2,4),
(2,5),
(2,6),
(3,2), # workload dim 3
(3,3),
(3,4),
(3,5),
(3,6),
(4,3), # workload dim 4
(4,4),
(4,5),
(4,6),
(5,4), # workload dim 3
(5,5),
(5,6),
(6,6) # workload dim 6
]
last_experiment = (0, 0)
experiment_counter = -1
subexperiment_executor = Object()
def subexperiment_executor_executor(config):
global experiment_counter, last_experiment, pp
print("Simulate subexperiment with config:")
pp.pprint(config)
last_experiment = (config.dim_value, config.replicas)
experiment_counter += 1
print("Simulation complete")
subexperiment_executor.execute = subexperiment_executor_executor
# returns True if the experiment was successful
subexperiment_evaluator = Object()
def subexperiment_evaluator_execute(i):
print("Evaluating last experiment. Index was:")
global expected_order, experiment_counter, last_experiment, successful
pp.pprint(expected_order[experiment_counter])
assert expected_order[experiment_counter] == last_experiment
print("Index was as expected. Evaluation finished.")
return 1 if successful[last_experiment[0]][last_experiment[1]] else 0
subexperiment_evaluator.execute = subexperiment_evaluator_execute
def test_linear_search_strategy():
# declare parameters
uc="test-uc"
partitions=40
cpu_limit="1000m"
memory_limit="4Gi"
kafka_streams_commit_interval_ms=100
execution_minutes=5
# execute
experiment_config = ExperimentConfig(
use_case=uc,
dim_values=dim_values,
replicass=replicass,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
execution_minutes=execution_minutes,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=check_all_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
executor = ExperimentExecutor(experiment_config)
executor.execute()
\ No newline at end of file
import pprint import pprint
from strategies.config import ExperimentConfig from strategies.config import ExperimentConfig
import strategies.strategies.step_strategy as step_strategy import strategies.strategies.search.linear_search_strategy as linear_search_strategy
import strategies.strategies.domain_restriction.lower_bound_strategy as lower_bound_strategy
from strategies.experiment_execution import ExperimentExecutor from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
...@@ -11,7 +12,7 @@ class Object(object): ...@@ -11,7 +12,7 @@ class Object(object):
pp = pprint.PrettyPrinter(indent=4) pp = pprint.PrettyPrinter(indent=4)
dim_values = [0, 1, 2, 3, 4, 5, 6] dim_values = [0, 1, 2, 3, 4, 5, 6]
replicas = [0, 1, 2, 3, 4, 5, 6] replicass = [0, 1, 2, 3, 4, 5, 6]
# True means the experiment was successful # True means the experiment was successful
# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively. # the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as usual arrays from 0 - 6 respectively.
...@@ -62,7 +63,7 @@ subexperiment_executor.execute = subexperiment_executor_executor ...@@ -62,7 +63,7 @@ subexperiment_executor.execute = subexperiment_executor_executor
subexperiment_evaluator = Object() subexperiment_evaluator = Object()
def subexperiment_evaluator_execute(): def subexperiment_evaluator_execute(i):
print("Evaluating last experiment. Index was:") print("Evaluating last experiment. Index was:")
global expected_order, experiment_counter, last_experiment, successful global expected_order, experiment_counter, last_experiment, successful
pp.pprint(expected_order[experiment_counter]) pp.pprint(expected_order[experiment_counter])
...@@ -72,7 +73,7 @@ def subexperiment_evaluator_execute(): ...@@ -72,7 +73,7 @@ def subexperiment_evaluator_execute():
subexperiment_evaluator.execute = subexperiment_evaluator_execute subexperiment_evaluator.execute = subexperiment_evaluator_execute
def test_step_strategy(): def test_linear_search_strategy():
# declare parameters # declare parameters
uc="test-uc" uc="test-uc"
partitions=40 partitions=40
...@@ -82,6 +83,18 @@ def test_step_strategy(): ...@@ -82,6 +83,18 @@ def test_step_strategy():
execution_minutes=5 execution_minutes=5
# execute # execute
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, step_strategy, subexperiment_executor, subexperiment_evaluator) experiment_config = ExperimentConfig(
use_case=uc,
dim_values=dim_values,
replicass=replicass,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
execution_minutes=execution_minutes,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=linear_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
executor = ExperimentExecutor(experiment_config) executor = ExperimentExecutor(experiment_config)
executor.execute() executor.execute()
\ No newline at end of file
import pprint
from strategies.config import ExperimentConfig
import strategies.strategies.search.binary_search_strategy as binary_search_strategy
import strategies.strategies.domain_restriction.no_lower_bound_strategy as no_lower_bound_strategy
from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
class Object(object):
pass
pp = pprint.PrettyPrinter(indent=4)
dim_values = [0, 1, 2, 3, 4, 5, 6]
replicass = [0, 1, 2, 3, 4, 5, 6]
# True means the experiment was successful
# the experiments are indexed row (representing dimension values) and column (representing number of replicas) wise as common known arrays from 0 - 6 respectively.
# this means the first row starts with (0,0), the second row with (1, 0) etc.
successful = [
[ True , True , True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, True , True , True , True , True ],
[ False, False, False, True , True , True , True ],
[ False, False, False, False, True , True , True ],
[ False, False, False, False, False, False, True ],
[ False, False, False, False, False, False, False ]
]
expected_order = [
(0,3), # workload dim 0
(0,1),
(0,0),
(1,3), # workload dim 1
(1,1),
(1,2),
(2,3), # workload dim 2
(2,1),
(2,2),
(3,3), # workload dim 3
(3,1),
(3,2),
(4,3), # workload dim 4
(4,5),
(4,4),
(5,3), # workload dim 5
(5,5),
(5,6),
(6,3), # workload dim 6
(6,5),
(6,6)
]
last_experiment = (0, 0)
experiment_counter = -1
subexperiment_executor = Object()
def subexperiment_executor_executor(config):
global experiment_counter, last_experiment, pp
print("Simulate subexperiment with config:")
pp.pprint(config)
last_experiment = (config.dim_value, config.replicas)
experiment_counter += 1
print("Simulation complete")
subexperiment_executor.execute = subexperiment_executor_executor
# returns True if the experiment was successful
subexperiment_evaluator = Object()
def subexperiment_evaluator_execute(i):
print("Evaluating last experiment. Index was:")
global expected_order, experiment_counter, last_experiment, successful
pp.pprint(last_experiment)
print("Index was expected to be:")
pp.pprint(expected_order[experiment_counter])
assert expected_order[experiment_counter] == last_experiment
print("Index was as expected. Evaluation finished.")
return 1 if successful[last_experiment[0]][last_experiment[1]] else 0
subexperiment_evaluator.execute = subexperiment_evaluator_execute
def test_binary_search_strategy():
# declare parameters
uc="test-uc"
partitions=40
cpu_limit="1000m"
memory_limit="4Gi"
kafka_streams_commit_interval_ms=100
execution_minutes=5
# execute
experiment_config = ExperimentConfig(
use_case=uc,
dim_values=dim_values,
replicass=replicass,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=kafka_streams_commit_interval_ms,
execution_minutes=execution_minutes,
domain_restriction_strategy=no_lower_bound_strategy,
search_strategy=binary_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
executor = ExperimentExecutor(experiment_config)
executor.execute()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment