Skip to content
Snippets Groups Projects
Commit e41b0457 authored by Sören Henning's avatar Sören Henning
Browse files

Add experiment execution code

parent 161170a3
No related branches found
No related tags found
No related merge requests found
Showing
with 574 additions and 0 deletions
0
import sys
import requests
from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import csv
#
exp_id = sys.argv[1]
benchmark = sys.argv[2]
dim_value = sys.argv[3]
instances = sys.argv[4]
#http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s
now = datetime.utcnow().replace(tzinfo=timezone.utc).replace(microsecond=0)
end = now
start = now - timedelta(minutes=5)
#print(start.isoformat().replace('+00:00', 'Z'))
#print(end.isoformat().replace('+00:00', 'Z'))
response = requests.get('http://localhost:9090/api/v1/query_range', params={
'query': "sum by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)",
'start': start.isoformat(),
'end': end.isoformat(),
'step': '5s'})
#response
#print(response.request.path_url)
#response.content
results = response.json()['data']['result']
d = []
for result in results:
#print(result['metric']['topic'])
topic = result['metric']['topic']
for value in result['values']:
#print(value)
d.append({'topic': topic, 'timestamp': int(value[0]), 'value': int(value[1])})
df = pd.DataFrame(d)
input = df.loc[df['topic'] == "input"]
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
from sklearn.linear_model import LinearRegression
X = input.iloc[:, 1].values.reshape(-1, 1) # values converts it into a numpy array
Y = input.iloc[:, 2].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
print(linear_regressor.coef_)
#print(Y_pred)
fields=[exp_id, datetime.now(), benchmark, dim_value, instances, linear_regressor.coef_]
print(fields)
with open(r'results.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
filename = f"exp{exp_id}_{benchmark}_{dim_value}_{instances}"
plt.plot(X, Y)
plt.plot(X, Y_pred, color='red')
plt.savefig(f"{filename}_plot.png")
df.to_csv(f"{filename}_values.csv")
#!/bin/bash
UC=$1
IFS=', ' read -r -a DIM_VALUES <<< "$2"
IFS=', ' read -r -a REPLICAS <<< "$3"
PARTITIONS=$4
# Get and increment counter
EXP_ID=$(cat exp_counter.txt)
echo $((EXP_ID+1)) > exp_counter.txt
# Store meta information
IFS=$', '; echo \
"UC=$UC
DIM_VALUES=${DIM_VALUES[*]}
REPLICAS=${REPLICAS[*]}
PARTITIONS=$PARTITIONS
" >> "exp${EXP_ID}_uc${UC}_meta.txt"
for DIM_VALUE in "${DIM_VALUES[@]}"
do
for REPLICA in "${REPLICAS[@]}"
do
echo "Run $DIM_VALUE $REPLICA"
./run_uc$UC-new.sh $EXP_ID $DIM_VALUE $REPLICA $PARTITIONS
done
done
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=$4
# Start up Kafka
# TODO
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g" uc1-workload-generator/deployment.yaml | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
kubectl apply -f uc1-application/aggregation-deployment.yaml
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep 5m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
kubectl delete -f uc1-workload-generator/deployment.yaml
kubectl delete -f uc1-application/aggregation-deployment.yaml
# Delete topics instead of Kafka
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
sleep 30s # TODO check
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=$4
# Maybe start up Kafka
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_NESTED_GROUPS=$DIM_VALUE
sed "s/{{NUM_NESTED_GROUPS}}/$NUM_NESTED_GROUPS/g" uc2-workload-generator/deployment.yaml | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
kubectl apply -f uc2-application/aggregation-deployment.yaml
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep 5m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
kubectl delete -f uc2-workload-generator/deployment.yaml
kubectl delete -f uc2-application/aggregation-deployment.yaml
# Delete topics instead of Kafka
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
sleep 10s # TODO check
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=$4
# Maybe start up Kafka
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g" uc3-workload-generator/deployment.yaml | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
kubectl apply -f uc3-application/aggregation-deployment.yaml
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep 5m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
kubectl delete -f uc3-workload-generator/deployment.yaml
kubectl delete -f uc3-application/aggregation-deployment.yaml
# Delete topics instead of Kafka
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
sleep 10s # TODO check
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=$4
# Maybe start up Kafka
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g" uc4-workload-generator/deployment.yaml | kubectl apply -f -
# Start application
REPLICAS=$INSTANCES
kubectl apply -f uc4-application/aggregation-deployment.yaml
kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep 5m
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES
deactivate
# Stop wl and app
kubectl delete -f uc4-workload-generator/deployment.yaml
kubectl delete -f uc4-application/aggregation-deployment.yaml
# Delete topics instead of Kafka
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
sleep 10s # TODO check
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#TODO maybe delete schemas
#https://docs.confluent.io/current/schema-registry/schema-deletion-guidelines.html
#curl -X DELETE http://localhost:8081/subjects/Kafka-value
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
replicas: 1
template:
metadata:
labels:
app: titan-ccp-aggregation
spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc1-application
image: "benediktwetzel/uc1-app:latest"
ports:
- containerPort: 5555
name: jmx
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
- java
- -XX:+UnlockExperimentalVMOptions
- -XX:+UseCGroupMemoryLimitForHeap
- -XX:MaxRAMFraction=1
- -XshowSettings:vm
- -jar
- jmx_prometheus_httpserver.jar
- "5556"
- /etc/jmx-aggregation/jmx-kafka-prometheus.yml
ports:
- containerPort: 5556
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-aggregation
volumes:
- name: jmx-config
configMap:
name: aggregation-jmx-configmap
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: titan-ccp-aggregation
labels:
app: titan-ccp-aggregation
spec:
#type: NodePort
selector:
app: titan-ccp-aggregation
ports:
- name: http
port: 80
targetPort: 80
protocol: TCP
- name: metrics
port: 5556
apiVersion: v1
kind: ConfigMap
metadata:
name: aggregation-jmx-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
app: titan-ccp-aggregation
appScope: titan-ccp
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
endpoints:
- port: metrics
interval: 10s
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
selector:
matchLabels:
app: titan-ccp-load-generator
replicas: 1
template:
metadata:
labels:
app: titan-ccp-load-generator
spec:
terminationGracePeriodSeconds: 0
containers:
- name: workload-generator
image: benediktwetzel/uc1-wg:latest
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: NUM_SENSORS
value: "{{NUM_SENSORS}}"
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: load-generator-jmx-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
replicas: 1
template:
metadata:
labels:
app: titan-ccp-aggregation
spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc2-application
image: "benediktwetzel/uc2-app:latest"
ports:
- containerPort: 5555
name: jmx
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
- java
- -XX:+UnlockExperimentalVMOptions
- -XX:+UseCGroupMemoryLimitForHeap
- -XX:MaxRAMFraction=1
- -XshowSettings:vm
- -jar
- jmx_prometheus_httpserver.jar
- "5556"
- /etc/jmx-aggregation/jmx-kafka-prometheus.yml
ports:
- containerPort: 5556
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-aggregation
volumes:
- name: jmx-config
configMap:
name: aggregation-jmx-configmap
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: titan-ccp-aggregation
labels:
app: titan-ccp-aggregation
spec:
#type: NodePort
selector:
app: titan-ccp-aggregation
ports:
- name: http
port: 80
targetPort: 80
protocol: TCP
- name: metrics
port: 5556
apiVersion: v1
kind: ConfigMap
metadata:
name: aggregation-jmx-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
app: titan-ccp-aggregation
appScope: titan-ccp
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
endpoints:
- port: metrics
interval: 10s
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
selector:
matchLabels:
app: titan-ccp-load-generator
replicas: 1
template:
metadata:
labels:
app: titan-ccp-load-generator
spec:
terminationGracePeriodSeconds: 0
containers:
- name: workload-generator
image: benediktwetzel/uc2-wg:latest
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: HIERARCHY
value: "full"
- name: NUM_SENSORS
value: "4"
- name: NUM_NESTED_GROUPS
value: "{{NUM_NESTED_GROUPS}}"
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: load-generator-jmx-configmap
data:
jmx-kafka-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-aggregation
spec:
selector:
matchLabels:
app: titan-ccp-aggregation
replicas: 1
template:
metadata:
labels:
app: titan-ccp-aggregation
spec:
terminationGracePeriodSeconds: 0
containers:
- name: uc2-application
image: "benediktwetzel/uc3-app:latest"
ports:
- containerPort: 5555
name: jmx
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: KAFKA_WINDOW_DURATION_MINUTES
value: "1"
- name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
- name: prometheus-jmx-exporter
image: "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143"
command:
- java
- -XX:+UnlockExperimentalVMOptions
- -XX:+UseCGroupMemoryLimitForHeap
- -XX:MaxRAMFraction=1
- -XshowSettings:vm
- -jar
- jmx_prometheus_httpserver.jar
- "5556"
- /etc/jmx-aggregation/jmx-kafka-prometheus.yml
ports:
- containerPort: 5556
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-aggregation
volumes:
- name: jmx-config
configMap:
name: aggregation-jmx-configmap
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment