diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py index d8c2dc230e2813ccad34a30bcf8d1bff2db015ad..f2690ab13b072171020cd5b27a55d6260b9b9084 100644 --- a/execution/lag_analysis.py +++ b/execution/lag_analysis.py @@ -81,3 +81,49 @@ plt.plot(X, Y_pred, color='red') plt.savefig(f"{filename}_plot.png") df.to_csv(f"{filename}_values.csv") + + +# Load partition count + +response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ + 'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)", + 'start': start.isoformat(), + 'end': end.isoformat(), + 'step': '5s'}) + +results = response.json()['data']['result'] + +d = [] + +for result in results: + #print(result['metric']['topic']) + topic = result['metric']['topic'] + for value in result['values']: + #print(value) + d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) + +df = pd.DataFrame(d) + +df.to_csv(f"{filename}_partitions.csv") + + +# Load instances count + +response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ + 'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", + 'start': start.isoformat(), + 'end': end.isoformat(), + 'step': '5s'}) + +results = response.json()['data']['result'] + +d = [] + +for result in results: + for value in result['values']: + #print(value) + d.append({'timestamp': int(value[0]), 'value': int(value[1])}) + +df = pd.DataFrame(d) + +df.to_csv(f"{filename}_instances.csv") \ No newline at end of file