Commit 9b2b0ad7 authored by Sören Henning

Merge branch 'master' into oci-volumes

parents 1d7f1a3f 9d19a3c6
Showing 53 additions and 527 deletions
@@ -9,7 +9,7 @@ benchmark execution results and plotting. The following notebooks are provided:
For legacy reasons, we also provide the following notebooks, which, however, are not documented:
* [scalability-graph.ipynb](scalability-graph.ipynb): Creates a scalability graph for a certain benchmark execution.
-* [scalability-graph-final.ipynb](scalability-graph-final.ipynb): Combines the scalability graphs of multiple benchmark executions (e.g., for comparing different configurations).
+* [scalability-graph-plotter.ipynb](scalability-graph-plotter.ipynb): Combines the scalability graphs of multiple benchmark executions (e.g., for comparing different configurations).
* [lag-trend-graph.ipynb](lag-trend-graph.ipynb): Visualizes the consumer lag evaluation over time along with the computed trend.
## Usage
%% Cell type:markdown id: tags:
# Theodolite Analysis - Plotting the Demand Metric
This notebook creates a plot showing scalability as a function that maps load intensities to the resources required for processing them. It is able to combine multiple such plots in one figure, for example, to compare multiple systems or configurations.
The notebook takes a CSV file for each plot, mapping load intensities to minimum required resources, as computed by the `demand-metric.ipynb` notebook.
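For illustration, such a file provides a `load` and a `resources` column and might look as follows (invented values):
```
load,resources
50000,1
100000,2
150000,2
200000,3
```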
%% Cell type:markdown id: tags:
First, we need to import some libraries, which are required for creating the plots.
%% Cell type:code id: tags:
``` python
import os
import pandas as pd
from functools import reduce
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib.ticker import MaxNLocator
```
%% Cell type:markdown id: tags:
-We need to specify the directory where the demand CSV files can be found, and a dictionary that maps a system description (e.g., its name) to the corresponding CSV file (prefix).
+We need to specify the directory where the demand CSV files can be found, and a dictionary that maps a system description (e.g., its name) to the corresponding CSV file (prefix). To use a Unicode narrow non-breaking space in the description, format it as `u"1000\u202FmCPU"`.
%% Cell type:code id: tags:
``` python
results_dir = '<path-to>/results'
experiments = {
    'System XYZ': 'exp200',
}
```
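Each value is used as a file name prefix: `'exp200'`, for instance, refers to the file `exp200_demand.csv` in `results_dir` (see the read call in the next cell).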
%% Cell type:markdown id: tags:
Now, we combine all systems described in `experiments`.
%% Cell type:code id: tags:
``` python
dataframes = [pd.read_csv(os.path.join(results_dir, f'{v}_demand.csv')).set_index('load').rename(columns={"resources": k}) for k, v in experiments.items()]
df = reduce(lambda df1, df2: df1.join(df2, how='outer'), dataframes)
```
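The outer join keeps every load intensity that occurs in at least one experiment; systems without a measurement for a given load get a `NaN` there, which is why the plotting code below calls `dropna()` on each column.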
%% Cell type:markdown id: tags:
We might want to display the mappings before we plot them.
%% Cell type:code id: tags:
``` python
df
```
%% Cell type:markdown id: tags:
The following code creates a Matplotlib figure showing the scalability plots for all specified systems. You might want to adjust its styling according to your preferences. Make sure to also set a filename.
%% Cell type:code id: tags:
``` python
plt.style.use('ggplot')
plt.rcParams['axes.facecolor'] = 'w'
plt.rcParams['axes.edgecolor'] = '#555555'
#plt.rcParams['ytick.color'] = 'black'
plt.rcParams['grid.color'] = '#dddddd'
plt.rcParams['axes.spines.top'] = False
plt.rcParams['axes.spines.right'] = False
plt.rcParams['legend.frameon'] = True
plt.rcParams['legend.framealpha'] = 1
plt.rcParams['legend.edgecolor'] = '1'
plt.rcParams['legend.borderpad'] = 1

# Format load intensities on the x-axis as thousands, e.g., 50000 -> '50k'.
@FuncFormatter
def load_formatter(x, pos):
    return f'{(x/1000):.0f}k'

markers = ['s', 'D', 'o', 'v', '^', '<', '>', 'p', 'X']

# Helper (currently unused): split a series into [index, values].
def splitSerToArr(ser):
    return [ser.index, ser.to_numpy()]

plt.figure()
#plt.figure(figsize=(4.8, 3.6)) # For other plot sizes
#ax = df.plot(kind='line', marker='o')
# Plot one line per system; drop loads for which a system was not measured.
for i, column in enumerate(df):
    plt.plot(df[column].dropna(), marker=markers[i], label=column)
plt.legend()
ax = plt.gca()
#ax = df.plot(kind='line', x='dim_value', legend=False, use_index=True)
ax.set_ylabel('number of instances')
ax.set_xlabel('messages/second')
ax.set_ylim(bottom=0)
#ax.set_xlim(left=0)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
ax.xaxis.set_major_formatter(load_formatter)
plt.savefig('temp.pdf', bbox_inches='tight')
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:markdown id: tags:
# Theodolite Analysis - Demand Metric
-This notebook allows applies Theodolite's *demand* metric to describe scalability of a SUT based on Theodolite measurement data.
+This notebook applies Theodolite's *demand* metric to describe scalability of a SUT based on Theodolite measurement data.
Theodolite's *demand* metric is a function mapping load intensities to the minimum resources (e.g., instances) required to process this load. With this notebook, the *demand* metric function is approximated by a map of tested load intensities to their minimum required resources.
The final output when running this notebook will be a CSV file providing this mapping. It can be used to create nice plots of a system's scalability using the `demand-metric-plot.ipynb` notebook.
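To illustrate the idea, the following sketch shows how such a mapping could be derived (this is only an illustration, not the implementation in `src.demand`; it assumes a hypothetical dataframe `runs` with one row per experiment run and columns `load`, `resources`, and a precomputed lag `trend_slope`):
``` python
import pandas as pd

def demand_sketch(runs: pd.DataFrame, max_lag_trend_slope: float) -> pd.DataFrame:
    # A run is successful if its consumer lag grows slower than tolerated.
    successful = runs[runs['trend_slope'] < max_lag_trend_slope]
    # For each tested load intensity, take the minimum resource amount
    # among all successful runs.
    return successful.groupby('load', as_index=False)['resources'].min()
```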
%% Cell type:markdown id: tags:
In the following cell, we need to specify:
* `exp_id`: The experiment id that is to be analyzed.
* `warmup_sec`: The number of seconds which are to be ignored in the beginning of each experiment.
* `max_lag_trend_slope`: The maximum tolerable increase in queued messages per second.
* `measurement_dir`: The directory where the measurement data files are to be found.
* `results_dir`: The directory where the computed demand CSV files are to be stored.
%% Cell type:code id: tags:
``` python
exp_id = 200
warmup_sec = 60
max_lag_trend_slope = 2000
measurement_dir = '<path-to>/measurements'
results_dir = '<path-to>/results'
```
%% Cell type:markdown id: tags:
With the following call, we compute our demand mapping.
%% Cell type:code id: tags:
``` python
from src.demand import demand
demand = demand(exp_id, measurement_dir, max_lag_trend_slope, warmup_sec)
```
%% Cell type:markdown id: tags:
We might already want to plot a simple visualization here:
%% Cell type:code id: tags:
``` python
demand.plot(kind='line', x='load', y='resources')
```
%% Cell type:markdown id: tags:
Finally, we store the results in a CSV file.
%% Cell type:code id: tags:
``` python
import os
demand.to_csv(os.path.join(results_dir, f'exp{exp_id}_demand.csv'), index=False)
```
---
title: Theodolite
nav_order: 1
permalink: /
---
# Theodolite
> A theodolite is a precision optical instrument for measuring angles between designated visible points in the horizontal and vertical planes. -- <cite>[Wikipedia](https://en.wikipedia.org/wiki/Theodolite)</cite>
Theodolite is a framework for benchmarking the horizontal and vertical scalability of stream processing engines. It consists of three modules:
## Theodolite Benchmarks
Theodolite contains four application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding workload generator is provided. Currently, this repository provides benchmark implementations for Kafka Streams.
## Theodolite Execution Framework
Theodolite aims to benchmark scalability of stream processing engines for real use cases. Microservices that apply stream processing techniques are usually deployed in elastic cloud environments. Hence, Theodolite's cloud-native benchmarking framework is deployed as a set of components in a cloud environment, orchestrated by Kubernetes. More information on how to execute scalability benchmarks can be found in the [Theodolite execution framework](execution).
## Theodolite Analysis Tools
Theodolite's benchmarking method creates a *scalability graph*, which allows drawing conclusions about the scalability of a stream processing engine or its deployment. A scalability graph shows how resource demand evolves with an increasing workload. Theodolite provides Jupyter notebooks for creating such scalability graphs based on benchmarking results from the execution framework. More information can be found in the [Theodolite analysis tools](analysis).
title: "Theodolite"
remote_theme: pmarsceill/just-the-docs
#color_scheme: "dark"
aux_links:
"Theodolite on GitHub":
- "//github.com/cau-se/theodolite"
\ No newline at end of file
---
title: Release Process
has_children: false
nav_order: 2
---
# Release Process
We assume that we are creating the release `v0.1.1`. Please make sure to adjust
@@ -121,7 +121,9 @@ can be installed via Helm. We also provide a [default configuration](infrastruct
To install it:
```sh
-helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.3/kafka-lag-exporter-0.6.3.tgz -f infrastructure/kafka-lag-exporter/values.yaml
+helm repo add kafka-lag-exporter https://lightbend.github.io/kafka-lag-exporter/repo/
+helm repo update
+helm install kafka-lag-exporter kafka-lag-exporter/kafka-lag-exporter -f infrastructure/kafka-lag-exporter/values.yaml
```
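Afterwards, you can verify that the exporter is running; the pod label used below is the same one Theodolite's run scripts use to locate the exporter pod:
```sh
kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter
```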
### Installing Theodolite
@@ -55,7 +55,8 @@ cp-kafka:
# "min.insync.replicas": 2
"auto.create.topics.enable": false
"log.retention.ms": "10000" # 10s
#"log.retention.ms": "86400000" # 24h
# "log.retention.ms": "86400000" # 24h
# "group.initial.rebalance.delay.ms": "30000" # 30s
"metrics.sample.window.ms": "5000" #5s
## ------------------------------------------------------
@@ -36,6 +36,9 @@ nodeExporter:
prometheusOperator:
  enabled: true
+  namespaces:
+    releaseNamespace: true
+    additional: []
prometheus:
  enabled: false
@@ -94,11 +94,11 @@ def load_yaml_files():
    :return: wg, app_svc, app_svc_monitor, app_jmx, app_deploy
    """
    print('Load kubernetes yaml files')
-    wg = load_yaml('uc-workload-generator/base/workloadGenerator.yaml')
-    app_svc = load_yaml('uc-application/base/aggregation-service.yaml')
-    app_svc_monitor = load_yaml('uc-application/base/service-monitor.yaml')
-    app_jmx = load_yaml('uc-application/base/jmx-configmap.yaml')
-    app_deploy = load_yaml('uc-application/base/aggregation-deployment.yaml')
+    wg = load_yaml('uc-workload-generator/workloadGenerator.yaml')
+    app_svc = load_yaml('uc-application/aggregation-service.yaml')
+    app_svc_monitor = load_yaml('uc-application/service-monitor.yaml')
+    app_jmx = load_yaml('uc-application/jmx-configmap.yaml')
+    app_deploy = load_yaml('uc-application/aggregation-deployment.yaml')
    print('Kubernetes yaml files loaded')
    return wg, app_svc, app_svc_monitor, app_jmx, app_deploy
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
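# Ceiling division: one workload-generator instance per WL_MAX_RECORDS (150,000) simulated sensors, e.g., 200,000 sensors -> 2 instances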
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
cat <<EOF >uc-workload-generator/overlay/uc1-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-load-generator
spec:
  replicas: $WL_INSTANCES
  template:
    spec:
      containers:
      - name: workload-generator
        env:
        - name: NUM_SENSORS
          value: "$NUM_SENSORS"
        - name: INSTANCES
          value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc1-workload-generator
# Start application
REPLICAS=$INSTANCES
cat <<EOF >uc-application/overlay/uc1-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: $REPLICAS
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
        resources:
          limits:
            memory: $MEMORY_LIMIT
            cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc1-application
# Execute for certain time
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc1-workload-generator
kubectl delete -k uc-application/overlay/uc1-application
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
    kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*' --if-exists"
    echo "Wait for topic deletion"
    sleep 5s
    #echo "Finished waiting, print topics:"
    #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
    # Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation"
do
echo "Wait for ZooKeeper state deletion."
sleep 5s
done
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic aggregation-feedback --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_NESTED_GROUPS=$DIM_VALUE
WL_MAX_RECORDS=150000
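# A full hierarchy with 4 children per group has roughly 4^NUM_NESTED_GROUPS sensors; the ceiling division below assigns one workload-generator instance per WL_MAX_RECORDS of them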
APPROX_NUM_SENSORS=$((4**NUM_NESTED_GROUPS))
WL_INSTANCES=$(((APPROX_NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
cat <<EOF >uc-workload-generator/overlay/uc2-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-load-generator
spec:
  replicas: $WL_INSTANCES
  template:
    spec:
      containers:
      - name: workload-generator
        env:
        - name: NUM_SENSORS
          value: "4"
        - name: HIERARCHY
          value: "full"
        - name: NUM_NESTED_GROUPS
          value: "$NUM_NESTED_GROUPS"
        - name: INSTANCES
          value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc2-workload-generator
# Start application
REPLICAS=$INSTANCES
cat <<EOF >uc-application/overlay/uc2-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: $REPLICAS
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
        resources:
          limits:
            memory: $MEMORY_LIMIT
            cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc2-application
# Execute for certain time
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc2-workload-generator
kubectl delete -k uc-application/overlay/uc2-application
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|aggregation-feedback|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
    kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|aggregation-feedback|output|configuration|theodolite-.*' --if-exists"
    echo "Wait for topic deletion"
    sleep 5s
    #echo "Finished waiting, print topics:"
    #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
    # Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation"
do
echo "Wait for ZooKeeper state deletion."
sleep 5s
done
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
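# Ceiling division: one workload-generator instance per WL_MAX_RECORDS (150,000) simulated sensors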
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
cat <<EOF >uc-workload-generator/overlay/uc3-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-load-generator
spec:
  replicas: $WL_INSTANCES
  template:
    spec:
      containers:
      - name: workload-generator
        env:
        - name: NUM_SENSORS
          value: "$NUM_SENSORS"
        - name: INSTANCES
          value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc3-workload-generator
# Start application
REPLICAS=$INSTANCES
cat <<EOF >uc-application/overlay/uc3-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: $REPLICAS
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
        resources:
          limits:
            memory: $MEMORY_LIMIT
            cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc3-application
kubectl scale deployment uc3-titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc3-workload-generator
kubectl delete -k uc-application/overlay/uc3-application
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
    kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*' --if-exists"
    echo "Wait for topic deletion"
    sleep 5s
    #echo "Finished waiting, print topics:"
    #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
    # Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation"
do
echo "Wait for ZooKeeper state deletion."
sleep 5s
done
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
#!/bin/bash
EXP_ID=$1
DIM_VALUE=$2
INSTANCES=$3
PARTITIONS=${4:-40}
CPU_LIMIT=${5:-1000m}
MEMORY_LIMIT=${6:-4Gi}
KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100}
EXECUTION_MINUTES=${8:-5}
echo "EXP_ID: $EXP_ID"
echo "DIM_VALUE: $DIM_VALUE"
echo "INSTANCES: $INSTANCES"
echo "PARTITIONS: $PARTITIONS"
echo "CPU_LIMIT: $CPU_LIMIT"
echo "MEMORY_LIMIT: $MEMORY_LIMIT"
echo "KAFKA_STREAMS_COMMIT_INTERVAL_MS: $KAFKA_STREAMS_COMMIT_INTERVAL_MS"
echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
# Create Topics
#PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator
NUM_SENSORS=$DIM_VALUE
WL_MAX_RECORDS=150000
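# Ceiling division: one workload-generator instance per WL_MAX_RECORDS (150,000) simulated sensors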
WL_INSTANCES=$(((NUM_SENSORS + (WL_MAX_RECORDS -1 ))/ WL_MAX_RECORDS))
cat <<EOF >uuc-workload-generator/overlay/uc4-workload-generator/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: titan-ccp-load-generator
spec:
replicas: $WL_INSTANCES
template:
spec:
containers:
- name: workload-generator
env:
- name: NUM_SENSORS
value: "$NUM_SENSORS"
- name: INSTANCES
value: "$WL_INSTANCES"
EOF
kubectl apply -k uc-workload-generator/overlay/uc4-workload-generator
# Start application
REPLICAS=$INSTANCES
cat <<EOF >uc-application/overlay/uc4-application/set_paramters.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: titan-ccp-aggregation
spec:
  replicas: $REPLICAS
  template:
    spec:
      containers:
      - name: uc-application
        env:
        - name: COMMIT_INTERVAL_MS
          value: "$KAFKA_STREAMS_COMMIT_INTERVAL_MS"
        resources:
          limits:
            memory: $MEMORY_LIMIT
            cpu: $CPU_LIMIT
EOF
kubectl apply -k uc-application/overlay/uc4-application
kubectl scale deployment uc4-titan-ccp-aggregation --replicas=$REPLICAS
# Execute for certain time
sleep $(($EXECUTION_MINUTES * 60))
# Run eval script
source ../.venv/bin/activate
python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES
deactivate
# Stop workload generator and app
kubectl delete -k uc-workload-generator/overlay/uc4-workload-generator
kubectl delete -k uc-application/overlay/uc4-application
# Delete topics instead of Kafka
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
# kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic '.*'
#sleep 30s # TODO check
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p'
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n '/^titan-.*/p;/^input$/p;/^output$/p;/^configuration$/p' | wc -l
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do
    kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*' --if-exists"
    echo "Wait for topic deletion"
    sleep 5s
    #echo "Finished waiting, print topics:"
    #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
    # Sometimes a second deletion seems to be required
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
while kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 get /workload-generation"
do
echo "Wait for ZooKeeper state deletion."
sleep 5s
done
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
kubectl delete pod $KAFKA_LAG_EXPORTER_POD
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namePrefix: uc1-
images:
- name: uc-app
  newName: theodolite/theodolite-uc1-kstreams-app
  newTag: latest
bases:
- ../../base
patchesStrategicMerge:
- set_paramters.yaml # Patch setting the resource parameters
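This appears to be the overlay kustomization for the UC1 application: it pulls in the shared base, prefixes all resource names with `uc1-`, pins the benchmark image, and applies the `set_paramters.yaml` patch generated by the run scripts above, which deploy it via `kubectl apply -k uc-application/overlay/uc1-application`.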