Skip to content
Snippets Groups Projects
Commit c89df09f authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Pass configurations from theodolite to run uc script and use path in analysis

Pass more configratioin from the theodolite scrip to run uc script
instead of using defaults.
Use the result path in the analyis.
parent a8221a4b
No related branches found
No related tags found
1 merge request!56Store experiment results in dedicated directory
......@@ -8,7 +8,7 @@ import csv
import logging
def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url = 'http://kube1.se.internal:32529'):
def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
print("Main")
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
......@@ -70,11 +70,11 @@ def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_
fields = [exp_id, datetime.now(), benchmark, dim_value,
instances, linear_regressor.coef_]
print(fields)
with open(r'results.csv', 'a') as f:
with open(f'{result_path}/results.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}"
filename = f"{result_path}/exp{exp_id}_{benchmark}_{dim_value}_{instances}"
plt.plot(X, Y)
plt.plot(X, Y_pred, color='red')
......@@ -163,4 +163,5 @@ if __name__ == '__main__':
instances = sys.argv[4]
execution_minutes = int(sys.argv[5])
main(exp_id, benchmark, dim_value, instances, execution_minutes)
main(exp_id, benchmark, dim_value, instances, execution_minutes,
'http://localhost:9090', 'results')
......@@ -56,7 +56,7 @@ def default_parser(description):
help='Only resets the environment. Ignores all other parameters')
parser.add_argument('--prometheus',
metavar='<URL>',
default=os.environ.get('PROMETHEUS_BASE_URL'),
default=os.environ.get('PROMETHEUS_BASE_URL', 'http://localhost:9090'),
help='Defines where to find the prometheus instance')
parser.add_argument('--path',
metavar='<path>',
......
......@@ -248,7 +248,7 @@ def wait_execution(execution_minutes):
return
def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url=None):
def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prometheus_base_url, result_path):
"""
Runs the evaluation function
:param string exp_id: ID of the experiment.
......@@ -258,12 +258,7 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome
:param int execution_minutes: How long the use case where executed.
"""
print('Run evaluation function')
if prometheus_base_url is None and environ.get('PROMETHEUS_BASE_URL') is None:
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes)
elif prometheus_base_url is not None:
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url)
else:
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, environ.get('PROMETHEUS_BASE_URL'))
lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url, result_path)
return
......@@ -460,7 +455,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics):
stop_lag_exporter()
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, reset_only, ns):
def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False):
"""
Main method to execute one time the benchmark for a given use case.
Start workload generator/application -> execute -> analyse -> stop all
......@@ -528,7 +523,7 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi
print('---------------------')
run_evaluation(exp_id, uc_id, dim_value, instances,
execution_minutes, prometheus_base_url)
execution_minutes, prometheus_base_url, result_path)
print('---------------------')
# Reset cluster regular, therefore abort exit not needed anymore
......@@ -543,4 +538,4 @@ if __name__ == '__main__':
main(args.exp_id, args.uc, args.load, args.instances,
args.partitions, args.cpu_limit, args.memory_limit,
args.commit_ms, args.duration, args.prometheus, args.reset,
args.reset_only, args.namespace)
args.namespace, args.path, args.reset_only)
......@@ -12,7 +12,11 @@ class ExperimentConfig:
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
domain_restriction_strategy: object
search_strategy: object
subexperiment_executor: object
subexperiment_evaluator: object
\ No newline at end of file
subexperiment_evaluator: object
......@@ -12,4 +12,8 @@ class SubexperimentConfig:
cpu_limit: str
memory_limit: str
kafka_streams_commit_interval_ms: int
execution_minutes: int
\ No newline at end of file
execution_minutes: int
prometheus_base_url: str
reset: bool
namespace: str
result_path: str
......@@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig
def binary_search(config, dim_value, lower, upper, subexperiment_counter):
if lower == upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # successful, the upper neighbor is assumed to also has been successful
......@@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
return (lower+1, subexperiment_counter)
elif lower+1==upper:
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result==1: # minimal instances found
return (lower, subexperiment_counter)
else: # not successful, check if lower+1 instances are sufficient
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # minimal instances found
......@@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter):
# test mid
mid=(upper+lower)//2
print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
if result == 1: # success -> search in (lower, mid-1)
......@@ -44,4 +44,3 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
upper = len(config.replicass)-1
dim_value=config.dim_values[dim_value_index]
return binary_search(config, dim_value, lower_replicas_bound_index, upper, subexperiment_counter)
......@@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
......
......@@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
replicas=config.replicass[lower_replicas_bound_index]
print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path)
config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute(subexperiment_config)
......@@ -19,4 +19,4 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c
return (lower_replicas_bound_index, subexperiment_counter)
else:
lower_replicas_bound_index+=1
return (lower_replicas_bound_index, subexperiment_counter)
\ No newline at end of file
return (lower_replicas_bound_index, subexperiment_counter)
......@@ -4,10 +4,10 @@ import os
import lib.trend_slope_computer as trend_slope_computer
THRESHOLD = 2000
WARMUP_SEC = 60
WARMUP_SEC = 60
def execute(config):
cwd = os.getcwd()
cwd = f'{os.getcwd()}/{config.result_path}'
file = f"exp{config.exp_id}_uc{config.use_case}_{config.dim_value}_{config.replicas}_totallag.csv"
trend_slope = trend_slope_computer.compute(cwd, file, WARMUP_SEC, THRESHOLD)
......
......@@ -14,7 +14,7 @@ def execute(subexperiment_config):
memory_limit=subexperiment_config.memory_limit,
commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms,
execution_minutes=int(subexperiment_config.execution_minutes),
prometheus_base_url=None,
reset=False,
reset_only=False,
ns="default")
prometheus_base_url=subexperiment_config.prometheus_base_url,
reset=subexperiment_config.reset,
ns=subexperiment_config.namespace,
result_path=subexperiment_config.result_path)
......@@ -31,7 +31,7 @@ def load_variables():
def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
commit_ms, duration, domain_restriction, search_strategy,
prometheus_base_url ,reset, reset_only, namespace, result_path):
prometheus_base_url, reset, namespace, result_path):
print(f"Domain restriction of search space activated: {domain_restriction}")
print(f"Chosen search strategy: {search_strategy}")
......@@ -97,6 +97,10 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit,
memory_limit=memory_limit,
kafka_streams_commit_interval_ms=commit_ms,
execution_minutes=duration,
prometheus_base_url=prometheus_base_url,
reset=reset,
namespace=namespace,
result_path=result_path,
domain_restriction_strategy=domain_restriction_strategy,
search_strategy=search_strategy,
subexperiment_executor=subexperiment_executor,
......@@ -112,4 +116,4 @@ if __name__ == '__main__':
main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit,
args.memory_limit, args.commit_ms, args.duration,
args.domain_restriction, args.search_strategy, args.prometheus,
args.reset, args.reset_only, args.namespace, args.path)
args.reset, args.namespace, args.path)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment