Commit 0814df0f authored by Sören Henning

Merge branch 'feature/theodolitePython' into 'master'

Integrate theodolite and run_uc Python scripts

See merge request !42
parents 85c3ebde 240883bd
Showing 1035 additions and 201 deletions
@@ -5,17 +5,12 @@ from datetime import datetime, timedelta, timezone
 import pandas as pd
 import matplotlib.pyplot as plt
 import csv
+import logging
 
-exp_id = sys.argv[1]
-benchmark = sys.argv[2]
-dim_value = sys.argv[3]
-instances = sys.argv[4]
-execution_minutes = int(sys.argv[5])
-time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
-
-prometheus_query_path = 'http://kube1.se.internal:32529/api/v1/query_range'
-#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
+
+def main(exp_id, benchmark, dim_value, instances, execution_minutes, prometheus_base_url='http://kube1.se.internal:32529'):
+    print("Main")
+    time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
 
     now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
    now = now_local - timedelta(milliseconds=time_diff_ms)
@@ -28,13 +23,12 @@ start = now - timedelta(minutes=execution_minutes)
     #print(start.isoformat().replace('+00:00', 'Z'))
     #print(end.isoformat().replace('+00:00', 'Z'))
 
-    response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         # 'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
         'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
         'start': start.isoformat(),
         'end': end.isoformat(),
         'step': '5s'})
 
     # response
     # print(response.request.path_url)
     # response.content
@@ -47,7 +41,8 @@ for result in results:
         topic = result['metric']['topic']
         for value in result['values']:
             # print(value)
-            d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
+            d.append({'topic': topic, 'timestamp': int(
+                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
 
     df = pd.DataFrame(d)
@@ -60,8 +55,10 @@ input = df.loc[df['topic'] == "input"]
     from sklearn.linear_model import LinearRegression
 
-    X = input.iloc[:, 1].values.reshape(-1, 1)  # values converts it into a numpy array
-    Y = input.iloc[:, 2].values.reshape(-1, 1)  # -1 means that calculate the dimension of rows, but have 1 column
+    # values converts it into a numpy array
+    X = input.iloc[:, 1].values.reshape(-1, 1)
+    # -1 means that calculate the dimension of rows, but have 1 column
+    Y = input.iloc[:, 2].values.reshape(-1, 1)
     linear_regressor = LinearRegression()  # create object for the class
     linear_regressor.fit(X, Y)  # perform linear regression
     Y_pred = linear_regressor.predict(X)  # make predictions
@@ -70,7 +67,8 @@ print(linear_regressor.coef_)
     # print(Y_pred)
 
-    fields=[exp_id, datetime.now(), benchmark, dim_value, instances, linear_regressor.coef_]
+    fields = [exp_id, datetime.now(), benchmark, dim_value,
+              instances, linear_regressor.coef_]
     print(fields)
     with open(r'results.csv', 'a') as f:
         writer = csv.writer(f)
@@ -85,10 +83,9 @@ plt.savefig(f"{filename}_plot.png")
     df.to_csv(f"{filename}_values.csv")
 
     # Load total lag count
-    response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
         'start': start.isoformat(),
         'end': end.isoformat(),
@@ -103,7 +100,8 @@ for result in results:
         group = result['metric']['group']
         for value in result['values']:
             # print(value)
-            d.append({'group': group, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
+            d.append({'group': group, 'timestamp': int(
+                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
 
     df = pd.DataFrame(d)
@@ -111,7 +109,7 @@ df.to_csv(f"{filename}_totallag.csv")
     # Load partition count
-    response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
         'start': start.isoformat(),
         'end': end.isoformat(),
@@ -126,16 +124,16 @@ for result in results:
         topic = result['metric']['topic']
         for value in result['values']:
             # print(value)
-            d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
+            d.append({'topic': topic, 'timestamp': int(
+                value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
 
     df = pd.DataFrame(d)
     df.to_csv(f"{filename}_partitions.csv")
 
     # Load instances count
-    response = requests.get(prometheus_query_path, params={
+    response = requests.get(prometheus_base_url + '/api/v1/query_range', params={
         'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
         'start': start.isoformat(),
         'end': end.isoformat(),
@@ -153,3 +151,16 @@ for result in results:
     df = pd.DataFrame(d)
     df.to_csv(f"{filename}_instances.csv")
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO)
+
+    # Load arguments
+    exp_id = sys.argv[1]
+    benchmark = sys.argv[2]
+    dim_value = sys.argv[3]
+    instances = sys.argv[4]
+    execution_minutes = int(sys.argv[5])
+
+    main(exp_id, benchmark, dim_value, instances, execution_minutes)
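
With lag_analysis.py now exposing main(), the analysis can be reused as a library instead of only as a CLI script. A minimal sketch of both call styles; the argument values are illustrative, not taken from this MR:

# Hypothetical caller: drive the refactored analysis as a library.
# All argument values below are examples only.
import lag_analysis

lag_analysis.main(
    exp_id='17',
    benchmark='uc1',
    dim_value='50000',
    instances='3',
    execution_minutes=5)  # prometheus_base_url can be overridden via keyword

# Equivalent CLI call, as issued by the run_ucX.sh scripts:
#   python lag_analysis.py 17 uc1 50000 3 5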
@@ -2,3 +2,7 @@ matplotlib==3.2.0
 pandas==1.0.1
 requests==2.23.0
 scikit-learn==0.22.2.post1
+
+# For run_uc.py
+kubernetes==11.0.0
+confuse==1.1.0
This diff is collapsed.
@@ -29,38 +29,59 @@ NUM_SENSORS=$DIM_VALUE
 WL_MAX_RECORDS=150000
 WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
 
-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uc-workload-generator/overlay/uc1-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "$NUM_SENSORS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc1-workload-generator
 
 # Start application
 REPLICAS=$INSTANCES
-# When not using `sed` anymore, use `kubectl apply -f uc1-application`
-kubectl apply -f uc1-application/aggregation-service.yaml
-kubectl apply -f uc1-application/jmx-configmap.yaml
-kubectl apply -f uc1-application/service-monitor.yaml
-#kubectl apply -f uc1-application/aggregation-deployment.yaml
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc1-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc1-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc1-application
 
 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))
 
 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate
 
-# Stop wl and app
-#kubectl delete -f uc1-workload-generator/deployment.yaml
-#sed "s/{{INSTANCES}}/1/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
-#sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc1-application/aggregation-service.yaml
-kubectl delete -f uc1-application/jmx-configmap.yaml
-kubectl delete -f uc1-application/service-monitor.yaml
-#kubectl delete -f uc1-application/aggregation-deployment.yaml
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc1-workload-generator
+kubectl delete -k uc-application/overlay/uc1-application
 
 # Delete topics instead of Kafka
...
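
The pattern above — write a strategic-merge patch, then apply the kustomize overlay — can also be driven from Python. The sketch below is illustrative only, not the run_uc.py introduced by this MR; it assumes kubectl is on the PATH and reuses the uc1 paths from the script:

# Hypothetical Python rendering of the shell pattern above: write the
# strategic-merge patch for the workload generator, then apply the overlay.
import subprocess

PATCH_TEMPLATE = """\
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-load-generator
spec:
  replicas: {instances}
  template:
    spec:
      containers:
      - name: workload-generator
        env:
        - name: NUM_SENSORS
          value: "{num_sensors}"
        - name: INSTANCES
          value: "{instances}"
"""

def deploy_workload_generator(num_sensors: int, instances: int) -> None:
    overlay = 'uc-workload-generator/overlay/uc1-workload-generator'
    with open(f'{overlay}/set_paramters.yaml', 'w') as f:
        f.write(PATCH_TEMPLATE.format(num_sensors=num_sensors, instances=instances))
    subprocess.run(['kubectl', 'apply', '-k', overlay], check=True)

def teardown_workload_generator() -> None:
    overlay = 'uc-workload-generator/overlay/uc1-workload-generator'
    subprocess.run(['kubectl', 'delete', '-k', overlay], check=True)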
@@ -30,36 +30,63 @@ WL_MAX_RECORDS=150000
 APPROX_NUM_SENSORS=$((4**NUM_NESTED_GROUPS))
 WL_INSTANCES=$(((APPROX_NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
 
-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_NESTED_GROUPS}}/$NUM_NESTED_GROUPS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc2-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uc-workload-generator/overlay/uc2-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "4"
+        - name: HIERARCHY
+          value: "full"
+        - name: NUM_NESTED_GROUPS
+          value: "$NUM_NESTED_GROUPS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc2-workload-generator
 
 # Start application
 REPLICAS=$INSTANCES
-# When not using `sed` anymore, use `kubectl apply -f uc2-application`
-kubectl apply -f uc2-application/aggregation-service.yaml
-kubectl apply -f uc2-application/jmx-configmap.yaml
-kubectl apply -f uc2-application/service-monitor.yaml
-#kubectl apply -f uc2-application/aggregation-deployment.yaml
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc2-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc2-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc2-application
 
 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))
 
 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate
 
-# Stop wl and app
-#sed "s/{{INSTANCES}}/1/g" uc2-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc2-application/aggregation-service.yaml
-kubectl delete -f uc2-application/jmx-configmap.yaml
-kubectl delete -f uc2-application/service-monitor.yaml
-#kubectl delete -f uc2-application/aggregation-deployment.yaml
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc2-workload-generator
+kubectl delete -k uc-application/overlay/uc2-application
 
 # Delete topics instead of Kafka
...
@@ -29,40 +29,61 @@ NUM_SENSORS=$DIM_VALUE
 WL_MAX_RECORDS=150000
 WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
 
-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc3-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uc-workload-generator/overlay/uc3-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "$NUM_SENSORS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc3-workload-generator
 
 # Start application
 REPLICAS=$INSTANCES
-# When not using `sed` anymore, use `kubectl apply -f uc3-application`
-kubectl apply -f uc3-application/aggregation-service.yaml
-kubectl apply -f uc3-application/jmx-configmap.yaml
-kubectl apply -f uc3-application/service-monitor.yaml
-#kubectl apply -f uc3-application/aggregation-deployment.yaml
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc3-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc3-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc3-application
+kubectl scale deployment uc3-titan-ccp-aggregation --replicas=$REPLICAS
 
 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))
 
 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate
 
-# Stop wl and app
-#kubectl delete -f uc3-workload-generator/deployment.yaml
-#sed "s/{{INSTANCES}}/1/g" uc3-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc3-application/aggregation-service.yaml
-kubectl delete -f uc3-application/jmx-configmap.yaml
-kubectl delete -f uc3-application/service-monitor.yaml
-#kubectl delete -f uc3-application/aggregation-deployment.yaml
-#sed "s/{{CPU_LIMIT}}/1000m/g; s/{{MEMORY_LIMIT}}/4Gi/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/100/g" uc3-application/aggregation-deployment.yaml | kubectl delete -f -
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc3-workload-generator
+kubectl delete -k uc-application/overlay/uc3-application
 
 # Delete topics instead of Kafka
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
...
@@ -29,39 +29,60 @@ NUM_SENSORS=$DIM_VALUE
 WL_MAX_RECORDS=150000
 WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
 
-WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc4-workload-generator/deployment.yaml)
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
+cat <<EOF >uc-workload-generator/overlay/uc4-workload-generator/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-load-generator
+spec:
+  replicas: $WL_INSTANCES
+  template:
+    spec:
+      containers:
+      - name: workload-generator
+        env:
+        - name: NUM_SENSORS
+          value: "$NUM_SENSORS"
+        - name: INSTANCES
+          value: "$WL_INSTANCES"
+EOF
+kubectl apply -k uc-workload-generator/overlay/uc4-workload-generator
 
 # Start application
 REPLICAS=$INSTANCES
-#AGGREGATION_DURATION_DAYS=$DIM_VALUE
-# When not using `sed` anymore, use `kubectl apply -f uc4-application`
-kubectl apply -f uc4-application/aggregation-service.yaml
-kubectl apply -f uc4-application/jmx-configmap.yaml
-kubectl apply -f uc4-application/service-monitor.yaml
-#kubectl apply -f uc4-application/aggregation-deployment.yaml
-#sed "s/{{AGGREGATION_DURATION_DAYS}}/$AGGREGATION_DURATION_DAYS/g" uc4-application/aggregation-deployment.yaml | kubectl apply -f -
-APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc4-application/aggregation-deployment.yaml)
-echo "$APPLICATION_YAML" | kubectl apply -f -
-kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
+cat <<EOF >uc-application/overlay/uc4-application/set_paramters.yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: titan-ccp-aggregation
+spec:
+  replicas: $REPLICAS
+  template:
+    spec:
+      containers:
+      - name: uc-application
+        env:
+        - name: COMMIT_INTERVAL_MS
+          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
+        resources:
+          limits:
+            memory: $MEMORY_LIMIT
+            cpu: $CPU_LIMIT
+EOF
+kubectl apply -k uc-application/overlay/uc4-application
+kubectl scale deployment uc4-titan-ccp-aggregation --replicas=$REPLICAS
 
 # Execute for certain time
-sleep ${EXECUTION_MINUTES}m
+sleep $(($EXECUTION_MINUTES * 60))
 
 # Run eval script
 source ../.venv/bin/activate
 python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
 deactivate
 
-# Stop wl and app
-#sed "s/{{INSTANCES}}/1/g" uc4-workload-generator/deployment.yaml | kubectl delete -f -
-echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
-kubectl delete -f uc4-application/aggregation-service.yaml
-kubectl delete -f uc4-application/jmx-configmap.yaml
-kubectl delete -f uc4-application/service-monitor.yaml
-#kubectl delete -f uc4-application/aggregation-deployment.yaml
-echo "$APPLICATION_YAML" | kubectl delete -f -
+# Stop workload generator and app
+kubectl delete -k uc-workload-generator/overlay/uc4-workload-generator
+kubectl delete -k uc-application/overlay/uc4-application
 
 # Delete topics instead of Kafka
 #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
...
 # Wrapper that makes the execution method of a subexperiment interchangeable.
 import os
+import run_uc
+
+dirname = os.path.dirname(__file__)
+os.chdir(dirname+"/../../")
+
 
 def execute(subexperiment_config):
-    os.system(f"./run_uc{subexperiment_config.use_case}.sh {subexperiment_config.exp_id} {subexperiment_config.dim_value} {subexperiment_config.replicas} {subexperiment_config.partitions} {subexperiment_config.cpu_limit} {subexperiment_config.memory_limit} {subexperiment_config.kafka_streams_commit_interval_ms} {subexperiment_config.execution_minutes}")
\ No newline at end of file
+    run_uc.main(
+        exp_id=subexperiment_config.exp_id,
+        uc_id=subexperiment_config.use_case,
+        dim_value=int(subexperiment_config.dim_value),
+        instances=int(subexperiment_config.replicas),
+        partitions=subexperiment_config.partitions,
+        cpu_limit=subexperiment_config.cpu_limit,
+        memory_limit=subexperiment_config.memory_limit,
+        commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms,
+        execution_minutes=int(subexperiment_config.execution_minutes),
+        reset=False,
+        reset_only=False)
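
The executor only requires a config object exposing the attributes read above, so a lightweight namespace suffices for a smoke test. A usage sketch; the module name subexperiment_executor and all field values are assumptions for illustration:

# Hypothetical driver for the wrapper above.
from types import SimpleNamespace

import subexperiment_executor  # assumed module name for the wrapper

config = SimpleNamespace(
    exp_id=17,
    use_case='1',
    dim_value='50000',
    replicas='3',
    partitions=40,
    cpu_limit='1000m',
    memory_limit='4Gi',
    kafka_streams_commit_interval_ms=100,
    execution_minutes='5')

subexperiment_executor.execute(config)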
@@ -14,24 +14,24 @@ spec:
     spec:
       terminationGracePeriodSeconds: 0
       containers:
-      - name: uc1-application
-        image: "theodolite/theodolite-uc1-kstreams-app:latest"
+      - name: uc-application
+        image: uc-app:latest
         ports:
         - containerPort: 5555
           name: jmx
         env:
+        - name: COMMIT_INTERVAL_MS
+          value: "100"
         - name: KAFKA_BOOTSTRAP_SERVERS
           value: "my-confluent-cp-kafka:9092"
         - name: SCHEMA_REGISTRY_URL
           value: "http://my-confluent-cp-schema-registry:8081"
-        - name: COMMIT_INTERVAL_MS
-          value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
         - name: JAVA_OPTS
           value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
         resources:
           limits:
-            memory: "{{MEMORY_LIMIT}}"
-            cpu: "{{CPU_LIMIT}}"
+            memory: 4Gi
+            cpu: 1000m
       - name: prometheus-jmx-exporter
         image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
         command:
...
# uc-application/base/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
commonLabels:
  app: titan-ccp-aggregation
# Use all resources to compose them into one file
resources:
- aggregation-deployment.yaml
- aggregation-service.yaml
- service-monitor.yaml
- jmx-configmap.yaml
# uc-application/overlay/uc1-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc1-
images:
- name: uc-app
  newName: theodolite/theodolite-uc1-kstreams-app
  newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters
# uc-application/overlay/uc1-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "100"
        resources:
          limits:
            memory: 4Gi
            cpu: 1000m
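
Because each overlay sets a namePrefix and remaps the placeholder image uc-app, the composed Deployment for uc1 comes out as uc1-titan-ccp-aggregation running theodolite/theodolite-uc1-kstreams-app:latest with the patched limits — which is also why run_uc3.sh and run_uc4.sh scale the prefixed deployment names. A small sketch to inspect the composed manifest, assuming kubectl 1.14+ (which bundles kustomize) is on the PATH:

# Hypothetical helper: render an overlay's composed manifest to verify the
# namePrefix and image substitutions before applying it.
import subprocess

def render_overlay(overlay_dir: str) -> str:
    """Return the fully composed YAML that `kubectl apply -k` would submit."""
    result = subprocess.run(
        ['kubectl', 'kustomize', overlay_dir],
        capture_output=True, text=True, check=True)
    return result.stdout

if __name__ == '__main__':
    # Expect a Deployment named uc1-titan-ccp-aggregation with image
    # theodolite/theodolite-uc1-kstreams-app:latest.
    print(render_overlay('uc-application/overlay/uc1-application'))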
# uc-application/overlay/uc2-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc2-
images:
- name: uc-app
  newName: theodolite/theodolite-uc2-kstreams-app
  newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters
# uc-application/overlay/uc2-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "100"
        resources:
          limits:
            memory: 4Gi
            cpu: 1000m
# uc-application/overlay/uc3-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc3-
images:
- name: uc-app
  newName: theodolite/theodolite-uc3-kstreams-app
  newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters
# uc-application/overlay/uc3-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "100"
        resources:
          limits:
            memory: 4Gi
            cpu: 1000m
# uc-application/overlay/uc4-application/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc4-
images:
- name: uc-app
  newName: theodolite/theodolite-uc4-kstreams-app
  newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters