Skip to content
Snippets Groups Projects
Commit 1998c4e1 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Add step strategy heuristic

parent 6bad692b
No related branches found
No related tags found
3 merge requests!39Add Support for Benchmarking Strategies,!36Implement Benchmarking Strategy: Heuristic 2 (Binary Search Strategy),!35Implement Benchmarking Strategy: Heuristic 1 (Step Strategy)
...@@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone ...@@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import csv import csv
# #
exp_id = sys.argv[1] exp_id = sys.argv[1]
benchmark = sys.argv[2] benchmark = sys.argv[2]
...@@ -14,6 +13,8 @@ instances = sys.argv[4] ...@@ -14,6 +13,8 @@ instances = sys.argv[4]
execution_minutes = int(sys.argv[5]) execution_minutes = int(sys.argv[5])
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range'
#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s #http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0) now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
...@@ -27,7 +28,7 @@ start = now - timedelta(minutes=execution_minutes) ...@@ -27,7 +28,7 @@ start = now - timedelta(minutes=execution_minutes)
#print(start.isoformat().replace('+00:00', 'Z')) #print(start.isoformat().replace('+00:00', 'Z'))
#print(end.isoformat().replace('+00:00', 'Z')) #print(end.isoformat().replace('+00:00', 'Z'))
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
#'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)", #'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)", 'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(), 'start': start.isoformat(),
...@@ -87,7 +88,7 @@ df.to_csv(f"{filename}_values.csv") ...@@ -87,7 +88,7 @@ df.to_csv(f"{filename}_values.csv")
# Load total lag count # Load total lag count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
'query': "sum by(group)(kafka_consumergroup_group_lag > 0)", 'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(), 'start': start.isoformat(),
'end': end.isoformat(), 'end': end.isoformat(),
...@@ -108,10 +109,17 @@ df = pd.DataFrame(d) ...@@ -108,10 +109,17 @@ df = pd.DataFrame(d)
df.to_csv(f"{filename}_totallag.csv") df.to_csv(f"{filename}_totallag.csv")
# Save whether the subexperiment was successful, i.e. whether the average
# consumer lag stayed at or below a threshold.
# Assumption: Due to fluctuations within the record lag measurements, it is
# sufficient to analyze the second half of the measurements.
second_half = [x['value'] for x in d[len(d)//2:]]
# The lag query only returns samples with lag > 0, so an empty result is
# treated as "no lag" instead of crashing with ZeroDivisionError and leaving a
# stale result file behind.
# NOTE(review): confirm that an empty `d` can only mean "no lag observed".
avg_lag = sum(second_half) / len(second_half) if second_half else 0
# 1 = success (average lag of at most 1000 records), 0 = failure.
success = 0 if avg_lag > 1000 else 1
with open("last_exp_result.txt", "w") as file:
    file.write(str(success))
# Load partition count # Load partition count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)", 'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
'start': start.isoformat(), 'start': start.isoformat(),
'end': end.isoformat(), 'end': end.isoformat(),
...@@ -135,7 +143,7 @@ df.to_csv(f"{filename}_partitions.csv") ...@@ -135,7 +143,7 @@ df.to_csv(f"{filename}_partitions.csv")
# Load instances count # Load instances count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", 'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
'start': start.isoformat(), 'start': start.isoformat(),
'end': end.isoformat(), 'end': end.isoformat(),
......
...@@ -3,23 +3,22 @@ ...@@ -3,23 +3,22 @@
import os import os
from .config import SubexperimentConfig from .config import SubexperimentConfig
def execute(config): def execute(config):
subexperiment_counter=0 subexperiment_counter=0
subexperiments_total=len(config.dim_values)*len(config.replicas) subexperiments_total=len(config.dim_values)*len(config.replicas)
i=0 i=0
j=0 j=0
while i in range(len(config.dim_values)): while i < len(config.replicas) and j < len(config.dim_values):
while j in range(len(config.replicas)): subexperiment_counter+=1
subexperiment_counter+=1 print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {config.dim_values[j]} and {config.replicas[i]} replicas.")
print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {config.dim_values[i]} and {config.replicas[j]} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[j], config.replicas[i], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[i], config.replicas[j], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute()
if result == 1:
j+=1
else:
i+=1
config.subexperiment_executor.execute(subexperiment_config) print(f"Executed {subexperiment_counter} experiments in total.")
result = config.subexperiment_evaluator.execute() \ No newline at end of file
if result:
i+=1
else:
j+=1
i+=1
\ No newline at end of file
def execute():
    """No-op subexperiment evaluator: performs no evaluation and returns None."""
    return
\ No newline at end of file
import os
def execute():
    """Return whether the last subexperiment was recorded as successful.

    Reads ``last_exp_result.txt`` from the current working directory and
    returns ``True`` only when its content parses to the integer ``1``.

    Raises:
        FileNotFoundError: If the result file does not exist.
        ValueError: If the file content is not an integer.
    """
    result_path = "last_exp_result.txt"
    with open(result_path, "r", encoding="utf-8") as file:
        content = file.read().strip()
    try:
        return int(content) == 1
    except ValueError as err:
        # Same exception type as before, but name the file and its content so
        # a corrupt or empty result file is easy to diagnose.
        raise ValueError(
            f"{result_path} does not contain an integer result: {content!r}"
        ) from err
{
"test_step_strategy.py": true
}
\ No newline at end of file
...@@ -4,8 +4,11 @@ import sys ...@@ -4,8 +4,11 @@ import sys
import os import os
from strategies.config import ExperimentConfig from strategies.config import ExperimentConfig
import strategies.strategies.default_strategy as default_strategy import strategies.strategies.default_strategy as default_strategy
import strategies.strategies.step_strategy as step_strategy
from strategies.experiment_execution import ExperimentExecutor from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
import strategies.subexperiment_evaluation.noop_subexperiment_evaluator as noop_subexperiment_evaluator
import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator
uc=sys.argv[1] uc=sys.argv[1]
dim_values=sys.argv[2].split(',') dim_values=sys.argv[2].split(',')
...@@ -17,12 +20,14 @@ kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[ ...@@ -17,12 +20,14 @@ kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[
execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5 execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5
benchmark_strategy=sys.argv[9] if len(sys.argv) >= 10 and sys.argv[9] else "default" benchmark_strategy=sys.argv[9] if len(sys.argv) >= 10 and sys.argv[9] else "default"
print("Chosen benchmarking strategy: "+benchmark_strategy) print(f"Chosen benchmarking strategy: {benchmark_strategy}")
print("Going to execute " + str(len(dim_values)*len(replicas)) + " subexperiments in total..")
# todo set noop evaluator for default strategy if benchmark_strategy == "step":
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, default_strategy, subexperiment_executor) print(f"Going to execute at most {len(dim_values)+len(replicas)-1} subexperiments in total..")
executor = ExperimentExecutor(experiment_config) experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, step_strategy, subexperiment_executor, subexperiment_evaluator)
executor.execute() else:
print(f"Going to execute {len(dim_values)*len(replicas)} subexperiments in total..")
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, default_strategy, subexperiment_executor, noop_subexperiment_evaluator)
# todo add step strategy executor = ExperimentExecutor(experiment_config)
executor.execute()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment