diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py index 814f8d105fb8a56305e07d6c06a969f3dc2e1738..ce2e0f7f615e147836ee3384b6eacdf13689f7b8 100644 --- a/execution/lag_analysis.py +++ b/execution/lag_analysis.py @@ -13,7 +13,7 @@ instances = sys.argv[4] execution_minutes = int(sys.argv[5]) time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) -prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range' +prometheus_query_path = 'http://kube1.internal:32529/api/v1/query_range' #http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s @@ -51,6 +51,14 @@ for result in results: df = pd.DataFrame(d) +# save whether the subexperiment was successful or not, meaning whether the consumer lag was above some threshhold or not +# Assumption: Due to fluctuations within the record lag measurements, it is sufficient to analyze the second half of measurements. +second_half = list(map(lambda x: x['value'], filter(lambda x: x['topic'] == 'input', d[len(d)//2:]))) +avg_lag = sum(second_half) / len(second_half) +with open(r"last_exp_result.txt", "w+") as file: + success = 0 if avg_lag > 1000 else 1 + file.write(str(success)) + # Do some analysis input = df.loc[df['topic'] == "input"] @@ -109,14 +117,6 @@ df = pd.DataFrame(d) df.to_csv(f"{filename}_totallag.csv") -# save whether the subexperiment was successful or not, meaning whether the consumer lag was above some threshhold or not -# Assumption: Due to fluctuations within the record lag measurements, it is sufficient to analyze the second half of measurements. -second_half = list(map(lambda x: x['value'], d[len(d)//2:])) -avg_lag = sum(second_half) / len(second_half) -with open(r"last_exp_result.txt", "w+") as file: - success = 0 if avg_lag > 1000 else 1 - file.write(str(success)) - # Load partition count response = requests.get(prometheus_query_path, params={ diff --git a/execution/strategies/strategies/binary_search_strategy.py b/execution/strategies/strategies/binary_search_strategy.py index f154b6114f70dc18c1b88db3a8679278e9710f73..f27f54d3c6502ccd9d3dc68d9ef4819a5cc23d7c 100644 --- a/execution/strategies/strategies/binary_search_strategy.py +++ b/execution/strategies/strategies/binary_search_strategy.py @@ -30,7 +30,6 @@ def searchTransition(config, replica_index, lower, upper, subexperiment_counter) else: # test mid mid=(upper+lower)//2 - print(mid) print(f"Run subexperiment {subexperiment_counter} with config {config.dim_values[mid]} {config.replicas[replica_index]}") subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[mid], config.replicas[replica_index], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) config.subexperiment_executor.execute(subexperiment_config)