Skip to content
Snippets Groups Projects
Commit 9995ef0f authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'rewrite-execution' into new-execution

parents 854a9f8a c3531e2c
No related branches found
No related tags found
No related merge requests found
Showing
with 1149 additions and 0 deletions
...@@ -28,4 +28,5 @@ tmp/ ...@@ -28,4 +28,5 @@ tmp/
*.iml *.iml
*.iws *.iws
# Python Venv
.venv .venv
#!/bin/bash
./run_loop.sh 1 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5" 40 #6*5=3Std
sleep 5m
./run_loop.sh 2 "6 7 8 9" "1 2 3 4 6 8 10 12 14 16 18 20" 40 #4*12=5Std
sleep 5m
./run_loop.sh 3 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5 6" 40 #6*6=3.5Std
sleep 5m
./run_loop.sh 4 "25000 50000 75000 100000 125000 150000" "1 2 4 6 8 10 12 14 16 18 20 30 40 50 60 70 80 90" 40 #6*18=11Std
sleep 5m
./run_loop.sh 1 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5" 400 #6*5=3Std
sleep 5m
./run_loop.sh 2 "6 7 8 9" "1 2 3 4 6 8 10 12 14 16 18 20" 400 #4*12=5Std
sleep 5m
./run_loop.sh 3 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5 6" 400 #6*6=3.5Std
sleep 5m
./run_loop.sh 4 "25000 50000 75000 100000 125000 150000" "1 2 4 6 8 10 12 14 16 18 20 30 40 50 60 70 80 90" 400 #6*18=11Std
sleep 5m
./run_loop.sh 4 "150000" "100 110 120 130 140 150 160 17 18 190 200" 400 #6*18=11Std
sleep 5m
# For commit interval evaluation
./run_loop.sh 4 "5000 10000 15000 20000 25000 30000" "1 2 3 4 5 6 7 8 9 10 11 12 13 14 15" 160
\ No newline at end of file
#!/bin/bash
#./run_loop.sh 1 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5" 40 #3Std
./run_loop.sh 1 "200000 250000 300000" "1 2 3 4 5" 40 1000m 4Gi 100 5 #1.5Std
sleep 1m
#./run_loop.sh 1 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5" 400 #3Std
./run_loop.sh 1 "200000 250000 300000" "1 2 3 4 5" 400 1000m 4Gi 100 5 #1.5Std
sleep 1m
#./run_loop.sh 3 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 40 #6 Std
./run_loop.sh 3 "200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 40 1000m 4Gi 100 5 #3 Std
sleep 1m
#./run_loop.sh 3 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 400 #6 Std
./run_loop.sh 3 "200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 400 1000m 4Gi 100 5 #3 Std
sleep 1m
./run_loop.sh 1 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5" 40 500m 2Gi 100 5 #3Std
0
# Test Partition count of 100
./run_loop.sh 1 "10000 50000 100000 200000" "1, 2, 4, 8, 12, 16, 20" 100
\ No newline at end of file
import sys
import os
import requests
from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import csv
#
exp_id = sys.argv[1]
benchmark = sys.argv[2]
dim_value = sys.argv[3]
instances = sys.argv[4]
execution_minutes = 5
time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0))
#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
now_local = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
now = now_local - timedelta(milliseconds=time_diff_ms)
print(f"Now Local: {now_local}")
print(f"Now Used: {now}")
end = now
start = now - timedelta(minutes=execution_minutes)
#print(start.isoformat().replace('+00:00', 'Z'))
#print(end.isoformat().replace('+00:00', 'Z'))
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
#'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
'query': "sum by(group, topic)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
#response
#print(response.request.path_url)
#response.content
results = response.json()['data']['result']
d = []
for result in results:
#print(result['metric']['topic'])
topic = result['metric']['topic']
for value in result['values']:
#print(value)
d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
# Do some analysis
input = df.loc[df['topic'] == "input"]
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
from sklearn.linear_model import LinearRegression
X = input.iloc[:, 1].values.reshape(-1, 1) # values converts it into a numpy array
Y = input.iloc[:, 2].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
print(linear_regressor.coef_)
#print(Y_pred)
fields=[exp_id, datetime.now(), benchmark, dim_value, instances, linear_regressor.coef_]
print(fields)
with open(r'results.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}"
plt.plot(X, Y)
plt.plot(X, Y_pred, color='red')
plt.savefig(f"{filename}_plot.png")
df.to_csv(f"{filename}_values.csv")
# Load total lag count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
'query': "sum by(group)(kafka_consumergroup_group_lag > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
results = response.json()['data']['result']
d = []
for result in results:
#print(result['metric']['topic'])
group = result['metric']['group']
for value in result['values']:
#print(value)
d.append({'group': group, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
df.to_csv(f"{filename}_totallag.csv")
# Load partition count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
'query': "count by(group,topic)(kafka_consumergroup_group_offset > 0)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
results = response.json()['data']['result']
d = []
for result in results:
#print(result['metric']['topic'])
topic = result['metric']['topic']
for value in result['values']:
#print(value)
d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
df = pd.DataFrame(d)
df.to_csv(f"{filename}_partitions.csv")
# Load instances count
response = requests.get('http://kube1.se.internal:32529/api/v1/query_range', params={
'query': "count(count (kafka_consumer_consumer_fetch_manager_metrics_records_lag) by(pod))",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
results = response.json()['data']['result']
d = []
for result in results:
for value in result['values']:
#print(value)
d.append({'timestamp': int(value[0]), 'value': int(value[1])})
df = pd.DataFrame(d)
df.to_csv(f"{filename}_instances.csv")
\ No newline at end of file
#!/bin/bash
UC=$1
IFS=', ' read -r -a DIM_VALUES <<< "$2"
IFS=', ' read -r -a REPLICAS <<< "$3"
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
# Get and increment counter
EXP_ID=$(cat exp_counter.txt)
echo $((EXP_ID+1)) > exp_counter.txt
# Store meta information
IFS=$', '; echo \
"UC=$UC
DIM_VALUES=${DIM_VALUES[*]}
REPLICAS=${REPLICAS[*]}
PARTITIONS=$PARTITIONS
" >> "exp${EXP_ID}_uc${UC}_meta.txt"
SUBEXPERIMENTS=$((${#DIM_VALUES[@]} * ${#REPLICAS[@]}))
SUBEXPERIMENT_COUNTER=0
echo "Going to execute $SUBEXPERIMENTS subexperiments in total..."
for DIM_VALUE in "${DIM_VALUES[@]}"
do
for REPLICA in "${REPLICAS[@]}"
do
SUBEXPERIMENT_COUNTER=$((SUBEXPERIMENT_COUNTER+1))
echo "Run subexperiment $SUBEXPERIMENT_COUNTER/$SUBEXPERIMENTS with config: $DIM_VALUE $REPLICA"
./run_uc$UC-new.sh $EXP_ID $DIM_VALUE $REPLICA $PARTITIONS $CPU_LIMIT $MEMORY_LIMIT $KAFKA_STREAMS_COMMIT_INTERVAL_MS $EXECUTION_MINUTES
sleep 10s
done
done
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
echo "Print topics:"
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml)
echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
#kubectl apply -f uc3-application/aggregation-deployment.yaml
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc1-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
#kubectl delete -f uc1-workload-generator/deployment.yaml
#sed "s/{{INSTANCES}}/1/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
#sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml | kubectl delete -f -
echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
#kubectl delete -f uc1-application/aggregation-deployment.yaml
echo "$APPLICATION_YAML" | kubectl delete -f -
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Wait for topic deletion"
sleep 5s
#echo "Finished waiting, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_NESTED_GROUPS=$DIM_VALUE
sed "s/{{NUM_NESTED_GROUPS}}/$NUM_NESTED_GROUPS/g" uc2-workload-generator/deployment.yaml | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
#kubectl apply -f uc2-application/aggregation-deployment.yaml
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc2-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
kubectl delete -f uc2-workload-generator/deployment.yaml
#kubectl delete -f uc2-application/aggregation-deployment.yaml
echo "$APPLICATION_YAML" | kubectl delete -f -
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Wait for topic deletion"
sleep 5s
#echo "Finished waiting, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
WORKLOAD_GENERATOR_YAML=$(sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc3-workload-generator/deployment.yaml)
echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
#kubectl apply -f uc3-application/aggregation-deployment.yaml
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc3-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
#kubectl delete -f uc3-workload-generator/deployment.yaml
#sed "s/{{INSTANCES}}/1/g" uc3-workload-generator/deployment.yaml | kubectl delete -f -
echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f -
#kubectl delete -f uc1-application/aggregation-deployment.yaml
#sed "s/{{CPU_LIMIT}}/1000m/g; s/{{MEMORY_LIMIT}}/4Gi/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/100/g" uc3-application/aggregation-deployment.yaml | kubectl delete -f -
echo "$APPLICATION_YAML" | kubectl delete -f -
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Wait for topic deletion"
sleep 5s
#echo "Finished waiting, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
#NUM_SENSORS=xy
sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g" uc4-workload-generator/deployment.yaml | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
#AGGREGATION_DURATION_DAYS=$DIM_VALUE
#kubectl apply -f uc4-application/aggregation-deployment.yaml
#sed "s/{{AGGREGATION_DURATION_DAYS}}/$AGGREGATION_DURATION_DAYS/g" uc4-application/aggregation-deployment.yaml | kubectl apply -f -
APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc4-application/aggregation-deployment.yaml)
echo "$APPLICATION_YAML" | kubectl apply -f -
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep ${EXECUTION_MINUTES}m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
kubectl delete -f uc4-workload-generator/deployment.yaml
#kubectl delete -f uc4-application/aggregation-deployment.yaml
echo "$APPLICATION_YAML" | kubectl delete -f -
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Wait for topic deletion"
sleep 5s
#echo "Finished waiting, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
%% Cell type:code id: tags:
```
import os
import pandas as pd
from functools import reduce
import matplotlib.pyplot as plt
```
%% Cell type:code id: tags:
```
directory = '../results-inst'
experiments = {
'exp1003': 'exp1003',
'exp1025': 'exp1025',
}
```
%% Cell type:code id: tags:
```
dataframes = [pd.read_csv(os.path.join(directory, f'{v}_min-suitable-instances.csv')).set_index('dim_value').rename(columns={"instances": k}) for k, v in experiments.items()]
df = reduce(lambda df1,df2: df1.join(df2,how='outer'), dataframes)
df
```
%% Cell type:code id: tags:
```
plt.style.use('ggplot')
plt.rcParams['axes.facecolor']='w'
plt.rcParams['axes.edgecolor']='555555'
#plt.rcParams['ytick.color']='black'
plt.rcParams['grid.color']='dddddd'
plt.rcParams['axes.spines.top']='false'
plt.rcParams['axes.spines.right']='false'
plt.rcParams['legend.frameon']='true'
plt.rcParams['legend.framealpha']='1'
plt.rcParams['legend.edgecolor']='1'
plt.rcParams['legend.borderpad']='1'
plt.figure()
ax = df.plot(kind='line', marker='o')
#ax = df.plot(kind='line',x='dim_value', legend=False, use_index=True)
ax.set_ylabel('instances')
ax.set_xlabel('data sources')
ax.set_ylim(ymin=0)
#ax.set_xlim(xmin=0)
```
%% Cell type:code id: tags:
```
```
%% Cell type:code id: tags:
```
print("hello")
```
%% Cell type:code id: tags:
```
import os
import requests
from datetime import datetime, timedelta, timezone
import pandas as pd
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
```
%% Cell type:code id: tags:
```
os.getcwd()
```
%% Cell type:code id: tags:
```
exp_id = 1003
warmup_sec = 60
warmup_partitions_sec = 120
threshold = 2000 #slope
directory = '../results'
```
%% Cell type:code id: tags:outputPrepend,outputPrepend
```
#exp_id = 35
#os.chdir("./results-final")
raw_runs = []
filenames = [filename for filename in os.listdir(directory) if filename.startswith(f"exp{exp_id}") and filename.endswith("totallag.csv")]
for filename in filenames:
#print(filename)
run_params = filename[:-4].split("_")
dim_value = run_params[2]
instances = run_params[3]
df = pd.read_csv(os.path.join(directory, filename))
#input = df.loc[df['topic'] == "input"]
input = df
#print(input)
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
#print(input)
#print(input.iloc[0, 'timestamp'])
regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
#regress = input
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array
Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
trend_slope = linear_regressor.coef_[0][0]
#print(linear_regressor.coef_)
row = {'dim_value': int(dim_value), 'instances': int(instances), 'trend_slope': trend_slope}
#print(row)
raw_runs.append(row)
lags = pd.DataFrame(raw_runs)
```
%% Cell type:code id: tags:
```
lags.head()
```
%% Cell type:code id: tags:
```
raw_partitions = []
filenames = [filename for filename in os.listdir(directory) if filename.startswith(f"exp{exp_id}") and filename.endswith("partitions.csv")]
for filename in filenames:
#print(filename)
run_params = filename[:-4].split("_")
dim_value = run_params[2]
instances = run_params[3]
df = pd.read_csv(os.path.join(directory, filename))
#input = df.loc[df['topic'] == "input"]
input = df
#print(input)
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
#print(input)
#print(input.iloc[0, 'timestamp'])
input = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
#regress = input
input = input.loc[input['topic'] >= 'input']
mean = input['value'].mean()
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
row = {'dim_value': int(dim_value), 'instances': int(instances), 'partitions': mean}
#print(row)
raw_partitions.append(row)
partitions = pd.DataFrame(raw_partitions)
#runs = lags.join(partitions.set_index(['dim_value', 'instances']), on=['dim_value', 'instances'])
```
%% Cell type:code id: tags:
```
raw_obs_instances = []
filenames = [filename for filename in os.listdir(directory) if filename.startswith(f"exp{exp_id}") and filename.endswith("instances.csv")]
for filename in filenames:
run_params = filename[:-4].split("_")
dim_value = run_params[2]
instances = run_params[3]
df = pd.read_csv(os.path.join(directory, filename))
if df.empty:
continue
#input = df.loc[df['topic'] == "input"]
input = df
#print(input)
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
#print(input)
#print(input.iloc[0, 'timestamp'])
input = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
#regress = input
#input = input.loc[input['topic'] >= 'input']
#mean = input['value'].mean()
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
#row = {'dim_value': int(dim_value), 'instances': int(instances), 'obs_instances': mean}
#print(row)
raw_obs_instances.append(row)
obs_instances = pd.DataFrame(raw_obs_instances)
obs_instances.head()
```
%% Cell type:code id: tags:
```
runs = lags
#runs = lags.join(partitions.set_index(['dim_value', 'instances']), on=['dim_value', 'instances'])#.join(obs_instances.set_index(['dim_value', 'instances']), on=['dim_value', 'instances'])
#runs["failed"] = runs.apply(lambda row: (abs(row['instances'] - row['obs_instances']) / row['instances']) > 0.1, axis=1)
#runs.loc[runs['failed']==True]
```
%% Cell type:code id: tags:
```
#threshold = 1000
# Set to true if the trend line has a slope less than
runs["suitable"] = runs.apply(lambda row: row['trend_slope'] < threshold, axis=1)
runs.columns = runs.columns.str.strip()
runs.sort_values(by=["dim_value", "instances"])
```
%% Cell type:code id: tags:
```
filtered = runs[runs.apply(lambda x: x['suitable'], axis=1)]
grouped = filtered.groupby(['dim_value'])['instances'].min()
min_suitable_instances = grouped.to_frame().reset_index()
min_suitable_instances
```
%% Cell type:code id: tags:
```
min_suitable_instances.to_csv(f'../results-inst/exp{exp_id}_min-suitable-instances.csv', index=False)
```
%% Cell type:code id: tags:
```
min_suitable_instances.plot(kind='line',x='dim_value',y='instances')
# min_suitable_instances.plot(kind='line',x='dim_value',y='instances', logy=True)
plt.show()
```
%% Cell type:code id: tags:
```
```
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
replicas: 1
template:
metadata:
labels:
app: titan-ccp-aggregation
spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc1-application
image: "soerenhenning/uc1-app:latest"
ports:
- containerPort: 5555
name: jmx
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
resources:
limits:
memory: "{{MEMORY_LIMIT}}"
cpu: "{{CPU_LIMIT}}"
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
- java
- -XX:+UnlockExperimentalVMOptions
- -XX:+UseCGroupMemoryLimitForHeap
- -XX:MaxRAMFraction=1
- -XshowSettings:vm
- -jar
- jmx_prometheus_httpserver.jar
- "5556"
- /etc/jmx-aggregation/jmx-kafka-prometheus.yml
ports:
- containerPort: 5556
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-aggregation
volumes:
- name: jmx-config
configMap:
name: aggregation-jmx-configmap
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: titan-ccp-aggregation
labels:
app: titan-ccp-aggregation
spec:
#type: NodePort
selector:
app: titan-ccp-aggregation
ports:
- name: http
port: 80
targetPort: 80
protocol: TCP
- name: metrics
port: 5556
apiVersion: v1
kind: ConfigMap
metadata:
name: aggregation-jmx-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
app: titan-ccp-aggregation
appScope: titan-ccp
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
endpoints:
- port: metrics
interval: 10s
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: titan-ccp-load-generator
spec:
selector:
matchLabels:
app: titan-ccp-load-generator
serviceName: titan-ccp-load-generator
replicas: {{INSTANCES}}
template:
metadata:
labels:
app: titan-ccp-load-generator
spec:
terminationGracePeriodSeconds: 0
containers:
- name: workload-generator
image: soerenhenning/uc1-wg:latest
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: NUM_SENSORS
value: "{{NUM_SENSORS}}"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: INSTANCES
value: "{{INSTANCES}}"
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: load-generator-jmx-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
replicas: 1
template:
metadata:
labels:
app: titan-ccp-aggregation
spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc2-application
image: "benediktwetzel/uc2-app:latest"
ports:
- containerPort: 5555
name: jmx
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
resources:
limits:
memory: "{{MEMORY_LIMIT}}"
cpu: "{{CPU_LIMIT}}"
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
- java
- -XX:+UnlockExperimentalVMOptions
- -XX:+UseCGroupMemoryLimitForHeap
- -XX:MaxRAMFraction=1
- -XshowSettings:vm
- -jar
- jmx_prometheus_httpserver.jar
- "5556"
- /etc/jmx-aggregation/jmx-kafka-prometheus.yml
ports:
- containerPort: 5556
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-aggregation
volumes:
- name: jmx-config
configMap:
name: aggregation-jmx-configmap
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment