Skip to content
Snippets Groups Projects
Commit d4ef1237 authored by Sören Henning
Browse files

Merge branch 'feature/experimentResults' into 'master'

Store experiment results in dedicated directory

Closes #103

See merge request !56
parents f519afa8 3451f137
No related branches found
No related tags found
No related merge requests found
Showing with 87 additions and 140 deletions
exp_counter.txt
\ No newline at end of file
exp_counter.txt
results
......@@ -8,7 +8,7 @@ import csv
import logging
def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url = 'http://kube1.se.internal:32529'):
def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
print("Main")
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
......@@ -70,11 +70,11 @@ def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_
fields = [exp_id, datetime.now(), benchmark, dim_value,
instances, linear_regressor.coef_]
print(fields)
with open(r'results.csv', 'a') as f:
with open(f'{result_path}/results.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}"
filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}"
plt.plot(X, Y)
plt.plot(X, Y_pred, color='red')
......@@ -163,4 +163,5 @@ if __name__ == '__main__':
instances = sys.argv[4]
execution_minutes = int(sys.argv[5])
main(exp_id, benchmark, dim_value, instances, execution_minutes)
main(exp_id, benchmark, dim_value, instances, execution_minutes,
'http://localhost:9090', 'results')
......@@ -56,8 +56,12 @@ def default_parser(description):
help='Only resets the environment. Ignores all other parameters')
parser.add_argument('--prometheus',
metavar='<URL>',
default=os.environ.get('PROMETHEUS_BASE_URL'),
default=os.environ.get('PROMETHEUS_BASE_URL', 'http://localhost:9090'),
help='Defines where to find the prometheus instance')
parser.add_argument('--path',
metavar='<path>',
default=os.environ.get('RESULT_PATH', 'results'),
help='A directory path for the results')
return parser
def benchmark_parser(description):
......
......@@ -248,7 +248,7 @@ def wait_execution(execution_minutes):
return
def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url=None):
def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
"""
Runs the evaluation function
:param string exp_id: ID of the experiment.
......@@ -258,12 +258,7 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome
:param int execution_minutes: How long the use case where executed.
"""
print('Run evaluation function')
if prometheus_base_url is None and environ.get('PROMETHEUS_BASE_URL') is None:
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
elif prometheus_base_url is not None:
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url)
else:
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, environ.get('PROMETHEUS_BASE_URL'))
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url, result_path)
return
......@@ -460,7 +455,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
stop_lag_exporter()
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, reset_only, ns):
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False):
"""
Main method to execute one time the benchmark for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
......@@ -528,7 +523,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
print('---------------------')
run_evaluation(exp_id, uc_id, dim_value, instances,
execution_minutes, prometheus_base_url)
execution_minutes, prometheus_base_url, result_path)
print('---------------------')
# Reset cluster regular, therefore abort exit not needed anymore
......@@ -543,4 +538,4 @@ if __name__ == '__main__':
main(args.exp_id, args.uc, args.load, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
args.commit_ms, args.duration, args.prometheus, args.reset,
args.reset_only, args.namespace)
args.namespace, args.path, args.reset_only)
......@@ -12,7 +12,11 @@ class ExperimentConfig:
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
domain_restriction_strategy: object
search_strategy: object
subexperiment_executor: object
subexperiment_evaluator: object
\ No newline at end of file
subexperiment_evaluator: object
......@@ -12,4 +12,8 @@ class SubexperimentConfig:
cpu_limit: str
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
\ No newline at end of file
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
......@@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig
def binary_search(config, dim_value, lower, upper, subexperiment_counter):
if lower == upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # successful, the upper neighbor is assumed to also has been successful
......@@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
return (lower+1, subexperiment_counter)
elif lower+1==upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # minimal instances found
return (lower, subexperiment_counter)
else: # not successful, check if lower+1 instances are sufficient
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # minimal instances found
......@@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
# test mid
mid=(upper+lower)//2
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # success -> search in (lower, mid-1)
......@@ -44,4 +44,3 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
upper = len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter)
......@@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
......
......@@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
......@@ -19,4 +19,4 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
return (lower_replicas_bound_index, subexperiment_counter)
else:
lower_replicas_bound_index+=1
return (lower_replicas_bound_index, subexperiment_counter)
\ No newline at end of file
return (lower_replicas_bound_index, subexperiment_counter)
......@@ -4,10 +4,10 @@ import os
import lib.trend_slope_computer as trend_slope_computer
THRESHOLD = 2000
WARMUP_SEC = 60
WARMUP_SEC = 60
def execute(config):
cwd = os.getcwd()
cwd = f'{os.getcwd()}/{config.result_path}'
file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv"
trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD)
......
......@@ -14,7 +14,7 @@ def execute(subexperiment_config):
memory_limit=subexperiment_config.memory_limit,
commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms,
execution_minutes=int(subexperiment_config.execution_minutes),
prometheus_base_url=None,
reset=False,
reset_only=False,
ns="default")
prometheus_base_url=subexperiment_config.prometheus_base_url,
reset=subexperiment_config.reset,
ns=subexperiment_config.namespace,
result_path=subexperiment_config.result_path)
......@@ -31,16 +31,20 @@ def load_variables():
def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
commit_ms, duration, domain_restriction, search_strategy,
prometheus_base_url ,reset, reset_only, namespace):
prometheus_base_url, reset, namespace, result_path):
print(f"Domain restriction of search space activated: {domain_restriction}")
print(f"Chosen search strategy: {search_strategy}")
if os.path.exists("exp_counter.txt"):
with open("exp_counter.txt", mode="r") as read_stream:
counter_path = f"{result_path}/exp_counter.txt"
if os.path.exists(counter_path):
with open(counter_path, mode="r") as read_stream:
exp_id = int(read_stream.read())
else:
exp_id = 0
# Create the directory if not exists
os.makedirs(result_path, exist_ok=True)
# Store metadata
separator = ","
......@@ -56,116 +60,51 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
f"DOMAIN_RESTRICTION={domain_restriction}\n",
f"SEARCH_STRATEGY={search_strategy}"
]
with open(f"exp{exp_id}_uc{uc}_meta.txt", "w") as stream:
with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream:
stream.writelines(lines)
with open("exp_counter.txt", mode="w") as write_stream:
with open(counter_path, mode="w") as write_stream:
write_stream.write(str(exp_id + 1))
# domain restriction
domain_restriction_strategy = None
search_strategy = None
# Select domain restriction
if domain_restriction:
# domain restriction + linear-search
if search_strategy == "linear-search":
print(f"Going to execute at most {len(loads)+len(instances_list)-1} subexperiments in total..")
experiment_config = ExperimentConfig(
use_case=uc,
exp_id=exp_id,
dim_values=loads,
replicass=instances_list,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=linear_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
# domain restriction + binary-search
elif search_strategy == "binary-search":
experiment_config = ExperimentConfig(
use_case=uc,
exp_id=exp_id,
dim_values=loads,
replicass=instances_list,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=binary_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
# domain restriction + check_all
else:
print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..")
experiment_config = ExperimentConfig(
use_case=uc,
exp_id=exp_id,
dim_values=loads,
replicass=instances_list,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
domain_restriction_strategy=lower_bound_strategy,
search_strategy=check_all_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
# no domain restriction
# domain restriction
domain_restriction_strategy = lower_bound_strategy
else:
# no domain restriction + linear-search
if search_strategy == "linear-search":
print(f"Going to execute at most {len(loads)*len(instances_list)} subexperiments in total..")
experiment_config = ExperimentConfig(
use_case=uc,
exp_id=exp_id,
dim_values=loads,
replicass=instances_list,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
domain_restriction_strategy=no_lower_bound_strategy,
search_strategy=linear_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
# no domain restriction + binary-search
elif search_strategy == "binary-search":
experiment_config = ExperimentConfig(
use_case=uc,
exp_id=exp_id,
dim_values=loads,
replicass=instances_list,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
domain_restriction_strategy=no_lower_bound_strategy,
search_strategy=binary_search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
# no domain restriction + check_all
else:
print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..")
experiment_config = ExperimentConfig(
use_case=uc,
exp_id=exp_id,
dim_values=loads,
replicass=instances_list,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
domain_restriction_strategy=no_lower_bound_strategy,
search_strategy=check_all_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
# no domain restriction
domain_restriction_strategy = no_lower_bound_strategy
# select search strategy
if search_strategy == "linear-search":
print(f"Going to execute at most {len(loads)+len(instances_list)-1} subexperiments in total..")
search_strategy = linear_search_strategy
elif search_strategy == "binary-search":
search_strategy = binary_search_strategy
else:
print(f"Going to execute {len(loads)*len(instances_list)} subexperiments in total..")
search_strategy = check_all_strategy
experiment_config = ExperimentConfig(
use_case=uc,
exp_id=exp_id,
dim_values=loads,
replicass=instances_list,
partitions=partitions,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
prometheus_base_url=prometheus_base_url,
reset=reset,
namespace=namespace,
result_path=result_path,
domain_restriction_strategy=domain_restriction_strategy,
search_strategy=search_strategy,
subexperiment_executor=subexperiment_executor,
subexperiment_evaluator=subexperiment_evaluator)
executor = ExperimentExecutor(experiment_config)
executor.execute()
......@@ -177,4 +116,4 @@ if __name__ == '__main__':
main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit,
args.memory_limit, args.commit_ms, args.duration,
args.domain_restriction, args.search_strategy, args.prometheus,
args.reset, args.reset_only, args.namespace)
args.reset, args.namespace, args.path)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment