Skip to content
Snippets Groups Projects
Commit 793f22a0 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Cleanup code

parent 94760bc8
No related branches found
No related tags found
No related merge requests found
...@@ -13,7 +13,7 @@ instances = sys.argv[4] ...@@ -13,7 +13,7 @@ instances = sys.argv[4]
execution_minutes = int(sys.argv[5]) execution_minutes = int(sys.argv[5])
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range' prometheus_query_path = 'http://kube1.internal:32529/api/v1/query_range'
#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s #http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
...@@ -51,6 +51,14 @@ for result in results: ...@@ -51,6 +51,14 @@ for result in results:
df = pd.DataFrame(d) df = pd.DataFrame(d)
# save whether the subexperiment was successful or not, meaning whether the consumer lag was above some threshhold or not
# Assumption: Due to fluctuations within the record lag measurements, it is sufficient to analyze the second half of measurements.
second_half = list(map(lambda x: x['value'], filter(lambda x: x['topic'] == 'input', d[len(d)//2:])))
avg_lag = sum(second_half) / len(second_half)
with open(r"last_exp_result.txt", "w+") as file:
success = 0 if avg_lag > 1000 else 1
file.write(str(success))
# Do some analysis # Do some analysis
input = df.loc[df['topic'] == "input"] input = df.loc[df['topic'] == "input"]
...@@ -109,14 +117,6 @@ df = pd.DataFrame(d) ...@@ -109,14 +117,6 @@ df = pd.DataFrame(d)
df.to_csv(f"{filename}_totallag.csv") df.to_csv(f"{filename}_totallag.csv")
# save whether the subexperiment was successful or not, meaning whether the consumer lag was above some threshhold or not
# Assumption: Due to fluctuations within the record lag measurements, it is sufficient to analyze the second half of measurements.
second_half = list(map(lambda x: x['value'], d[len(d)//2:]))
avg_lag = sum(second_half) / len(second_half)
with open(r"last_exp_result.txt", "w+") as file:
success = 0 if avg_lag > 1000 else 1
file.write(str(success))
# Load partition count # Load partition count
response = requests.get(prometheus_query_path, params={ response = requests.get(prometheus_query_path, params={
......
...@@ -30,7 +30,6 @@ def searchTransition(config, replica_index, lower, upper, subexperiment_counter) ...@@ -30,7 +30,6 @@ def searchTransition(config, replica_index, lower, upper, subexperiment_counter)
else: else:
# test mid # test mid
mid=(upper+lower)//2 mid=(upper+lower)//2
print(mid)
print(f"Run subexperiment {subexperiment_counter} with config {config.dim_values[mid]} {config.replicas[replica_index]}") print(f"Run subexperiment {subexperiment_counter} with config {config.dim_values[mid]} {config.replicas[replica_index]}")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[mid], config.replicas[replica_index], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[mid], config.replicas[replica_index], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config) config.subexperiment_executor.execute(subexperiment_config)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment