Skip to content
Snippets Groups Projects
Commit 1948862e authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Merge remote-tracking branch 'upstream/master' into 109-implement-kotlin-prototype

parents 434b753f 605510cb
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus,!78Resolve "Implement Quarkus/Kotlin protype"
Showing
with 622 additions and 101 deletions
......@@ -18,4 +18,15 @@ benchmarks:
- benchmarks/*
- when: manual
allow_failure: true
\ No newline at end of file
execution:
stage: triggers
trigger:
include: execution/.gitlab-ci.yml
strategy: depend
rules:
- if: "$CI_COMMIT_TAG"
- changes:
- execution/*
- when: manual
allow_failure: true
.dockerignore
Dockerfile
\ No newline at end of file
FROM jupyter/base-notebook
COPY . /home/jovyan
WORKDIR /home/jovyan
RUN rm -r work
RUN pip install -r requirements.txt
......@@ -3,20 +3,44 @@
This directory contains Jupyter notebooks for analyzing and visualizing
benchmark execution results and plotting. The following notebooks are provided:
* [demand-metric.ipynb](demand-metric.ipynb): Create CSV files describing scalability according to the Theodolite `demand` metric.
* [demand-metric-plot.ipynb](demand-metric-plot.ipynb): Create plots based on such CSV files of the `demand` metric.
For legacy reasons, we also provide the following notebooks, which, however, are not documented:
* [scalability-graph.ipynb](scalability-graph.ipynb): Creates a scalability graph for a certain benchmark execution.
* [scalability-graph-final.ipynb](scalability-graph-final.ipynb): Combines the scalability graphs of multiple benchmarks executions (e.g. for comparing different configuration).
* [lag-trend-graph.ipynb](lag-trend-graph.ipynb): Visualizes the consumer lag evaluation over time along with the computed trend.
## Usage
For executing benchmarks and analyzing their results, a **Python 3.7**
installation is required (e.g., in a virtual environment). Our notebooks require some
Python libraries, which can be installed via:
In general, the Theodolite Analysis Jupyter notebooks should be runnable by any Jupyter server. To make it a bit easier,
we provide introductions for running notebooks with Docker and with Visual Studio Code. These intoduction may also be
a good starting point for using another service.
For analyzing and visualizing benchmark results, either Docker or a Jupyter installation with Python 3.7 or 3.8 is
required (e.g., in a virtual environment). **Please note that Python 3.9 seems not to be working as not all our
dependencies are ported to Python 3.9 yet.**
### Running with Docker
This option requires Docker to be installed. You can build and run a container using the following commands. Make sure
to set the `results` volume to the directory with your execution results and `results-inst` to a directory where the
final scalability graphs should be placed. The output of the *run* command gives you an URL of the form
`http://127.0.0.1:8888/?token=...`, which you should open in your webbrowser. From there you can access all notebooks.
You can stop the Jupyter server with Crtl + C.
```sh
pip install -r requirements.txt
docker build . -t theodolite-analysis
docker run --rm -p 8888:8888 -v "$PWD/../results":/home/jovyan/results -v "$PWD/../results-inst":/home/jovyan/results-inst theodolite-analysis
```
We have tested these
notebooks with [Visual Studio Code](https://code.visualstudio.com/docs/python/jupyter-support),
however, every other server should be fine as well.
### Running with Visual Studio Code
The [Visual Studio Code Documentation](https://code.visualstudio.com/docs/python/jupyter-support) shows to run Jupyter
notebooks with Visual Studio Code. For our notebooks, Python 3.7 or newer is required (e.g., in a virtual environment).
Moreover, they require some Python libraries, which can be installed by:
```sh
pip install -r requirements.txt
```
\ No newline at end of file
%% Cell type:markdown id: tags:
# Theodolite Analysis - Plotting the Demand Metric
This notebook creates a plot, showing scalability as a function that maps load intensities to the resources required for processing them. It is able to combine multiple such plots in one figure, for example, to compare multiple systems or configurations.
The notebook takes a CSV file for each plot mapping load intensities to minimum required resources, computed by the `demand-metric-plot.ipynb` notebook.
%% Cell type:markdown id: tags:
First, we need to import some libraries, which are required for creating the plots.
%% Cell type:code id: tags:
``` python
import os
import pandas as pd
from functools import reduce
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib.ticker import MaxNLocator
```
%% Cell type:markdown id: tags:
We need to specify the directory, where the demand CSV files can be found, and a dictionary that maps a system description (e.g. its name) to the corresponding CSV file (prefix).
%% Cell type:code id: tags:
``` python
results_dir = '<path-to>/results'
experiments = {
'System XYZ': 'exp200',
}
```
%% Cell type:markdown id: tags:
Now, we combie all systems described in `experiments`.
%% Cell type:code id: tags:
``` python
dataframes = [pd.read_csv(os.path.join(results_dir, f'{v}_demand.csv')).set_index('load').rename(columns={"resources": k}) for k, v in experiments.items()]
df = reduce(lambda df1,df2: df1.join(df2,how='outer'), dataframes)
```
%% Cell type:markdown id: tags:
We might want to display the mappings before we plot it.
%% Cell type:code id: tags:
``` python
df
```
%% Cell type:markdown id: tags:
The following code creates a MatPlotLib figure showing the scalability plots for all specified systems. You might want to adjust its styling etc. according to your preferences. Make sure to also set a filename.
%% Cell type:code id: tags:
``` python
plt.style.use('ggplot')
plt.rcParams['axes.facecolor']='w'
plt.rcParams['axes.edgecolor']='555555'
#plt.rcParams['ytick.color']='black'
plt.rcParams['grid.color']='dddddd'
plt.rcParams['axes.spines.top']='false'
plt.rcParams['axes.spines.right']='false'
plt.rcParams['legend.frameon']='true'
plt.rcParams['legend.framealpha']='1'
plt.rcParams['legend.edgecolor']='1'
plt.rcParams['legend.borderpad']='1'
@FuncFormatter
def load_formatter(x, pos):
return f'{(x/1000):.0f}k'
markers = ['s', 'D', 'o', 'v', '^', '<', '>', 'p', 'X']
def splitSerToArr(ser):
return [ser.index, ser.as_matrix()]
plt.figure()
#plt.figure(figsize=(4.8, 3.6)) # For other plot sizes
#ax = df.plot(kind='line', marker='o')
for i, column in enumerate(df):
plt.plot(df[column].dropna(), marker=markers[i], label=column)
plt.legend()
ax = plt.gca()
#ax = df.plot(kind='line',x='dim_value', legend=False, use_index=True)
ax.set_ylabel('number of instances')
ax.set_xlabel('messages/second')
ax.set_ylim(ymin=0)
#ax.set_xlim(xmin=0)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
ax.xaxis.set_major_formatter(FuncFormatter(load_formatter))
plt.savefig('temp.pdf', bbox_inches='tight')
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:markdown id: tags:
# Theodolite Analysis - Demand Metric
This notebook allows applies Theodolite's *demand* metric to describe scalability of a SUT based on Theodolite measurement data.
Theodolite's *demand* metric is a function, mapping load intensities to the minimum required resources (e.g., instances) that are required to process this load. With this notebook, the *demand* metric function is approximated by a map of tested load intensities to their minimum required resources.
The final output when running this notebook will be a CSV file, providig this mapping. It can be used to create nice plots of a system's scalability using the `demand-metric-plot.ipynb` notebook.
%% Cell type:markdown id: tags:
In the following cell, we need to specifiy:
* `exp_id`: The experiment id that is to be analyzed.
* `warmup_sec`: The number of seconds which are to be ignored in the beginning of each experiment.
* `max_lag_trend_slope`: The maximum tolerable increase in queued messages per second.
* `measurement_dir`: The directory where the measurement data files are to be found.
* `results_dir`: The directory where the computed demand CSV files are to be stored.
%% Cell type:code id: tags:
``` python
exp_id = 200
warmup_sec = 60
max_lag_trend_slope = 2000
measurement_dir = '<path-to>/measurements'
results_dir = '<path-to>/results'
```
%% Cell type:markdown id: tags:
With the following call, we compute our demand mapping.
%% Cell type:code id: tags:
``` python
from src.demand import demand
demand = demand(exp_id, measurement_dir, max_lag_trend_slope, warmup_sec)
```
%% Cell type:markdown id: tags:
We might already want to plot a simple visualization here:
%% Cell type:code id: tags:
``` python
demand.plot(kind='line',x='load',y='resources')
```
%% Cell type:markdown id: tags:
Finally we store the results in a CSV file.
%% Cell type:code id: tags:
``` python
import os
demand.to_csv(os.path.join(results_dir, f'exp{exp_id}_demand.csv'), index=False)
```
%% Cell type:code id: tags:
```
print("hello")
```
%% Cell type:code id: tags:
```
import os
from datetime import datetime, timedelta, timezone
import pandas as pd
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
```
%% Cell type:code id: tags:
```
os.getcwd()
```
%% Cell type:code id: tags:
```
exp_id = 2012
warmup_sec = 60
warmup_partitions_sec = 120
threshold = 2000 #slope
#directory = '../results'
directory = '<path-to>/results'
directory_out = '<path-to>/results-inst'
```
%% Cell type:code id: tags:outputPrepend,outputPrepend
```
#exp_id = 35
#os.chdir("./results-final")
raw_runs = []
filenames = [filename for filename in os.listdir(directory) if filename.startswith(f"exp{exp_id}") and filename.endswith("totallag.csv")]
for filename in filenames:
#print(filename)
run_params = filename[:-4].split("_")
dim_value = run_params[2]
instances = run_params[3]
df = pd.read_csv(os.path.join(directory, filename))
#input = df.loc[df['topic'] == "input"]
input = df
#print(input)
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
#print(input)
#print(input.iloc[0, 'timestamp'])
regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
#regress = input
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array
Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
trend_slope = linear_regressor.coef_[0][0]
#print(linear_regressor.coef_)
row = {'dim_value': int(dim_value), 'instances': int(instances), 'trend_slope': trend_slope}
#print(row)
raw_runs.append(row)
lags = pd.DataFrame(raw_runs)
```
%% Cell type:code id: tags:
```
lags.head()
```
%% Cell type:code id: tags:
```
raw_partitions = []
filenames = [filename for filename in os.listdir(directory) if filename.startswith(f"exp{exp_id}") and filename.endswith("partitions.csv")]
for filename in filenames:
#print(filename)
run_params = filename[:-4].split("_")
dim_value = run_params[2]
instances = run_params[3]
df = pd.read_csv(os.path.join(directory, filename))
#input = df.loc[df['topic'] == "input"]
input = df
#print(input)
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
#print(input)
#print(input.iloc[0, 'timestamp'])
input = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
#regress = input
input = input.loc[input['topic'] >= 'input']
mean = input['value'].mean()
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
row = {'dim_value': int(dim_value), 'instances': int(instances), 'partitions': mean}
#print(row)
raw_partitions.append(row)
partitions = pd.DataFrame(raw_partitions)
#runs = lags.join(partitions.set_index(['dim_value', 'instances']), on=['dim_value', 'instances'])
```
%% Cell type:code id: tags:
```
raw_obs_instances = []
filenames = [filename for filename in os.listdir(directory) if filename.startswith(f"exp{exp_id}") and filename.endswith("instances.csv")]
for filename in filenames:
run_params = filename[:-4].split("_")
dim_value = run_params[2]
instances = run_params[3]
df = pd.read_csv(os.path.join(directory, filename))
if df.empty:
continue
#input = df.loc[df['topic'] == "input"]
input = df
#print(input)
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
#print(input)
#print(input.iloc[0, 'timestamp'])
input = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
#regress = input
#input = input.loc[input['topic'] >= 'input']
#mean = input['value'].mean()
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
#row = {'dim_value': int(dim_value), 'instances': int(instances), 'obs_instances': mean}
#print(row)
raw_obs_instances.append(row)
obs_instances = pd.DataFrame(raw_obs_instances)
obs_instances.head()
```
%% Cell type:code id: tags:
```
runs = lags
#runs = lags.join(partitions.set_index(['dim_value', 'instances']), on=['dim_value', 'instances'])#.join(obs_instances.set_index(['dim_value', 'instances']), on=['dim_value', 'instances'])
#runs["failed"] = runs.apply(lambda row: (abs(row['instances'] - row['obs_instances']) / row['instances']) > 0.1, axis=1)
#runs.loc[runs['failed']==True]
```
%% Cell type:code id: tags:
```
#threshold = 1000
# Set to true if the trend line has a slope less than
runs["suitable"] = runs.apply(lambda row: row['trend_slope'] < threshold, axis=1)
runs.columns = runs.columns.str.strip()
runs.sort_values(by=["dim_value", "instances"])
```
%% Cell type:code id: tags:
```
filtered = runs[runs.apply(lambda x: x['suitable'], axis=1)]
grouped = filtered.groupby(['dim_value'])['instances'].min()
min_suitable_instances = grouped.to_frame().reset_index()
min_suitable_instances
```
%% Cell type:code id: tags:
```
min_suitable_instances.to_csv(os.path.join(directory_out, f'../results-inst/exp{exp_id}_min-suitable-instances.csv'), index=False)
min_suitable_instances.to_csv(os.path.join(directory_out, f'exp{exp_id}_min-suitable-instances.csv'), index=False)
```
%% Cell type:code id: tags:
```
min_suitable_instances.plot(kind='line',x='dim_value',y='instances')
# min_suitable_instances.plot(kind='line',x='dim_value',y='instances', logy=True)
plt.show()
```
%% Cell type:code id: tags:
```
```
......
import os
from datetime import datetime, timedelta, timezone
import pandas as pd
from sklearn.linear_model import LinearRegression
def demand(exp_id, directory, threshold, warmup_sec):
raw_runs = []
# Compute SL, i.e., lag trend, for each tested configuration
filenames = [filename for filename in os.listdir(directory) if filename.startswith(f"exp{exp_id}") and filename.endswith("totallag.csv")]
for filename in filenames:
#print(filename)
run_params = filename[:-4].split("_")
dim_value = run_params[2]
instances = run_params[3]
df = pd.read_csv(os.path.join(directory, filename))
#input = df.loc[df['topic'] == "input"]
input = df
#print(input)
input['sec_start'] = input.loc[0:, 'timestamp'] - input.iloc[0]['timestamp']
#print(input)
#print(input.iloc[0, 'timestamp'])
regress = input.loc[input['sec_start'] >= warmup_sec] # Warm-Up
#regress = input
#input.plot(kind='line',x='timestamp',y='value',color='red')
#plt.show()
X = regress.iloc[:, 2].values.reshape(-1, 1) # values converts it into a numpy array
Y = regress.iloc[:, 3].values.reshape(-1, 1) # -1 means that calculate the dimension of rows, but have 1 column
linear_regressor = LinearRegression() # create object for the class
linear_regressor.fit(X, Y) # perform linear regression
Y_pred = linear_regressor.predict(X) # make predictions
trend_slope = linear_regressor.coef_[0][0]
#print(linear_regressor.coef_)
row = {'load': int(dim_value), 'resources': int(instances), 'trend_slope': trend_slope}
#print(row)
raw_runs.append(row)
runs = pd.DataFrame(raw_runs)
# Set suitable = True if SLOs are met, i.e., lag trend is below threshold
runs["suitable"] = runs.apply(lambda row: row['trend_slope'] < threshold, axis=1)
# Sort results table (unsure if required)
runs.columns = runs.columns.str.strip()
runs.sort_values(by=["load", "resources"])
# Filter only suitable configurations
filtered = runs[runs.apply(lambda x: x['suitable'], axis=1)]
# Compute demand per load intensity
grouped = filtered.groupby(['load'])['resources'].min()
demand_per_load = grouped.to_frame().reset_index()
return demand_per_load
......@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder {
*
* @return A {@code Topology} for a {@code KafkaStreams} application.
*/
protected abstract Topology buildTopology();
protected abstract Topology buildTopology(Properties properties);
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
// Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), this.buildProperties());
final Properties properties = this.buildProperties();
return new KafkaStreams(this.buildTopology(properties), properties);
}
}
package theodolite.uc1.streamprocessing;
import com.google.gson.Gson;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
......@@ -36,7 +37,7 @@ public class TopologyBuilder {
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
public Topology build(final Properties properties) {
this.builder
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
......@@ -44,6 +45,6 @@ public class TopologyBuilder {
.mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
return this.builder.build();
return this.builder.build(properties);
}
}
package theodolite.uc1.streamprocessing;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
......@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
}
@Override
protected Topology buildTopology() {
protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
return new TopologyBuilder(this.inputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build();
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(properties);
}
}
package theodolite.uc2.streamprocessing;
import java.time.Duration;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
......@@ -59,9 +60,9 @@ public class TopologyBuilder {
final Duration emitPeriod, final Duration gracePeriod,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.feedbackTopic = feedbackTopic;
this.configurationTopic = configurationTopic;
this.outputTopic = outputTopic;
this.emitPeriod = emitPeriod;
this.gracePeriod = gracePeriod;
......@@ -71,7 +72,7 @@ public class TopologyBuilder {
/**
* Build the {@link Topology} for the Aggregation microservice.
*/
public Topology build() {
public Topology build(final Properties properties) {
// 1. Build Parent-Sensor Table
final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();
......@@ -92,7 +93,7 @@ public class TopologyBuilder {
// 5. Expose Aggregations Stream
this.exposeOutputStream(aggregations);
return this.builder.build();
return this.builder.build(properties);
}
private KTable<String, ActivePowerRecord> buildInputTable() {
......
......@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
......@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
}
@Override
protected Topology buildTopology() {
protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
......@@ -59,14 +60,14 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.feedbackTopic,
this.outputTopic,
this.feedbackTopic,
this.configurationTopic,
this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
return topologyBuilder.build();
return topologyBuilder.build(properties);
}
}
......@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
import com.google.common.math.Stats;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
......@@ -46,7 +47,7 @@ public class TopologyBuilder {
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
public Topology build(final Properties properties) {
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(),
......@@ -68,6 +69,6 @@ public class TopologyBuilder {
.peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build();
return this.builder.build(properties);
}
}
......@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
......@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
}
@Override
protected Topology buildTopology() {
protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
return topologyBuilder.build();
return topologyBuilder.build(properties);
}
}
......@@ -5,6 +5,7 @@ import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
......@@ -54,7 +55,7 @@ public class TopologyBuilder {
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
public Topology build(final Properties properties) {
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
......@@ -89,6 +90,6 @@ public class TopologyBuilder {
Serdes.String()));
// this.serdes.avroValues()));
return this.builder.build();
return this.builder.build(properties);
}
}
......@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
......@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
}
@Override
protected Topology buildTopology() {
protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
......@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
this.aggregtionDuration,
this.aggregationAdvance);
return topologyBuilder.build();
return topologyBuilder.build(properties);
}
}
stages:
- deploy
deploy:
stage: deploy
tags:
- exec-dind
image: docker:19.03.1
services:
- docker:19.03.1-dind
variables:
DOCKER_TLS_CERTDIR: "/certs"
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t theodolite ./execution
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $DOCKERHUB_ORG/theodolite:${DOCKER_TAG_NAME}latest"
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $DOCKERHUB_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
- "[ $CI_COMMIT_TAG ] && docker tag theodolite $DOCKERHUB_ORG/theodolite:$CI_COMMIT_TAG"
- echo $DOCKERHUB_PW | docker login -u $DOCKERHUB_ID --password-stdin
- docker push $DOCKERHUB_ORG/theodolite
- docker logout
rules:
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $CI_COMMIT_TAG"
when: always
- changes:
- execution/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW"
when: manual
allow_failure: true
deploy-ghcr:
stage: deploy
tags:
- exec-dind
image: docker:19.03.1
services:
- docker:19.03.1-dind
variables:
DOCKER_TLS_CERTDIR: "/certs"
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t theodolite ./execution
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite ghcr.io/$GITHUB_CR_ORG/theodolite:${DOCKER_TAG_NAME}latest"
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite ghcr.io/$GITHUB_CR_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
- "[ $CI_COMMIT_TAG ] && docker tag theodolite ghcr.io/$GITHUB_CR_ORG/theodolite:$CI_COMMIT_TAG"
- echo $GITHUB_CR_TOKEN | docker login ghcr.io -u $GITHUB_CR_USER --password-stdin
- docker push ghcr.io/$GITHUB_CR_ORG/theodolite
- docker logout
rules:
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $CI_COMMIT_TAG"
when: always
- changes:
- execution/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN"
when: manual
allow_failure: true
\ No newline at end of file
......@@ -8,45 +8,28 @@ benchmarks](#execution).
## Installation
### Kubernetes Cluster
For executing benchmarks, access to a Kubernetes cluster is required. If you already run other applications inside your
cluster, you might want to consider creating a dedicated namespace for your benchmarks.
For executing benchmarks, access to Kubernetes cluster is required. We suggest
to create a dedicated namespace for executing your benchmarks. The following
services need to be available as well.
### Installing Dependencies
### Kubernetes Volume
For executing the benchmark as a Kubernetes job it is required to use a volume to store the results of the executions.
In `infrastructure/kubernetes` are two files for creating a volume.
Either one of them should be used.
The `volumeSingle.yaml` is meant for systems where Kubernetes is run locally (e.g. minikube, kind etc.).
However, you can also use the other file.
In `volumeSingle.yaml` you need to set `path` to the path on your machine where the results should be stored.
The `volumeCluster.yaml` should be used when Kubernetes runs in the cloud.
In the `nodeAffinity` section you need to exchange `<node-name>` to the name of the node where the volume should be created (this node will most likely execute also the job).
However, you can also set a different `nodeAffinity`.
Further you need to set `path` to the path on the node where the results should be stored.
After setting the properties you can create the volume with:
```sh
kubectl apply -f infrastructure/kubernetes/volume(Single|Cluster).yaml
```
The following third-party services need to be installed in your cluster. For most of them, the suggested way to install
them is via [Helm](https://helm.sh).
#### Prometheus
We suggest to use the [Prometheus Operator](https://github.com/coreos/prometheus-operator)
and create a dedicated Prometheus instance for these benchmarks.
If Prometheus Operator is not already available on your cluster, a convenient
way to install is via the [**unofficial** Prometheus Operator Helm chart](https://github.com/helm/charts/tree/master/stable/prometheus-operator).
As you may not need an entire cluster monitoring stack, you can use our Helm
configuration to only install the operator:
If Prometheus Operator is not already available on your cluster, a convenient way to install it is via the
[Prometheus community Helm chart](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack).
As you may not need an entire cluster monitoring stack, you can use our Helm configuration to only install the
operator:
```sh
helm install prometheus-operator stable/prometheus-operator -f infrastructure/prometheus/helm-values.yaml
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm install prometheus-operator prometheus-community/kube-prometheus-stack -f infrastructure/prometheus/helm-values.yaml
```
After installation, you need to create a Prometheus instance:
......@@ -55,9 +38,17 @@ After installation, you need to create a Prometheus instance:
kubectl apply -f infrastructure/prometheus/prometheus.yaml
```
You might also need to apply the [ServiceAccount](infrastructure/prometheus/service-account.yaml), [ClusterRole](infrastructure/prometheus/cluster-role.yaml)
and the [CusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml),
depending on your cluster's security policies.
You might also need to apply the [ClusterRole](infrastructure/prometheus/cluster-role.yaml), the
[CusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml) and the
[ServiceAccount](infrastructure/prometheus/service-account.yaml), depending on your cluster's security
policies. If you are not in the *default* namespace, alter the namespace in
[Prometheus' ClusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml) accordingly.
```sh
kubectl apply -f infrastructure/prometheus/cluster-role.yaml
kubectl apply -f infrastructure/prometheus/cluster-role-binding.yaml
kubectl apply -f infrastructure/prometheus/service-account.yaml
```
For the individual benchmarking components to be monitored, [ServiceMonitors](https://github.com/coreos/prometheus-operator#customresourcedefinitions)
are used. See the corresponding sections below for how to install them.
......@@ -68,14 +59,16 @@ As with Prometheus, we suggest to create a dedicated Grafana instance. Grafana
with our default configuration can be installed with Helm:
```sh
helm install grafana stable/grafana -f infrastructure/grafana/values.yaml
helm repo add grafana https://grafana.github.io/helm-charts
helm repo update
helm install grafana grafana/grafana -f infrastructure/grafana/values.yaml
```
The official [Grafana Helm Chart repository](https://github.com/helm/charts/tree/master/stable/grafana)
provides further documentation including a table of configuration options.
We provide ConfigMaps for a [Grafana dashboard](infrastructure/grafana/dashboard-config-map.yaml) and a [Grafana data source](infrastructure/grafana/prometheus-datasource-config-map.yaml).
Create them as follows:
We provide ConfigMaps for a [Grafana dashboard](infrastructure/grafana/dashboard-config-map.yaml) and a
[Grafana data source](infrastructure/grafana/prometheus-datasource-config-map.yaml). Create them as follows:
```sh
kubectl apply -f infrastructure/grafana/dashboard-config-map.yaml
......@@ -102,6 +95,9 @@ kubectl apply -f infrastructure/kafka/service-monitor.yaml
Other Kafka deployments, for example, using Strimzi, should work in a similar way.
*Please note that currently, even if installed differently, the corresponding services must run at
*my-confluent-cp-kafka:9092*, *my-confluent-cp-zookeeper:2181* and *my-confluent-cp-schema-registry:8081*.
#### A Kafka Client Pod
A permanently running pod used for Kafka configuration is started via:
......@@ -128,71 +124,91 @@ To install it:
helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.3/kafka-lag-exporter-0.6.3.tgz -f infrastructure/kafka-lag-exporter/values.yaml
```
### Installing Theodolite
### Python 3.7 (Only required for local Execution Control)
While Theodolite itself has not be installed as it is loaded at runtime (see [execution](#Execution)), it requires some
resources to be deployed in your cluster. These resources are grouped under RBAC and Volume in the following paragraphs.
For executing benchmarks, a **Python 3.7** installation is required. We suggest
to use a virtual environment placed in the `.venv` directory (in the Theodolite
root directory). As set of requirements is needed. You can install them with the following
command (make sure to be in your virtual environment if you use one):
#### Theodolite RBAC
**The following step is only required if RBAC is enabled in your cluster.** If you are not sure whether this is the
case, you want to simply try it without the following step.
If RBAC is enabled in your cluster, you have to allow Theodolite to start and stop pods etc. To do so, deploy the RBAC
resources via:
```sh
pip install -r requirements.txt
kubectl apply -f infrastructure/kubernetes/rbac/role.yaml
kubectl apply -f infrastructure/kubernetes/rbac/role-binding.yaml
kubectl apply -f infrastructure/kubernetes/rbac/service-account.yaml
```
#### Theodolite Volume
### Required Manual Adjustments
In order to persistently store benchmark results, Theodolite needs a volume mounted. We provide pre-configured
declarations for different volume types.
Depending on your setup, some additional adjustments may be necessary:
##### *hostPath* volume
* Change Kafka and Zookeeper servers in the Kubernetes deployments (uc1-application etc.) and `run_XX.sh` scripts
* Change the name of your Kubernetes namespace for [Prometheus' ClusterRoleBinding](infrastructure/prometheus/cluster-role-binding.yaml)
* *Please let us know if there are further adjustments necessary*
Using a [hostPath volume](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) is the easiest option when
running Theodolite locally, e.g., with minikube or kind.
Just modify `infrastructure/kubernetes/volume-hostpath.yaml` by setting `path` to the directory on your host machine where
all benchmark results should be stored and run:
```sh
kubectl apply -f infrastructure/kubernetes/volume-hostpath.yaml
```
## Execution
##### *local* volume
You can either execute the Execution Control on your machine or also deploy the Execution control in Kubernetes.
A [local volume](https://kubernetes.io/docs/concepts/storage/volumes/#local) is a simple option to use when having
access (e.g. via SSH) to one of your cluster nodes.
### Local Execution
You first need to create a directory on a selected node where all benchmark results should be stored. Next, modify
`infrastructure/kubernetes/volume-local.yaml` by setting `<node-name>` to your selected node. (This node will most
likely also execute the [Theodolite job](#Execution).) Further, you have to set `path` to the directory on the node you just created. To deploy
you volume run:
Please note that a **Python 3.7** installation is required for executing Theodolite.
```sh
kubectl apply -f infrastructure/kubernetes/volume-local.yaml
```
The `theodolite.py` is the entrypoint for all benchmark executions. Is has to be called as follows:
##### Other volumes
```python
python theodolite.py --uc <uc> --loads <load> [<load> ...] --instances <instances> [<instances> ...]
```
To use volumes provided by public cloud providers or network-based file systems, you can use the definitions in
`infrastructure/kubernetes/` as a starting point. See the offical
[volumes documentation](https://kubernetes.io/docs/concepts/storage/volumes/) for additional information.
The command above is the minimal command for execution.
Further configurations options are described [below](#configuration) or available via `python theodolite.py -h`
### Kubernetes Execution
## Execution
The Execution Control will be run by a Kubernetes Job.
This Job creates a pod that will execute the Executuion Control.
To configure the parameters, the `theodolite.yaml` need to be changed.
For the options take a look at [configuration](#configuration).
The preferred way to run scalability benchmarks with Theodolite is to deploy Theodolite
[Kubernetes Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/) in your cluster. For running
Theodolite locally on your machine see the description below.
To start the Benchmark the following command need to be executed:
```sh
kubectl apply -f theodolite.yaml
```
`theodolite.yaml` provides a template for your own Theodolite job. To run your own job, create a copy, give it a name
(`metadata.name`) and adjust configuration parameters as desired. For a description of available configuration options
see the [Configuration](#configuration) section below. Note, that you might uncomment the `serviceAccountName` line if
RBAC is enabled on your cluster (see installation of [Theodolite RBAC](#Theodolite-RBAC)).
With `kubectl logs -f theodolite-<*>` you can show the log of the execution control.
To start the execution of a benchmark run (with `<your-theodolite-yaml>` being your job definition):
When the job is finished, your results should be in your mounted [Kubernetes volume](#kubernetes-volume).
In order to start a new benchmark, the old job needs to be deleted.
This can be done with:
```sh
kubectl delete -f theodolite.yaml
kubectl create -f <your-theodolite-yaml>
```
This will create a pod with a name such as `your-job-name-xxxxxx`. You can verifiy this via `kubectl get pods`. With
`kubectl logs -f <your-job-name-xxxxxx>`, you can follow the benchmark execution logs.
Once your job is completed (you can verify via `kubectl get jobs), its results are stored inside your configured
Kubernetes volume.
**Make sure to always run only one Theodolite job at a time.**
### Configuration
| Python | Kubernetes | Description |
| Command line | Kubernetes | Description |
| -------------------- | ------------------- | ------------------------------------------------------------ |
| --uc | UC | **[Mandatory]** Stream processing use case to be benchmarked. Has to be one of `1`, `2`, `3` or `4`. |
| --loads | LOADS | **[Mandatory]** Values for the workload generator to be tested, should be sorted in ascending order. |
......@@ -208,6 +224,7 @@ kubectl delete -f theodolite.yaml
| --prometheus | PROMETHEUS_BASE_URL | Defines where to find the prometheus instance. *Default:* `http://localhost:9090` |
| --path | RESULT_PATH | A directory path for the results. Relative to the Execution folder. *Default:* `results` |
| --configurations | CONFIGURATIONS | Defines environment variables for the use cases and, thus, enables further configuration options. |
| --threshold | THRESHOLD | The threshold for the trend slop that the search strategies use to determine that a load could be handled. *Default:* `2000` |
### Domain Restriction
......@@ -219,8 +236,45 @@ For dimension value, we have a domain of the amounts of instances. As a conseque
* If the dimension value is not the smallest dimension value and N is the amount of minimal amount of instances that was suitable for the last smaller dimension value the domain for this dimension value contains all amounts of instances greater than, or equal to N.
### Benchmarking Search Strategies
There are the following benchmarking strategies:
* `check-all`: For each dimension value, execute one lag experiment for all amounts of instances within the current domain.
* `linear-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is from the lowest number of instances to the highest amount of instances and the execution for each dimension value is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful.
* `binary-search`: A heuristic which works as follows: For each dimension value, execute one lag experiment for all number of instances within the current domain. The execution order is in a binary-search-like manner. The execution is stopped, when a suitable amount of instances is found or if all lag experiments for the dimension value were not successful.
## Observation
The installed Grafana instance provides a dashboard to observe the benchmark execution. Unless configured otherwise,
this dashboard can be accessed via `http://<cluster-ip>:31199` or via `http://localhost:31199` if proxied with
`kubectl port-forward svc/grafana 8080:service`. Default credentials are user *admin* with password *admin*.
## Local Execution (e.g. for Development)
As an alternative to executing Theodolite as a Kubernetes Job, it is also possible to run it from your local system,
for example, for development purposes. In addition to the generel installation instructions, the following adjustments
are neccessary.
### Installation
For local execution a **Python 3.7** installation is required. We suggest to use a virtual environment placed in the `.venv`
directory (in the Theodolite root directory). A set of requirements is needed. You can install them with the following
command (make sure to be in your virtual environment if you use one):
```sh
pip install -r requirements.txt
```
Kubernetes volumes and service accounts, roles, and role bindings for Theodolite are not required in this case.
### Local Execution
The `theodolite.py` is the entrypoint for all benchmark executions. Is has to be called as follows:
```python
python theodolite.py --uc <uc> --loads <load> [<load> ...] --instances <instances> [<instances> ...]
```
This command is the minimal command for execution. Further configurations options are described [above](#configuration)
or available via `python theodolite.py -h`.
\ No newline at end of file
......@@ -11,7 +11,9 @@ adminPassword: admin
## Sidecars that collect the configmaps with specified label and stores the included files them into the respective folders
## Requires at least Grafana 5 to work and can't be used together with parameters dashboardProviders, datasources and dashboards
sidecar:
image: kiwigrid/k8s-sidecar:0.1.99
image:
repository: "kiwigrid/k8s-sidecar"
tag: "1.1.0"
imagePullPolicy: IfNotPresent
dashboards:
enabled: true
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment