From 2e9eaa37e86cc840b33a4f856127af64f309da90 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de>
Date: Mon, 20 Apr 2020 12:59:16 +0200
Subject: [PATCH] Load monitoring datat for number of instances and partitions

---
 execution/lag_analysis.py | 46 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py
index d8c2dc230..f2690ab13 100644
--- a/execution/lag_analysis.py
+++ b/execution/lag_analysis.py
@@ -81,3 +81,49 @@ plt.plot(X, Y_pred, color='red')
 plt.savefig(f"{filename}_plot.png")
 
 df.to_csv(f"{filename}_values.csv")
+
+
+# Load partition count
+
+response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
+    'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
+    'start': start.isoformat(),
+    'end': end.isoformat(),
+    'step': '5s'})
+
+results = response.json()['data']['result']
+
+d = []
+
+for result in results:
+    #print(result['metric']['topic'])
+    topic = result['metric']['topic']
+    for value in result['values']:
+        #print(value)
+        d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
+
+df = pd.DataFrame(d)
+
+df.to_csv(f"{filename}_partitions.csv")
+
+
+# Load instances count
+
+response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
+    'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
+    'start': start.isoformat(),
+    'end': end.isoformat(),
+    'step': '5s'})
+
+results = response.json()['data']['result']
+
+d = []
+
+for result in results:
+    for value in result['values']:
+        #print(value)
+        d.append({'timestamp': int(value[0]), 'value': int(value[1])})
+
+df = pd.DataFrame(d)
+
+df.to_csv(f"{filename}_instances.csv")
\ No newline at end of file
-- 
GitLab