Skip to content
Snippets Groups Projects
Commit 99f085c1 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Add step strategy heuristic

parent 0f804929
Branches
Tags
No related merge requests found
...@@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone ...@@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import csv import csv
# #
exp_id = sys.argv[1] exp_id = sys.argv[1]
benchmark = sys.argv[2] benchmark = sys.argv[2]
...@@ -14,6 +13,8 @@ instances = sys.argv[4] ...@@ -14,6 +13,8 @@ instances = sys.argv[4]
execution_minutes = int(sys.argv[5]) execution_minutes = int(sys.argv[5])
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range'
#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s #http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0) now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
...@@ -27,7 +28,7 @@ start = now - timedelta(minutes=execution_minutes) ...@@ -27,7 +28,7 @@ start = now - timedelta(minutes=execution_minutes)
#print(start.isoformat().replace('+00:00', 'Z')) #print(start.isoformat().replace('+00:00', 'Z'))
#print(end.isoformat().replace('+00:00', 'Z')) #print(end.isoformat().replace('+00:00', 'Z'))
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
#'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)", #'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)", 'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(), 'start': start.isoformat(),
...@@ -87,7 +88,7 @@ df.to_csv(f"{filename}_values.csv") ...@@ -87,7 +88,7 @@ df.to_csv(f"{filename}_values.csv")
# Load total lag count # Load total lag count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
'query': "sum by(group)(kafka_consumergroup_group_lag > 0)", 'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(), 'start': start.isoformat(),
'end': end.isoformat(), 'end': end.isoformat(),
...@@ -108,10 +109,17 @@ df = pd.DataFrame(d) ...@@ -108,10 +109,17 @@ df = pd.DataFrame(d)
df.to_csv(f"{filename}_totallag.csv") df.to_csv(f"{filename}_totallag.csv")
# save whether the subexperiment was successful or not, meaning whether the consumer lag was above some threshhold or not
# Assumption: Due to fluctuations within the record lag measurements, it is sufficient to analyze the second half of measurements.
second_half = list(map(lambda x: x['value'], d[len(d)//2:]))
avg_lag = sum(second_half) / len(second_half)
with open(r"last_exp_result.txt", "w+") as file:
success = 0 if avg_lag > 1000 else 1
file.write(str(success))
# Load partition count # Load partition count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)", 'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
'start': start.isoformat(), 'start': start.isoformat(),
'end': end.isoformat(), 'end': end.isoformat(),
...@@ -135,7 +143,7 @@ df.to_csv(f"{filename}_partitions.csv") ...@@ -135,7 +143,7 @@ df.to_csv(f"{filename}_partitions.csv")
# Load instances count # Load instances count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ response = requests.get(prometheus_query_path, params={
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", 'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
'start': start.isoformat(), 'start': start.isoformat(),
'end': end.isoformat(), 'end': end.isoformat(),
... ...
......
...@@ -3,23 +3,22 @@ ...@@ -3,23 +3,22 @@
import os import os
from .config import SubexperimentConfig from .config import SubexperimentConfig
def execute(config): def execute(config):
subexperiment_counter=0 subexperiment_counter=0
subexperiments_total=len(config.dim_values)*len(config.replicas) subexperiments_total=len(config.dim_values)*len(config.replicas)
i=0 i=0
j=0 j=0
while i in range(len(config.dim_values)): while i < len(config.replicas) and j < len(config.dim_values):
while j in range(len(config.replicas)):
subexperiment_counter+=1 subexperiment_counter+=1
print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {config.dim_values[i]} and {config.replicas[j]} replicas.") print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {config.dim_values[j]} and {config.replicas[i]} replicas.")
subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[i], config.replicas[j], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes) subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[j], config.replicas[i], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
config.subexperiment_executor.execute(subexperiment_config) config.subexperiment_executor.execute(subexperiment_config)
result = config.subexperiment_evaluator.execute() result = config.subexperiment_evaluator.execute()
if result: if result == 1:
i+=1
else:
j+=1 j+=1
else:
i+=1 i+=1
print(f"Executed {subexperiment_counter} experiments in total.")
\ No newline at end of file
def execute():
return
\ No newline at end of file
import os
def execute():
with open("last_exp_result.txt", "r") as file:
result = file.read()
return int(result) == 1
{
"test_step_strategy.py": true
}
\ No newline at end of file
...@@ -4,8 +4,11 @@ import sys ...@@ -4,8 +4,11 @@ import sys
import os import os
from strategies.config import ExperimentConfig from strategies.config import ExperimentConfig
import strategies.strategies.default_strategy as default_strategy import strategies.strategies.default_strategy as default_strategy
import strategies.strategies.step_strategy as step_strategy
from strategies.experiment_execution import ExperimentExecutor from strategies.experiment_execution import ExperimentExecutor
import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
import strategies.subexperiment_evaluation.noop_subexperiment_evaluator as noop_subexperiment_evaluator
import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator
uc=sys.argv[1] uc=sys.argv[1]
dim_values=sys.argv[2].split(',') dim_values=sys.argv[2].split(',')
...@@ -17,12 +20,14 @@ kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[ ...@@ -17,12 +20,14 @@ kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[
execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5 execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5
benchmark_strategy=sys.argv[9] if len(sys.argv) >= 10 and sys.argv[9] else "default" benchmark_strategy=sys.argv[9] if len(sys.argv) >= 10 and sys.argv[9] else "default"
print("Chosen benchmarking strategy: "+benchmark_strategy) print(f"Chosen benchmarking strategy: {benchmark_strategy}")
print("Going to execute " + str(len(dim_values)*len(replicas)) + " subexperiments in total..")
if benchmark_strategy == "step":
print(f"Going to execute at most {len(dim_values)+len(replicas)-1} subexperiments in total..")
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, step_strategy, subexperiment_executor, subexperiment_evaluator)
else:
print(f"Going to execute {len(dim_values)*len(replicas)} subexperiments in total..")
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, default_strategy, subexperiment_executor, noop_subexperiment_evaluator)
# todo set noop evaluator for default strategy
experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, default_strategy, subexperiment_executor)
executor = ExperimentExecutor(experiment_config) executor = ExperimentExecutor(experiment_config)
executor.execute() executor.execute()
\ No newline at end of file
# todo add step strategy
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment