Commit 548f952c authored by Sören Henning

Merge branch 'master' into stu203404/spesb-feature/gitlabci

parents 1a82c867 2f4bed1e
Related merge requests: !159 Re-implementation of Theodolite with Kotlin/Quarkus, !157 Update Graal Image in CI pipeline, !84 Gitlab CI for Theodolite-Kotlin-Quarkus, !83 WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing 284 additions and 865 deletions
workflow:
  rules:
    - if: $CI_MERGE_REQUEST_ID
      when: never
    - when: always

stages:
  - triggers
  - build
  - test
  - check
  - deploy

benchmarks:
  stage: triggers
  trigger:
    include: benchmarks/.gitlab-ci.yml
    strategy: depend

.dind:
  tags:
    - exec-dind
  # see https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
  # for image usage and settings for building with TLS and docker in docker
  image: docker:19.03.1
  services:
    - docker:19.03.1-dind
  variables:
    DOCKER_TLS_CERTDIR: "/certs"

# Theodolite Benchmarks

.benchmarks:
  image: openjdk:11-jdk
  tags:
    - exec-docker
  variables:
    GRADLE_OPTS: "-Dorg.gradle.daemon=false"
  cache:
    paths:
      - .gradle
  before_script:
    - cd benchmarks
    - export GRADLE_USER_HOME=`pwd`/.gradle
build-benchmarks:
  stage: build
  extends: .benchmarks
  script: ./gradlew --build-cache assemble
  artifacts:
    paths:
      - "benchmarks/build/libs/*.jar"
      - "benchmarks/*/build/distributions/*.tar"
    expire_in: 1 day

test-benchmarks:
  stage: test
  extends: .benchmarks
  needs:
    - build-benchmarks
  script: ./gradlew test --continue
  artifacts:
    reports:
      junit:
        - "benchmarks/**/build/test-results/test/TEST-*.xml"

checkstyle-benchmarks:
  stage: check
  extends: .benchmarks
  needs:
    - build-benchmarks
    - test-benchmarks
  script: ./gradlew checkstyle --continue
  artifacts:
    paths:
      - "benchmarks/*/build/reports/checkstyle/main.html"
    when: on_failure
    expire_in: 1 day

pmd-benchmarks:
  stage: check
  extends: .benchmarks
  needs:
    - build-benchmarks
    - test-benchmarks
  script: ./gradlew pmd --continue
  artifacts:
    paths:
      - "benchmarks/*/build/reports/pmd/*.html"
    when: on_failure
    expire_in: 1 day

spotbugs-benchmarks:
  stage: check
  extends: .benchmarks
  needs:
    - build-benchmarks
    - test-benchmarks
  script: ./gradlew spotbugs --continue
  artifacts:
    paths:
      - "benchmarks/*/build/reports/spotbugs/*.html"
    when: on_failure
    expire_in: 1 day
.deploy-benchmarks:
  stage: deploy
  extends:
    - .benchmarks
    - .dind
  needs:
    - build-benchmarks
    - checkstyle-benchmarks
    - pmd-benchmarks
    - spotbugs-benchmarks
  script:
    - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
    - docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME
    - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
    - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
    - "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
    - echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin
    - docker push $CR_HOST/$CR_ORG/$IMAGE_NAME
    - docker logout
  rules:
    - if: "$CI_COMMIT_TAG"
    - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/*
        - benchmarks/$JAVA_PROJECT_NAME/**/*
        - benchmarks/application-kafkastreams-commons/**/*
        - benchmarks/workload-generator-commons/**/*
      if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - when: manual
    - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true
execution:
  stage: triggers
  trigger:
    include: execution/.gitlab-ci.yml
    strategy: depend

deploy-uc1-kstreams-app:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc1-kstreams-app"
    JAVA_PROJECT_NAME: "uc1-application"

deploy-uc2-kstreams-app:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc2-kstreams-app"
    JAVA_PROJECT_NAME: "uc2-application"

deploy-uc3-kstreams-app:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc3-kstreams-app"
    JAVA_PROJECT_NAME: "uc3-application"

deploy-uc4-kstreams-app:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc4-kstreams-app"
    JAVA_PROJECT_NAME: "uc4-application"

deploy-uc1-load-generator:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc1-workload-generator"
    JAVA_PROJECT_NAME: "uc1-workload-generator"

deploy-uc2-load-generator:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc2-workload-generator"
    JAVA_PROJECT_NAME: "uc2-workload-generator"

deploy-uc3-load-generator:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc3-workload-generator"
    JAVA_PROJECT_NAME: "uc3-workload-generator"

deploy-uc4-load-generator:
  extends: .deploy-benchmarks
  variables:
    IMAGE_NAME: "theodolite-uc4-workload-generator"
    JAVA_PROJECT_NAME: "uc4-workload-generator"
# Theodolite Framework

deploy-theodolite:
  stage: deploy
  extends:
    - .dind
  script:
    - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
    - docker build --pull -t theodolite ./execution
    - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:${DOCKER_TAG_NAME}latest"
    - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
    - "[ $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$CI_COMMIT_TAG"
    - echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin
    - docker push $CR_HOST/$CR_ORG/theodolite
    - docker logout
  rules:
    - if: "$CI_COMMIT_TAG"
    - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG"
      when: always
    - changes:
        - execution/*
        - execution/**/*
      if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
      when: always
    - when: manual
    - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
      when: manual
      allow_failure: true
framework:
......
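The tag-derivation one-liner in the deploy scripts above is easy to misread: appending `-` to `$CI_COMMIT_REF_SLUG` and deleting the result if it equals `master-` yields an empty tag prefix on master and a `<branch>-` prefix everywhere else. A minimal Python sketch of the same logic (branch names are made up for illustration):

``` python
# Mirrors: DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
def docker_tag_prefix(ref_slug: str) -> str:
    candidate = ref_slug + '-'
    return '' if candidate == 'master-' else candidate

# Hypothetical branches: master pushes ':latest' and ':<short-sha>',
# other branches push ':<branch>-latest' and ':<branch>-<short-sha>'.
for branch in ('master', 'feature-xyz'):
    prefix = docker_tag_prefix(branch)
    print(f"{branch}: '{prefix}latest' and '{prefix}abc1234'")
```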
cff-version: "1.1.0"
message: "If you use Theodolite, please cite it using these metadata."
authors:
  - family-names: Henning
    given-names: "Sören"
    orcid: "https://orcid.org/0000-0001-6912-2549"
  - family-names: Hasselbring
    given-names: Wilhelm
    orcid: "https://orcid.org/0000-0001-6625-4335"
title: Theodolite
version: "0.3.0"
repository-code: "https://github.com/cau-se/theodolite"
license: "Apache-2.0"
doi: "10.1016/j.bdr.2021.100209"
......@@ -17,3 +17,10 @@ Theodolite aims to benchmark scalability of stream processing engines for real u
## Theodolite Analysis Tools
Theodolite's benchmarking method creates a *scalability graph* that allows drawing conclusions about the scalability of a stream processing engine or its deployment. A scalability graph shows how resource demand evolves with increasing workload. Theodolite provides Jupyter notebooks for creating such scalability graphs based on benchmarking results from the execution framework. More information can be found in [Theodolite analysis tool](analysis).
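To make the demand metric concrete: for each tested load intensity, the benchmark reports the minimum amount of resources (e.g., instances) whose experiment run satisfied the SLOs. A schematic sketch of that mapping, not Theodolite's actual implementation (the `results` structure is hypothetical):

``` python
# Schematic demand metric: map each load to the smallest resource amount
# whose run met the SLOs. The (load, resources) -> bool input is hypothetical.
def demand(results: dict) -> dict:
    demand_per_load = {}
    for (load, resources), slo_met in results.items():
        if slo_met and (load not in demand_per_load or resources < demand_per_load[load]):
            demand_per_load[load] = resources
    return demand_per_load

print(demand({(50_000, 1): True, (100_000, 1): False, (100_000, 2): True}))
# -> {50000: 1, 100000: 2}
```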
## How to Cite
If you use Theodolite, please cite
> Sören Henning and Wilhelm Hasselbring. (2021). Theodolite: Scalability Benchmarking of Distributed Stream Processing Engines in Microservice Architectures. Big Data Research, Volume 25. DOI: [10.1016/j.bdr.2021.100209](https://doi.org/10.1016/j.bdr.2021.100209). arXiv:[2009.00304](https://arxiv.org/abs/2009.00304).
%% Cell type:markdown id: tags:
# Theodolite Analysis - Plotting the Demand Metric
This notebook creates a plot showing scalability as a function that maps load intensities to the resources required for processing them. It can combine multiple such plots in one figure, for example, to compare multiple systems or configurations.
The notebook takes one CSV file per plot, mapping load intensities to minimum required resources, as computed by the `demand-metric.ipynb` notebook.
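From the loading code below, such a demand CSV has a `load` column and a `resources` column, one row per tested load intensity. A hypothetical example file (values made up):

``` python
import pandas as pd

# Writes a made-up exp200_demand.csv with the columns the notebook expects:
# 'load' (messages/second) and 'resources' (minimum instances that met the SLOs).
example = pd.DataFrame({'load': [50_000, 100_000, 200_000], 'resources': [1, 2, 3]})
example.to_csv('exp200_demand.csv', index=False)
```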
%% Cell type:markdown id: tags:
First, we need to import some libraries, which are required for creating the plots.
%% Cell type:code id: tags:
``` python
import os
import pandas as pd
from functools import reduce
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib.ticker import MaxNLocator
```
%% Cell type:markdown id: tags:
We need to specify the directory where the demand CSV files can be found, and a dictionary that maps a system description (e.g., its name) to the corresponding CSV file (prefix). To use a Unicode narrow non-breaking space in a description, format it as `u"1000\u202FmCPU"`.
%% Cell type:code id: tags:
``` python
results_dir = '<path-to>/results'
experiments = {
    'System XYZ': 'exp200',
}
```
%% Cell type:markdown id: tags:
Now, we combine all systems described in `experiments`.
%% Cell type:code id: tags:
``` python
dataframes = [pd.read_csv(os.path.join(results_dir, f'{v}_demand.csv')).set_index('load').rename(columns={"resources": k}) for k, v in experiments.items()]
df = reduce(lambda df1,df2: df1.join(df2,how='outer'), dataframes)
```
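Because this is an outer join, a load intensity that was only tested for some systems produces NaN entries in the other columns; that is why the plotting code further below calls `dropna()` per column. A tiny illustration with made-up values:

``` python
import pandas as pd

# Made-up frames: the outer join keeps all loads and fills gaps with NaN.
a = pd.DataFrame({'load': [1, 2], 'System A': [1, 2]}).set_index('load')
b = pd.DataFrame({'load': [2, 3], 'System B': [1, 1]}).set_index('load')
print(a.join(b, how='outer'))
#       System A  System B
# load
# 1          1.0       NaN
# 2          2.0       1.0
# 3          NaN       1.0
```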
%% Cell type:markdown id: tags:
We might want to display the mappings before plotting them.
%% Cell type:code id: tags:
``` python
df
```
%% Cell type:markdown id: tags:
The following code creates a Matplotlib figure showing the scalability plots for all specified systems. You might want to adjust its styling according to your preferences. Make sure to also set a filename.
%% Cell type:code id: tags:
``` python
plt.style.use('ggplot')
plt.rcParams['pdf.fonttype'] = 42 # TrueType fonts
plt.rcParams['ps.fonttype'] = 42 # TrueType fonts
plt.rcParams['axes.facecolor']='w'
plt.rcParams['axes.edgecolor']='555555'
#plt.rcParams['ytick.color']='black'
plt.rcParams['grid.color']='dddddd'
plt.rcParams['axes.spines.top']='false'
plt.rcParams['axes.spines.right']='false'
plt.rcParams['legend.frameon']='true'
plt.rcParams['legend.framealpha']='1'
plt.rcParams['legend.edgecolor']='1'
plt.rcParams['legend.borderpad']='1'
@FuncFormatter
def load_formatter(x, pos):
    return f'{(x/1000):.0f}k'
markers = ['s', 'D', 'o', 'v', '^', '<', '>', 'p', 'X']
def splitSerToArr(ser):
    # pandas removed Series.as_matrix(); to_numpy() is the current replacement
    return [ser.index, ser.to_numpy()]
plt.figure()
#plt.figure(figsize=(4.8, 3.6)) # For other plot sizes
#ax = df.plot(kind='line', marker='o')
for i, column in enumerate(df):
    plt.plot(df[column].dropna(), marker=markers[i], label=column)
plt.legend()
ax = plt.gca()
#ax = df.plot(kind='line',x='dim_value', legend=False, use_index=True)
ax.set_ylabel('number of instances')
ax.set_xlabel('messages/second')
ax.set_ylim(bottom=0)  # the 'ymin' keyword was removed in newer Matplotlib; 'bottom' is the current name
#ax.set_xlim(left=0)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
ax.xaxis.set_major_formatter(FuncFormatter(load_formatter))
plt.savefig('temp.pdf', bbox_inches='tight')
```
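The cell above saves to the placeholder name `temp.pdf`; if you prefer a different name or a raster format, a hypothetical alternative is:

``` python
# Hypothetical output name/format instead of the placeholder 'temp.pdf':
plt.savefig('scalability-comparison.png', dpi=300, bbox_inches='tight')
```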
%% Cell type:code id: tags:
``` python
```
......
image: openjdk:11-jdk

# Disable the Gradle daemon for Continuous Integration servers as correctness
# is usually a priority over speed in CI environments. Using a fresh
# runtime for each build is more reliable since the runtime is completely
# isolated from any previous builds.
variables:
  GRADLE_OPTS: "-Dorg.gradle.daemon=false"

cache:
  paths:
    - .gradle

before_script:
  - cd benchmarks
  - export GRADLE_USER_HOME=`pwd`/.gradle

stages:
  - build
  - test
  - check
  - deploy
build:
  stage: build
  tags:
    - exec-docker
  script: ./gradlew --build-cache assemble
  artifacts:
    paths:
      - "benchmarks/build/libs/*.jar"
      - "benchmarks/*/build/distributions/*.tar"
    expire_in: 1 day

test:
  stage: test
  tags:
    - exec-docker
  script: ./gradlew test --continue
  artifacts:
    reports:
      junit:
        - "benchmarks/**/build/test-results/test/TEST-*.xml"

checkstyle:
  stage: check
  tags:
    - exec-docker
  script: ./gradlew checkstyle --continue
  artifacts:
    paths:
      - "benchmarks/*/build/reports/checkstyle/main.html"
    when: on_failure
    expire_in: 1 day

pmd:
  stage: check
  tags:
    - exec-docker
  script: ./gradlew pmd --continue
  artifacts:
    paths:
      - "benchmarks/*/build/reports/pmd/*.html"
    when: on_failure
    expire_in: 1 day

spotbugs:
  stage: check
  tags:
    - exec-docker
  script: ./gradlew spotbugs --continue
  artifacts:
    paths:
      - "benchmarks/*/build/reports/spotbugs/*.html"
    when: on_failure
    expire_in: 1 day
.deploy:
  stage: deploy
  tags:
    - exec-dind
  # see https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
  # for image usage and settings for building with TLS and docker in docker
  image: docker:19.03.1
  services:
    - docker:19.03.1-dind
  variables:
    DOCKER_TLS_CERTDIR: "/certs"
  script:
    - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
    - docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME
    - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
    - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
    - "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
    - echo $DOCKERHUB_PW | docker login -u $DOCKERHUB_ID --password-stdin
    - docker push $DOCKERHUB_ORG/$IMAGE_NAME
    - docker logout
  rules:
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        # - $JAVA_PROJECT_NAME/**/* # hope this can be simplified soon, see #51
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true
deploy-uc1-kstreams-app:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc1-kstreams-app"
    JAVA_PROJECT_NAME: "uc1-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc1-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-uc2-kstreams-app:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc2-kstreams-app"
    JAVA_PROJECT_NAME: "uc2-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc2-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-uc3-kstreams-app:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc3-kstreams-app"
    JAVA_PROJECT_NAME: "uc3-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc3-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-uc4-kstreams-app:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc4-kstreams-app"
    JAVA_PROJECT_NAME: "uc4-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc4-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-uc1-workload-generator:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc1-workload-generator"
    JAVA_PROJECT_NAME: "uc1-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc1-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-uc2-workload-generator:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc2-workload-generator"
    JAVA_PROJECT_NAME: "uc2-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc2-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-uc3-workload-generator:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc3-workload-generator"
    JAVA_PROJECT_NAME: "uc3-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc3-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-uc4-workload-generator:
  extends: .deploy
  variables:
    IMAGE_NAME: "theodolite-uc4-workload-generator"
    JAVA_PROJECT_NAME: "uc4-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc4-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true
.deploy-ghcr:
  stage: deploy
  tags:
    - exec-dind
  # see https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
  # for image usage and settings for building with TLS and docker in docker
  image: docker:19.03.1
  services:
    - docker:19.03.1-dind
  variables:
    DOCKER_TLS_CERTDIR: "/certs"
  script:
    - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
    - docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME
    - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
    - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
    - "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
    - echo $GITHUB_CR_TOKEN | docker login ghcr.io -u $GITHUB_CR_USER --password-stdin
    - docker push ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME
    - docker logout
  rules:
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        # - $JAVA_PROJECT_NAME/**/* # hope this can be simplified soon, see #51
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true
deploy-ghcr-uc1-kstreams-app:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc1-kstreams-app"
    JAVA_PROJECT_NAME: "uc1-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc1-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-ghcr-uc2-kstreams-app:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc2-kstreams-app"
    JAVA_PROJECT_NAME: "uc2-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc2-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-ghcr-uc3-kstreams-app:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc3-kstreams-app"
    JAVA_PROJECT_NAME: "uc3-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc3-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-ghcr-uc4-kstreams-app:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc4-kstreams-app"
    JAVA_PROJECT_NAME: "uc4-application"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc4-application/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-ghcr-uc1-workload-generator:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc1-workload-generator"
    JAVA_PROJECT_NAME: "uc1-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc1-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-ghcr-uc2-workload-generator:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc2-workload-generator"
    JAVA_PROJECT_NAME: "uc2-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc2-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-ghcr-uc3-workload-generator:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc3-workload-generator"
    JAVA_PROJECT_NAME: "uc3-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc3-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true

deploy-ghcr-uc4-workload-generator:
  extends: .deploy-ghcr
  variables:
    IMAGE_NAME: "theodolite-uc4-workload-generator"
    JAVA_PROJECT_NAME: "uc4-workload-generator"
  rules: # hope this can be simplified soon, see #51
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
      when: always
    - changes:
        - benchmarks/uc4-workload-generator/**/*
        - benchmarks/application-kafkastreams-commons/**/*
      if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: always
    - if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
      when: manual
      allow_failure: true
......@@ -67,7 +67,6 @@ configure(useCaseApplications) {
implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation project(':application-kafkastreams-commons')
......@@ -82,8 +81,6 @@ configure(useCaseGenerators) {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.7.25'
// These dependencies are used for the workload-generator-commons
......
......@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
......
......@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
......@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
org.eclipse.jdt.ui.text.custom_code_templates=
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
......
package theodolite.uc1.workloadgenerator;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Load Generator for UC1.
 * Load Generator for Theodolite use case UC1.
 */
public final class LoadGenerator {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);

  private static final long MAX_DURATION_IN_DAYS = 30L;

  private LoadGenerator() {}

  /**
   * Entry point.
   * Start load generator for use case UC1.
   */
  public static void main(final String[] args) throws InterruptedException, IOException {
    // uc1
    LOGGER.info("Start workload generator for use case UC1.");

    // get environment variables
    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
    final int zooKeeperPort =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
    final int numSensors =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
    final int periodMs =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
    final double value =
        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
    final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
        "4"));
    final String kafkaBootstrapServers =
        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
    final String schemaRegistryUrl =
        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
    final String kafkaInputTopic =
        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
    final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
    final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
    final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
    final int instances =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));

    // create kafka record sender
    final Properties kafkaProperties = new Properties();
    // kafkaProperties.put("acks", this.acknowledges);
    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
        new KafkaRecordSender.Builder<ActivePowerRecord>(
            kafkaBootstrapServers,
            kafkaInputTopic,
            schemaRegistryUrl)
            .keyAccessor(r -> r.getIdentifier())
            .timestampAccessor(r -> r.getTimestamp())
            .defaultProperties(kafkaProperties)
            .build();

    // create workload generator
    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
            .instances(instances)
            .keySpace(new KeySpace("s_", numSensors))
            .threads(threads)
            .period(Duration.of(periodMs, ChronoUnit.MILLIS))
            .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
            .generatorFunction(
                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
            .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
            .kafkaRecordSender(kafkaRecordSender)
            .build();

    // start
    workloadGenerator.start();
    theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run();
  }
}
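The configuration pattern visible above, `Objects.requireNonNullElse(System.getenv(NAME), default)`, is plain environment-variable lookup with a fallback; the same idea in Python, using the variable names the generator actually reads:

``` python
import os

# Env-with-default lookup, mirroring Objects.requireNonNullElse(System.getenv(...), ...).
# Variable names are taken from the Java code above; values are its defaults.
num_sensors = int(os.environ.get('NUM_SENSORS', '10'))
period_ms = int(os.environ.get('PERIOD_MS', '1000'))
bootstrap = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
print(num_sensors, period_ms, bootstrap)
```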
mainClassName = "theodolite.uc2.application.AggregationService"
mainClassName = "theodolite.uc2.application.HistoryService"
package theodolite.uc4.application;
package theodolite.uc2.application;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder;
import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
......@@ -18,6 +19,8 @@ public class HistoryService {
  private final Configuration config = ServiceConfigurations.createWithDefaults();
  private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();

  private final int windowDurationMinutes = Integer
      .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));

  /**
   * Start the service.
......@@ -31,17 +34,12 @@ public class HistoryService {
   *
   */
  private void createKafkaStreamsApplication() {
    // Use case specific stream configuration
    final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config);
    uc4KafkaStreamsBuilder
    final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
    uc2KafkaStreamsBuilder
        .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
        .aggregtionDuration(
            Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)))
        .aggregationAdvance(
            Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)));
        .windowDuration(Duration.ofMinutes(this.windowDurationMinutes));

    // Configuration of the stream application
    final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build();
    final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();

    this.stopEvent.thenRun(kafkaStreams::close);
    kafkaStreams.start();
......
package theodolite.uc2.streamprocessing;
import com.google.common.math.Stats;
import java.time.Duration;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.uc2.streamprocessing.util.StatsFactory;
import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
 * Builds Kafka Stream Topology for the History microservice.
 */
public class TopologyBuilder {
  // Streams Variables
  private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);

  private final String inputTopic;
  private final String feedbackTopic;
  private final String outputTopic;
  private final String configurationTopic;
  private final Duration emitPeriod;
  private final Duration gracePeriod;
  // Serdes
  private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
  private final Duration duration;

  private final StreamsBuilder builder = new StreamsBuilder();
  private final RecordAggregator recordAggregator = new RecordAggregator();

  /**
   * Create a new {@link TopologyBuilder} using the given topics.
   *
   * @param inputTopic The topic where to read sensor measurements from.
   * @param configurationTopic The topic where the hierarchy of the sensors is published.
   * @param feedbackTopic The topic where aggregation results are written to for feedback.
   * @param outputTopic The topic where to publish aggregation results.
   * @param emitPeriod The Duration results are emitted with.
   * @param gracePeriod The Duration for how long late arriving records are considered.
   * @param srAvroSerdeFactory Factory for creating avro SERDEs
   *
   */
  public TopologyBuilder(final String inputTopic, final String outputTopic,
      final String feedbackTopic, final String configurationTopic,
      final Duration emitPeriod, final Duration gracePeriod,
      final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
      final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
      final Duration duration) {
    this.inputTopic = inputTopic;
    this.outputTopic = outputTopic;
    this.feedbackTopic = feedbackTopic;
    this.configurationTopic = configurationTopic;
    this.emitPeriod = emitPeriod;
    this.gracePeriod = gracePeriod;
    this.srAvroSerdeFactory = srAvroSerdeFactory;
    this.duration = duration;
  }
  /**
   * Build the {@link Topology} for the Aggregation microservice.
   * Build the {@link Topology} for the History microservice.
   */
  public Topology build(final Properties properties) {
    // 1. Build Parent-Sensor Table
    final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();

    // 2. Build Input Table
    final KTable<String, ActivePowerRecord> inputTable = this.buildInputTable();

    // 3. Build Last Value Table from Input and Parent-Sensor Table
    final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable =
        this.buildLastValueTable(parentSensorTable, inputTable);

    // 4. Build Aggregations Stream
    final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations =
        this.buildAggregationStream(lastValueTable);

    // 6. Expose Feedback Stream
    this.exposeFeedbackStream(aggregations);

    // 5. Expose Aggregations Stream
    this.exposeOutputStream(aggregations);

    return this.builder.build(properties);
  }
  private KTable<String, ActivePowerRecord> buildInputTable() {
    final KStream<String, ActivePowerRecord> values = this.builder
        .stream(this.inputTopic, Consumed.with(
            Serdes.String(),
            this.srAvroSerdeFactory.forValues()));
    final KStream<String, ActivePowerRecord> aggregationsInput = this.builder
        .stream(this.feedbackTopic, Consumed.with(
            Serdes.String(),
            this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues()))
        .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()));

    final KTable<String, ActivePowerRecord> inputTable = values
        .merge(aggregationsInput)
        .groupByKey(Grouped.with(
            Serdes.String(),
            this.srAvroSerdeFactory.forValues()))
        .reduce((aggr, value) -> value, Materialized.with(
            Serdes.String(),
            this.srAvroSerdeFactory.forValues()));
    return inputTable;
  }

  private KTable<String, Set<String>> buildParentSensorTable() {
    final KStream<Event, String> configurationStream = this.builder
        .stream(this.configurationTopic, Consumed.with(EventSerde.serde(), Serdes.String()))
        .filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED
            || key == Event.SENSOR_REGISTRY_STATUS);

    return configurationStream
        .mapValues(data -> SensorRegistry.fromJson(data))
        .flatTransform(new ChildParentsTransformerSupplier())
        .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde()))
        .aggregate(
            () -> Set.<String>of(),
            (key, newValue, oldValue) -> newValue.orElse(null),
            Materialized.with(Serdes.String(), ParentsSerde.serde()));
  }

  private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable(
      final KTable<String, Set<String>> parentSensorTable,
      final KTable<String, ActivePowerRecord> inputTable) {
    return inputTable
        .join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record))
        .toStream()
        .flatTransform(new JointFlatTransformerSupplier())
        .groupByKey(Grouped.with(
            SensorParentKeySerde.serde(),
            this.srAvroSerdeFactory.forValues()))
        .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod))
        .reduce(
            // TODO Configurable window aggregation function
            (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,
            Materialized.with(
                SensorParentKeySerde.serde(),
                this.srAvroSerdeFactory.forValues()));
  }
  private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream(
      final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) {
    return lastValueTable
        .groupBy(
            (k, v) -> KeyValue.pair(new Windowed<>(k.key().getParent(), k.window()), v),
            Grouped.with(
                new WindowedSerdes.TimeWindowedSerde<>(
                    Serdes.String(),
                    this.emitPeriod.toMillis()),
                this.srAvroSerdeFactory.forValues()))
        .aggregate(
            () -> null,
            this.recordAggregator::add,
            this.recordAggregator::substract,
            Materialized.with(
                new WindowedSerdes.TimeWindowedSerde<>(
                    Serdes.String(),
                    this.emitPeriod.toMillis()),
                this.srAvroSerdeFactory.forValues()))
        // TODO timestamp -1 indicates that this record is emitted by an substract event
        .filter((k, record) -> record.getTimestamp() != -1);
  }

  private void exposeFeedbackStream(
      final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
    aggregations
        .toStream()
        .filter((k, record) -> record != null)
        .selectKey((k, v) -> k.key())
        .to(this.feedbackTopic, Produced.with(
            Serdes.String(),
            this.srAvroSerdeFactory.forValues()));
  }

  private void exposeOutputStream(
      final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
    aggregations
        // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        .suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded()))
        .toStream()
        .filter((k, record) -> record != null)
        .selectKey((k, v) -> k.key())
        .to(this.outputTopic, Produced.with(
            Serdes.String(),
            this.srAvroSerdeFactory.forValues()));
  }

    this.builder
        .stream(this.inputTopic,
            Consumed.with(Serdes.String(),
                this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
        .groupByKey()
        .windowedBy(TimeWindows.of(this.duration))
        // .aggregate(
        //     () -> 0.0,
        //     (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
        //     Materialized.with(Serdes.String(), Serdes.Double()))
        .aggregate(
            () -> Stats.of(),
            (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
            Materialized.with(
                Serdes.String(),
                GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
        .toStream()
        .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
        .peek((k, v) -> LOGGER.info(k + ": " + v))
        .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));

    return this.builder.build(properties);
  }
}
......@@ -11,62 +11,33 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
 * Builder for the Kafka Streams configuration.
 */
public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method
public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder {

  private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1);
  private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO;

  private String feedbackTopic; // NOPMD
  private String outputTopic; // NOPMD
  private String configurationTopic; // NOPMD
  private Duration emitPeriod; // NOPMD
  private Duration gracePeriod; // NOPMD
  private Duration windowDuration; // NOPMD

  public Uc2KafkaStreamsBuilder(final Configuration config) {
    super(config);
  }

  public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) {
    this.feedbackTopic = feedbackTopic;
    return this;
  }

  public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) {
    this.outputTopic = outputTopic;
    return this;
  }

  public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) {
    this.configurationTopic = configurationTopic;
    return this;
  }

  public Uc2KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) {
    this.emitPeriod = Objects.requireNonNull(emitPeriod);
    return this;
  }

  public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) {
    this.gracePeriod = Objects.requireNonNull(gracePeriod);
  public Uc2KafkaStreamsBuilder windowDuration(final Duration windowDuration) {
    this.windowDuration = windowDuration;
    return this;
  }

  @Override
  protected Topology buildTopology(final Properties properties) {
    Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
    Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
    Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
    Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set.");

    final TopologyBuilder topologyBuilder = new TopologyBuilder(
        this.inputTopic,
        this.outputTopic,
        this.feedbackTopic,
        this.configurationTopic,
        this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod,
        this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
        new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
    Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
    final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
        new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);

    return topologyBuilder.build(properties);
  }
......
package theodolite.uc4.streamprocessing.util;
package theodolite.uc2.streamprocessing.util;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
......
......@@ -3,11 +3,7 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.configuration.topic=configuration
kafka.feedback.topic=aggregation-feedback
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8091
emit.period.ms=5000
grace.period.ms=0
\ No newline at end of file
......@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
......
package theodolite.uc2.workloadgenerator;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
 * The {@code LoadGenerator} creates a load in Kafka.
 * Load generator for Theodolite use case UC2.
 */
public final class LoadGenerator {

  private static final int SLEEP_PERIOD = 30_000;

  private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);

  // Constants
  private static final String DEEP = "deep";
  private static final long MAX_DURATION_IN_DAYS = 30L;

  // Make this a utility class, because all methods are static.
  private LoadGenerator() {
    throw new UnsupportedOperationException();
  }
  private LoadGenerator() {}

  /**
   * Main method.
   *
   * @param args CLI arguments
   * @throws InterruptedException Interrupt happened
   * @throws IOException happened.
   */
  public static void main(final String[] args) throws InterruptedException, IOException {
    // uc2
    LOGGER.info("Start workload generator for use case UC2.");

    // get environment variables
    final String hierarchy = System.getenv("HIERARCHY");
    if (hierarchy != null && hierarchy.equals(DEEP)) {
      LOGGER.error(
          "The HIERARCHY parameter is no longer supported. Creating a full hierachy instead.");
    }
    final int numNestedGroups = Integer
        .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
    final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
    final int zooKeeperPort =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
    final int numSensors =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
    final int periodMs =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
    final double value =
        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
    final boolean sendRegistry = Boolean
        .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
    final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
    final String kafkaBootstrapServers =
        Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
            "localhost:9092");
    final String schemaRegistryUrl =
        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
    final String kafkaInputTopic =
        Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
    final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
    final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
    final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
    final int instances =
        Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));

    // build sensor registry
    final SensorRegistry sensorRegistry =
        new SensorRegistryBuilder(numNestedGroups, numSensors).build();

    // create kafka record sender
    final Properties kafkaProperties = new Properties();
    // kafkaProperties.put("acks", this.acknowledges);
    kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
    kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
    kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
        new KafkaRecordSender.Builder<ActivePowerRecord>(
            kafkaBootstrapServers,
            kafkaInputTopic,
            schemaRegistryUrl)
            .keyAccessor(r -> r.getIdentifier())
            .timestampAccessor(r -> r.getTimestamp())
            .defaultProperties(kafkaProperties)
            .build();

    // create workload generator
    final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
        KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
            .instances(instances)
            .keySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size()))
            .threads(threads)
            .period(Duration.of(periodMs, ChronoUnit.MILLIS))
            .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
            .beforeAction(() -> {
              if (sendRegistry) {
                final ConfigPublisher configPublisher =
                    new ConfigPublisher(kafkaBootstrapServers, "configuration");
                configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
                configPublisher.close();
                LOGGER.info("Configuration sent.");

                LOGGER.info("Now wait 30 seconds");
                try {
                  Thread.sleep(SLEEP_PERIOD);
                } catch (final InterruptedException e) {
                  // TODO Auto-generated catch block
                  LOGGER.error(e.getMessage(), e);
                }
                LOGGER.info("And woke up again :)");
              }
            })
            .generatorFunction(
                sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
            .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
            .kafkaRecordSender(kafkaRecordSender)
            .build();

    // start
    workloadGenerator.start();
  }

  public static void main(final String[] args) {
    LOGGER.info("Start workload generator for use case UC2");
    theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run();
  }
}