From 1998c4e16a1cf57e510fb4cf37a8d91e80ed8b0a Mon Sep 17 00:00:00 2001
From: Simon Ehrenstein <simon.ehrenstein@gmail.com>
Date: Sun, 9 Aug 2020 22:47:32 +0200
Subject: [PATCH] Add step strategy heuristic

---
 execution/lag_analysis.py                     | 18 +++++++++----
 .../strategies/strategies/step_strategy.py    | 25 +++++++++----------
 .../noop_subexperiment_evaluator.py           |  2 ++
 .../subexperiment_evaluator.py                |  6 +++++
 .../tests/.cache/v/cache/lastfailed           |  3 +++
 execution/theodolite.py                       | 19 ++++++++------
 6 files changed, 48 insertions(+), 25 deletions(-)
 create mode 100644 execution/strategies/subexperiment_evaluation/noop_subexperiment_evaluator.py
 create mode 100644 execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py
 create mode 100644 execution/strategies/tests/.cache/v/cache/lastfailed

diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py
index 23e3d5f6c..814f8d105 100644
--- a/execution/lag_analysis.py
+++ b/execution/lag_analysis.py
@@ -5,7 +5,6 @@ from datetime import datetime, timedelta, timezone
 import pandas as pd
 import matplotlib.pyplot as plt
 import csv
-
 #
 exp_id =  sys.argv[1]
 benchmark = sys.argv[2]
@@ -14,6 +13,8 @@ instances = sys.argv[4]
 execution_minutes = int(sys.argv[5])
 time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
 
+prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range'
+
 #http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
 
 now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
@@ -27,7 +28,7 @@ start = now - timedelta(minutes=execution_minutes)
 #print(start.isoformat().replace('+00:00', 'Z'))
 #print(end.isoformat().replace('+00:00', 'Z'))
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
+response = requests.get(prometheus_query_path, params={
     #'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
     'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
     'start': start.isoformat(),
@@ -87,7 +88,7 @@ df.to_csv(f"{filename}_values.csv")
 
 # Load total lag count
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
+response = requests.get(prometheus_query_path, params={
     'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
     'start': start.isoformat(),
     'end': end.isoformat(),
@@ -108,10 +109,17 @@ df = pd.DataFrame(d)
 
 df.to_csv(f"{filename}_totallag.csv")
 
+# save whether the subexperiment was successful or not, meaning whether the consumer lag was above some threshhold or not
+# Assumption: Due to fluctuations within the record lag measurements, it is sufficient to analyze the second half of measurements.
+second_half = list(map(lambda x: x['value'], d[len(d)//2:]))
+avg_lag = sum(second_half) / len(second_half)
+with open(r"last_exp_result.txt", "w+") as file:
+    success = 0 if avg_lag > 1000 else 1
+    file.write(str(success))
 
 # Load partition count
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
+response = requests.get(prometheus_query_path, params={
     'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
     'start': start.isoformat(),
     'end': end.isoformat(),
@@ -135,7 +143,7 @@ df.to_csv(f"{filename}_partitions.csv")
 
 # Load instances count
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
+response = requests.get(prometheus_query_path, params={
     'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
     'start': start.isoformat(),
     'end': end.isoformat(),
diff --git a/execution/strategies/strategies/step_strategy.py b/execution/strategies/strategies/step_strategy.py
index d0e122554..9abad0660 100644
--- a/execution/strategies/strategies/step_strategy.py
+++ b/execution/strategies/strategies/step_strategy.py
@@ -3,23 +3,22 @@
 import os
 from .config import SubexperimentConfig
 
-
 def execute(config):
     subexperiment_counter=0
     subexperiments_total=len(config.dim_values)*len(config.replicas)
     i=0
     j=0
-    while i in range(len(config.dim_values)):
-        while j in range(len(config.replicas)):
-            subexperiment_counter+=1
-            print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {config.dim_values[i]} and {config.replicas[j]} replicas.")
+    while i < len(config.replicas) and j < len(config.dim_values):
+        subexperiment_counter+=1
+        print(f"Run subexperiment {subexperiment_counter}/{subexperiments_total} with dimension value {config.dim_values[j]} and {config.replicas[i]} replicas.")
+
+        subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[j], config.replicas[i], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
 
-            subexperiment_config = SubexperimentConfig(config.use_case, subexperiment_counter, config.dim_values[i], config.replicas[j], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes)
+        config.subexperiment_executor.execute(subexperiment_config)
+        result = config.subexperiment_evaluator.execute()
+        if result == 1:
+            j+=1
+        else:
+            i+=1
 
-            config.subexperiment_executor.execute(subexperiment_config)
-            result = config.subexperiment_evaluator.execute()
-            if result:
-                i+=1
-            else:
-                j+=1
-        i+=1
\ No newline at end of file
+    print(f"Executed {subexperiment_counter} experiments in total.")
\ No newline at end of file
diff --git a/execution/strategies/subexperiment_evaluation/noop_subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/noop_subexperiment_evaluator.py
new file mode 100644
index 000000000..e38974266
--- /dev/null
+++ b/execution/strategies/subexperiment_evaluation/noop_subexperiment_evaluator.py
@@ -0,0 +1,2 @@
+def execute():
+    return
\ No newline at end of file
diff --git a/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py
new file mode 100644
index 000000000..021b020f9
--- /dev/null
+++ b/execution/strategies/subexperiment_evaluation/subexperiment_evaluator.py
@@ -0,0 +1,6 @@
+import os
+
+def execute():
+    with open("last_exp_result.txt", "r") as file:
+        result = file.read()
+        return int(result) == 1
diff --git a/execution/strategies/tests/.cache/v/cache/lastfailed b/execution/strategies/tests/.cache/v/cache/lastfailed
new file mode 100644
index 000000000..41d320cbb
--- /dev/null
+++ b/execution/strategies/tests/.cache/v/cache/lastfailed
@@ -0,0 +1,3 @@
+{
+  "test_step_strategy.py": true
+}
\ No newline at end of file
diff --git a/execution/theodolite.py b/execution/theodolite.py
index ce1c23810..070a10f15 100755
--- a/execution/theodolite.py
+++ b/execution/theodolite.py
@@ -4,8 +4,11 @@ import sys
 import os
 from strategies.config import ExperimentConfig
 import strategies.strategies.default_strategy as default_strategy
+import strategies.strategies.step_strategy as step_strategy
 from strategies.experiment_execution import ExperimentExecutor
 import strategies.subexperiment_execution.subexperiment_executor as subexperiment_executor
+import strategies.subexperiment_evaluation.noop_subexperiment_evaluator as noop_subexperiment_evaluator
+import strategies.subexperiment_evaluation.subexperiment_evaluator as subexperiment_evaluator
 
 uc=sys.argv[1]
 dim_values=sys.argv[2].split(',')
@@ -17,12 +20,14 @@ kafka_streams_commit_interval_ms=sys.argv[7] if len(sys.argv) >= 8 and sys.argv[
 execution_minutes=sys.argv[8] if len(sys.argv) >= 9 and sys.argv[8] else 5
 benchmark_strategy=sys.argv[9] if len(sys.argv) >= 10 and sys.argv[9] else "default"
 
-print("Chosen benchmarking strategy: "+benchmark_strategy)
-print("Going to execute " + str(len(dim_values)*len(replicas)) + " subexperiments in total..")
+print(f"Chosen benchmarking strategy: {benchmark_strategy}")
 
-# todo set noop evaluator for default strategy
-experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, default_strategy, subexperiment_executor)
-executor = ExperimentExecutor(experiment_config)
-executor.execute()
+if benchmark_strategy == "step":
+    print(f"Going to execute at most {len(dim_values)+len(replicas)-1} subexperiments in total..")
+    experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, step_strategy, subexperiment_executor, subexperiment_evaluator)
+else:
+    print(f"Going to execute {len(dim_values)*len(replicas)} subexperiments in total..")
+    experiment_config = ExperimentConfig(uc, dim_values, replicas, partitions, cpu_limit, memory_limit, kafka_streams_commit_interval_ms, execution_minutes, default_strategy, subexperiment_executor, noop_subexperiment_evaluator)
 
-# todo add step strategy
+executor = ExperimentExecutor(experiment_config)
+executor.execute()
\ No newline at end of file
-- 
GitLab