From 042f4d03c98779b4f6c0b22b17a6539c7397be2a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Tue, 22 Sep 2020 17:00:50 +0200
Subject: [PATCH] Move the lag analysis script into main functions

Loading of the program arguments is moved to the __main__ entry point,
and the rest of the script is extracted into a main function that gets
the needed arguments passed.
---
 execution/lag_analysis.py | 217 ++++++++++++++++++++------------------
 1 file changed, 115 insertions(+), 102 deletions(-)

diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py
index 23e3d5f6c..0678c29e9 100644
--- a/execution/lag_analysis.py
+++ b/execution/lag_analysis.py
@@ -5,151 +5,164 @@ from datetime import datetime, timedelta, timezone
 import pandas as pd
 import matplotlib.pyplot as plt
 import csv
+import logging
 
-#
-exp_id = sys.argv[1]
-benchmark = sys.argv[2]
-dim_value = sys.argv[3]
-instances = sys.argv[4]
-execution_minutes = int(sys.argv[5])
-time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
 
-#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
+def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url='http://kube1.se.internal:32529'):
+    print("Main")
+    time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
 
-now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
-now = now_local - timedelta(milliseconds=time_diff_ms)
-print(f"Now Local: {now_local}")
-print(f"Now Used: {now}")
+    # http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
 
-end = now
-start = now - timedelta(minutes=execution_minutes)
+    now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
+    now = now_local - timedelta(milliseconds=time_diff_ms)
+    print(f"Now Local: {now_local}")
+    print(f"Now Used: {now}")
 
-#print(start.isoformat().replace('+00:00', 'Z'))
-#print(end.isoformat().replace('+00:00', 'Z'))
+    end = now
+    start = now - timedelta(minutes=execution_minutes)
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
-    #'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
-    'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
-    'start': start.isoformat(),
-    'end': end.isoformat(),
-    'step': '5s'})
+    #print(start.isoformat().replace('+00:00', 'Z'))
+    #print(end.isoformat().replace('+00:00', 'Z'))
 
-#response
-#print(response.request.path_url)
-#response.content
-results = response.json()['data']['result']
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
+        # 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
+        'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
+        'start': start.isoformat(),
+        'end': end.isoformat(),
+        'step': '5s'})
+    # response
+    # print(response.request.path_url)
+    # response.content
+    results = response.json()['data']['result']
 
-d = []
+    d = []
 
-for result in results:
-    #print(result['metric']['topic'])
-    topic = result['metric']['topic']
-    for value in result['values']:
-        #print(value)
-        d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
+    for result in results:
+        # print(result['metric']['topic'])
+        topic = result['metric']['topic']
+        for value in result['values']:
+            # print(value)
+            d.append({'topic': topic, 'timestamp': int(
+                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
 
-df = pd.DataFrame(d)
+    df = pd.DataFrame(d)
 
-# Do some analysis
+    # Do some analysis
 
-input = df.loc[df['topic'] == "input"]
+    input = df.loc[df['topic'] == "input"]
 
-#input.plot(kind='line',x='timestamp',y='value',color='red')
-#plt.show()
+    # input.plot(kind='line',x='timestamp',y='value',color='red')
+    # plt.show()
 
-from sklearn.linear_model import LinearRegression
+    from sklearn.linear_model import LinearRegression
 
-X = input.iloc[:, 1].values.reshape(-1, 1)  # values converts it into a numpy array
-Y = input.iloc[:, 2].values.reshape(-1, 1)  # -1 means that calculate the dimension of rows, but have 1 column
-linear_regressor = LinearRegression()  # create object for the class
-linear_regressor.fit(X, Y)  # perform linear regression
-Y_pred = linear_regressor.predict(X)  # make predictions
+    # values converts it into a numpy array
+    X = input.iloc[:, 1].values.reshape(-1, 1)
+    # -1 means that calculate the dimension of rows, but have 1 column
+    Y = input.iloc[:, 2].values.reshape(-1, 1)
+    linear_regressor = LinearRegression()  # create object for the class
+    linear_regressor.fit(X, Y)  # perform linear regression
+    Y_pred = linear_regressor.predict(X)  # make predictions
 
-print(linear_regressor.coef_)
+    print(linear_regressor.coef_)
 
-#print(Y_pred)
+    # print(Y_pred)
 
-fields=[exp_id, datetime.now(), benchmark, dim_value, instances, linear_regressor.coef_]
-print(fields)
-with open(r'results.csv', 'a') as f:
-    writer = csv.writer(f)
-    writer.writerow(fields)
+    fields = [exp_id, datetime.now(), benchmark, dim_value,
+              instances, linear_regressor.coef_]
+    print(fields)
+    with open(r'results.csv', 'a') as f:
+        writer = csv.writer(f)
+        writer.writerow(fields)
 
-filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}"
+    filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}"
 
-plt.plot(X, Y)
-plt.plot(X, Y_pred, color='red')
+    plt.plot(X, Y)
+    plt.plot(X, Y_pred, color='red')
 
-plt.savefig(f"{filename}_plot.png")
+    plt.savefig(f"{filename}_plot.png")
 
-df.to_csv(f"{filename}_values.csv")
+    df.to_csv(f"{filename}_values.csv")
 
+    # Load total lag count
 
-# Load total lag count
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
+        'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
+        'start': start.isoformat(),
+        'end': end.isoformat(),
+        'step': '5s'})
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
-    'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
-    'start': start.isoformat(),
-    'end': end.isoformat(),
-    'step': '5s'})
+    results = response.json()['data']['result']
 
-results = response.json()['data']['result']
+    d = []
 
-d = []
+    for result in results:
+        # print(result['metric']['topic'])
+        group = result['metric']['group']
+        for value in result['values']:
+            # print(value)
+            d.append({'group': group, 'timestamp': int(
+                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
 
-for result in results:
-    #print(result['metric']['topic'])
-    group = result['metric']['group']
-    for value in result['values']:
-        #print(value)
-        d.append({'group': group, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
+    df = pd.DataFrame(d)
 
-df = pd.DataFrame(d)
+    df.to_csv(f"{filename}_totallag.csv")
 
-df.to_csv(f"{filename}_totallag.csv")
+    # Load partition count
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
+        'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
+        'start': start.isoformat(),
+        'end': end.isoformat(),
+        'step': '5s'})
 
-# Load partition count
+    results = response.json()['data']['result']
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
-    'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
-    'start': start.isoformat(),
-    'end': end.isoformat(),
-    'step': '5s'})
+    d = []
 
-results = response.json()['data']['result']
+    for result in results:
+        # print(result['metric']['topic'])
+        topic = result['metric']['topic']
+        for value in result['values']:
+            # print(value)
+            d.append({'topic': topic, 'timestamp': int(
+                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
 
-d = []
+    df = pd.DataFrame(d)
 
-for result in results:
-    #print(result['metric']['topic'])
-    topic = result['metric']['topic']
-    for value in result['values']:
-        #print(value)
-        d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
+    df.to_csv(f"{filename}_partitions.csv")
 
-df = pd.DataFrame(d)
+    # Load instances count
 
-df.to_csv(f"{filename}_partitions.csv")
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
+        'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
+        'start': start.isoformat(),
+        'end': end.isoformat(),
+        'step': '5s'})
 
+    results = response.json()['data']['result']
 
-# Load instances count
+    d = []
 
-response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
-    'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
-    'start': start.isoformat(),
-    'end': end.isoformat(),
-    'step': '5s'})
+    for result in results:
+        for value in result['values']:
+            # print(value)
+            d.append({'timestamp': int(value[0]), 'value': int(value[1])})
 
-results = response.json()['data']['result']
+    df = pd.DataFrame(d)
 
-d = []
+    df.to_csv(f"{filename}_instances.csv")
 
-for result in results:
-    for value in result['values']:
-        #print(value)
-        d.append({'timestamp': int(value[0]), 'value': int(value[1])})
 
-df = pd.DataFrame(d)
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO)
 
-df.to_csv(f"{filename}_instances.csv")
\ No newline at end of file
+    # Load arguments
+    exp_id = sys.argv[1]
+    benchmark = sys.argv[2]
+    dim_value = sys.argv[3]
+    instances = sys.argv[4]
+    execution_minutes = int(sys.argv[5])
+
+    main(exp_id, benchmark, dim_value, instances, execution_minutes)
-- 
GitLab
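
Usage note: with the analysis wrapped in main(), the script can now be driven
programmatically as well as from the command line. A minimal sketch, assuming
lag_analysis.py is importable (e.g. when running from the execution/
directory); every argument value below is a made-up example, and
prometheus_base_url simply repeats the default from the patch:

    # Hypothetical usage sketch (not part of the patch): call main()
    # directly instead of going through sys.argv.
    from lag_analysis import main

    # exp_id, benchmark, dim_value and instances stay strings, mirroring
    # how they would arrive via sys.argv; execution_minutes is an int.
    main('42', 'uc1', '50000', '3', execution_minutes=5,
         prometheus_base_url='http://kube1.se.internal:32529')

    # Equivalent command-line invocation:
    #   python lag_analysis.py 42 uc1 50000 3 5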