Commit 042f4d03 authored by Björn Vonheiden

Move the lag analysis script into a main function

Loading of the program arguments is moved into the `__main__` block,
and the rest of the script is extracted into a main function that gets
the needed arguments passed.
parent e4a26bc8
Related merge requests: !42 Integrate theodolite and run uc python scripts · !24 run UC as python implementation
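The change applies the standard Python idiom of guarding script-level statements with `if __name__ == '__main__':`, so the module can be imported by other scripts without side effects. A minimal sketch of the pattern (names illustrative, not from this commit):

import sys

def main(arg):
    ...  # the actual work, now callable from other modules

if __name__ == '__main__':
    # Runs only when executed as a script, not on import.
    main(sys.argv[1])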
@@ -5,151 +5,164 @@
(The hunk is shown below as the resulting code; sys, os, and requests are imported earlier in the file, above this hunk.)

from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import csv
import logging

def main(exp_id, benchmark, dim_value, instances, execution_minutes,
         prometheus_base_url='http://kube1.se.internal:32529'):
    print("Main")
    time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))

    # http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
    now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
    now = now_local - timedelta(milliseconds=time_diff_ms)
    print(f"Now Local: {now_local}")
    print(f"Now Used: {now}")
    end = now
    start = now - timedelta(minutes=execution_minutes)
    # print(start.isoformat().replace('+00:00', 'Z'))
    # print(end.isoformat().replace('+00:00', 'Z'))

    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
        # 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
        'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
        'start': start.isoformat(),
        'end': end.isoformat(),
        'step': '5s'})
    # response
    # print(response.request.path_url)
    # response.content
    results = response.json()['data']['result']
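    # For reference, the JSON returned by the Prometheus range-query API has
    # this shape (values illustrative only, not from a real run):
    #   {"data": {"result": [
    #       {"metric": {"group": "uc1", "topic": "input"},
    #        "values": [[1600000000, "42"], [1600000005, "NaN"], ...]}]}}
    # Each entry in 'values' is a [unix_timestamp, string_value] pair, which
    # is why value[1] is compared to the string 'NaN' before conversion.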
    d = []
    for result in results:
        # print(result['metric']['topic'])
        topic = result['metric']['topic']
        for value in result['values']:
            # print(value)
            d.append({'topic': topic, 'timestamp': int(
                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})

    df = pd.DataFrame(d)
    # Do some analysis
    input = df.loc[df['topic'] == "input"]
    # input.plot(kind='line',x='timestamp',y='value',color='red')
    # plt.show()

    from sklearn.linear_model import LinearRegression
    # .values converts the column into a numpy array
    X = input.iloc[:, 1].values.reshape(-1, 1)
    # -1 means the number of rows is inferred, with 1 column
    Y = input.iloc[:, 2].values.reshape(-1, 1)
    linear_regressor = LinearRegression()  # create object for the class
    linear_regressor.fit(X, Y)  # perform linear regression
    Y_pred = linear_regressor.predict(X)  # make predictions
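    # Interpretation note: X holds Unix timestamps (seconds) and Y the lag in
    # records, so the fitted coefficient printed below is the lag trend in
    # records per second. A slope near zero suggests the given number of
    # instances keeps up with the load.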
    print(linear_regressor.coef_)
    # print(Y_pred)

    fields = [exp_id, datetime.now(), benchmark, dim_value,
              instances, linear_regressor.coef_]
    print(fields)
    with open(r'results.csv', 'a') as f:
        writer = csv.writer(f)
        writer.writerow(fields)

    filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}"

    plt.plot(X, Y)
    plt.plot(X, Y_pred, color='red')

    plt.savefig(f"{filename}_plot.png")

    df.to_csv(f"{filename}_values.csv")
    # Load total lag count
    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
        'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
        'start': start.isoformat(),
        'end': end.isoformat(),
        'step': '5s'})

    results = response.json()['data']['result']

    d = []
    for result in results:
        # print(result['metric']['group'])
        group = result['metric']['group']
        for value in result['values']:
            # print(value)
            d.append({'group': group, 'timestamp': int(
                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})

    df = pd.DataFrame(d)

    df.to_csv(f"{filename}_totallag.csv")
    # Load partition count
    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
        'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
        'start': start.isoformat(),
        'end': end.isoformat(),
        'step': '5s'})

    results = response.json()['data']['result']

    d = []
    for result in results:
        # print(result['metric']['topic'])
        topic = result['metric']['topic']
        for value in result['values']:
            # print(value)
            d.append({'topic': topic, 'timestamp': int(
                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})

    df = pd.DataFrame(d)
    df.to_csv(f"{filename}_partitions.csv")
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
results = response.json()['data']['result']
# Load instances count d = []
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={ for result in results:
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))", for value in result['values']:
'start': start.isoformat(), # print(value)
'end': end.isoformat(), d.append({'timestamp': int(value[0]), 'value': int(value[1])})
'step': '5s'})
results = response.json()['data']['result'] df = pd.DataFrame(d)
d = [] df.to_csv(f"{filename}_instances.csv")
for result in results:
for value in result['values']:
#print(value)
d.append({'timestamp': int(value[0]), 'value': int(value[1])})
df = pd.DataFrame(d) if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
df.to_csv(f"{filename}_instances.csv") # Load arguments
\ No newline at end of file exp_id = sys.argv[1]
benchmark = sys.argv[2]
dim_value = sys.argv[3]
instances = sys.argv[4]
execution_minutes = int(sys.argv[5])
main(exp_id, benchmark, dim_value, instances, execution_minutes)
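With the argument loading in the `__main__` block, other scripts can now import the module and call main() directly instead of spawning a subprocess, which matches the integration mentioned in the related merge requests. A minimal usage sketch, assuming the module is saved as lag_analysis.py (the actual file name is not shown on this page):

# As a script (arguments as before):
#   python lag_analysis.py <exp_id> <benchmark> <dim_value> <instances> <execution_minutes>
#   e.g.: python lag_analysis.py 1 uc1 100000 1 5

# From another Python script (module name assumed):
from lag_analysis import main

main('1', 'uc1', '100000', '1', 5,
     prometheus_base_url='http://localhost:9090')  # the default URL can be overridden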