Skip to content
Snippets Groups Projects
Commit ab72f42f authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Merge branch 'master' into 87-random-scheduling

parents 57019b46 b69c786c
Branches
Tags
1 merge request!61Allow using a random scheduler
Showing
with 559 additions and 405 deletions
image: openjdk:11-jdk
# Disable the Gradle daemon for Continuous Integration servers as correctness
# is usually a priority over speed in CI environments. Using a fresh
# runtime for each build is more reliable since the runtime is completely
# isolated from any previous builds.
variables:
GRADLE_OPTS: "-Dorg.gradle.daemon=false"
cache:
paths:
- .gradle
before_script:
- export GRADLE_USER_HOME=`pwd`/.gradle
workflow:
rules:
- if: $CI_MERGE_REQUEST_ID
when: never
- when: always
stages:
- build
- test
- check
- deploy
build:
stage: build
tags:
- exec-docker
script: ./gradlew --build-cache assemble
artifacts:
paths:
- "build/libs/*.jar"
- "*/build/distributions/*.tar"
expire_in: 1 day
test:
stage: test
tags:
- exec-docker
script: ./gradlew test --continue
artifacts:
reports:
junit:
- "**/build/test-results/test/TEST-*.xml"
checkstyle:
stage: check
tags:
- exec-docker
script: ./gradlew checkstyle --continue
artifacts:
paths:
- "*/build/reports/checkstyle/main.html"
when: on_failure
expire_in: 1 day
pmd:
stage: check
tags:
- exec-docker
script: ./gradlew pmd --continue
artifacts:
paths:
- "*/build/reports/pmd/*.html"
when: on_failure
expire_in: 1 day
spotbugs:
stage: check
tags:
- exec-docker
script: ./gradlew spotbugs --continue
artifacts:
paths:
- "*/build/reports/spotbugs/*.html"
when: on_failure
expire_in: 1 day
- triggers
.deploy:
stage: deploy
tags:
- exec-dind
# see https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
# for image usage and settings for building with TLS and docker in docker
image: docker:19.03.1
services:
- docker:19.03.1-dind
variables:
DOCKER_TLS_CERTDIR: "/certs"
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
- "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
- echo $DOCKERHUB_PW | docker login -u $DOCKERHUB_ID --password-stdin
- docker push $DOCKERHUB_ORG/$IMAGE_NAME
- docker logout
benchmarks:
stage: triggers
trigger:
include: benchmarks/.gitlab-ci.yml
strategy: depend
rules:
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
# - $JAVA_PROJECT_NAME/**/* # hope this can be simplified soon, see #51
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc1-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc1-kstreams-app"
JAVA_PROJECT_NAME: "uc1-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- if: "$CI_COMMIT_TAG"
- changes:
- uc1-application/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
- benchmarks/*
- when: manual
allow_failure: true
\ No newline at end of file
deploy-uc2-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc2-kstreams-app"
JAVA_PROJECT_NAME: "uc2-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- uc2-application/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc3-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc3-kstreams-app"
JAVA_PROJECT_NAME: "uc3-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- uc3-application/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc4-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc4-kstreams-app"
JAVA_PROJECT_NAME: "uc4-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- uc4-application/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc1-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc1-workload-generator"
JAVA_PROJECT_NAME: "uc1-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- uc1-workload-generator/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc2-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc2-workload-generator"
JAVA_PROJECT_NAME: "uc2-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- uc2-workload-generator/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc3-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc3-workload-generator"
JAVA_PROJECT_NAME: "uc3-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- uc3-workload-generator/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc4-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc4-workload-generator"
JAVA_PROJECT_NAME: "uc4-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- uc4-workload-generator/**/*
- application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
......@@ -11,9 +11,9 @@ Theodolite contains 4 application benchmarks, which are based on typical use cas
## Theodolite Execution Framework
Theodolite aims to benchmark scalability of stream processing engines for real use cases. Microservices that apply stream processing techniques are usually deployed in elastic cloud environments. Hence, Theodolite's cloud-native benchmarking framework deploys as components in a cloud environment, orchestrated by Kubernetes. More information on how to execute scalability benchmarks can be found in [Thedolite execution framework](execution).
Theodolite aims to benchmark scalability of stream processing engines for real use cases. Microservices that apply stream processing techniques are usually deployed in elastic cloud environments. Hence, Theodolite's cloud-native benchmarking framework deploys its components in a cloud environment, orchestrated by Kubernetes. More information on how to execute scalability benchmarks can be found in [Thedolite execution framework](execution).
## Theodolite Analysis Tools
Theodolite's benchmarking method create a *scalability graph* allowing to draw conclusions about the scalability of a stream processing engine or its deployment. A scalability graph shows how resource demand evolves with an increasing workload. Theodolite provides Jupyter notebooks for creating such scalability graphs based on benchmarking results from the execution framework. More information can be found in [Theodolite analysis tool](analysis).
Theodolite's benchmarking method creates a *scalability graph* allowing to draw conclusions about the scalability of a stream processing engine or its deployment. A scalability graph shows how resource demand evolves with an increasing workload. Theodolite provides Jupyter notebooks for creating such scalability graphs based on benchmarking results from the execution framework. More information can be found in [Theodolite analysis tool](analysis).
package theodolite.commons.kafkastreams;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public abstract class KafkaStreamsBuilder {
// Kafkastreams application specific
protected String schemaRegistryUrl; // NOPMD for use in subclass
private String applicationName; // NOPMD
private String applicationVersion; // NOPMD
private String bootstrapServers; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
/**
* Sets the application name for the {@code KafkaStreams} application. It is used to create the
* application ID.
*
* @param applicationName Name of the application.
* @return
*/
public KafkaStreamsBuilder applicationName(final String applicationName) {
this.applicationName = applicationName;
return this;
}
/**
* Sets the application version for the {@code KafkaStreams} application. It is used to create the
* application ID.
*
* @param applicationVersion Version of the application.
* @return
*/
public KafkaStreamsBuilder applicationVersion(final String applicationVersion) {
this.applicationVersion = applicationVersion;
return this;
}
/**
* Sets the bootstrap servers for the {@code KafkaStreams} application.
*
* @param bootstrapServers String for a bootstrap server.
* @return
*/
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
/**
* Sets the URL for the schema registry.
*
* @param url The URL of the schema registry.
* @return
*/
public KafkaStreamsBuilder schemaRegistry(final String url) {
this.schemaRegistryUrl = url;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
*
* @param numThreads Number of threads. -1 for using the default.
* @return
*/
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
}
this.numThreads = numThreads;
return this;
}
/**
* Sets the Kafka Streams property for the frequency with which to save the position (offsets in
* source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*
* @param commitIntervalMs Frequency with which to save the position of tasks. In ms, -1 for using
* the default.
* @return
*/
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
* Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches
* across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for
* example, when processing bulks of records. Can be minus one for using the default.
*
* @param cacheMaxBytesBuffering Number of memory bytes to be used for record caches across all
* threads. -1 for using the default.
* @return
*/
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
}
this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
return this;
}
/**
* Method to implement a {@link Topology} for a {@code KafkaStreams} application.
*
* @return A {@code Topology} for a {@code KafkaStreams} application.
*/
protected abstract Topology buildTopology();
/**
* Build the {@link Properties} for a {@code KafkaStreams} application.
*
* @return A {@code Properties} object.
*/
protected Properties buildProperties() {
return PropertiesBuilder
.bootstrapServers(this.bootstrapServers)
.applicationId(this.applicationName + '-' + this.applicationVersion)
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
.build();
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
// Check for required attributes for building properties.
Objects.requireNonNull(this.applicationName, "Application name has not been set.");
Objects.requireNonNull(this.applicationVersion, "Application version has not been set.");
Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set.");
// Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), this.buildProperties());
}
}
image: openjdk:11-jdk
# Disable the Gradle daemon for Continuous Integration servers as correctness
# is usually a priority over speed in CI environments. Using a fresh
# runtime for each build is more reliable since the runtime is completely
# isolated from any previous builds.
variables:
GRADLE_OPTS: "-Dorg.gradle.daemon=false"
cache:
paths:
- .gradle
before_script:
- cd benchmarks
- export GRADLE_USER_HOME=`pwd`/.gradle
stages:
- build
- test
- check
- deploy
build:
stage: build
tags:
- exec-docker
script: ./gradlew --build-cache assemble
artifacts:
paths:
- "benchmarks/build/libs/*.jar"
- "benchmarks/*/build/distributions/*.tar"
expire_in: 1 day
test:
stage: test
tags:
- exec-docker
script: ./gradlew test --continue
artifacts:
reports:
junit:
- "benchmarks/**/build/test-results/test/TEST-*.xml"
checkstyle:
stage: check
tags:
- exec-docker
script: ./gradlew checkstyle --continue
artifacts:
paths:
- "benchmarks/*/build/reports/checkstyle/main.html"
when: on_failure
expire_in: 1 day
pmd:
stage: check
tags:
- exec-docker
script: ./gradlew pmd --continue
artifacts:
paths:
- "benchmarks/*/build/reports/pmd/*.html"
when: on_failure
expire_in: 1 day
spotbugs:
stage: check
tags:
- exec-docker
script: ./gradlew spotbugs --continue
artifacts:
paths:
- "benchmarks/*/build/reports/spotbugs/*.html"
when: on_failure
expire_in: 1 day
.deploy:
stage: deploy
tags:
- exec-dind
# see https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
# for image usage and settings for building with TLS and docker in docker
image: docker:19.03.1
services:
- docker:19.03.1-dind
variables:
DOCKER_TLS_CERTDIR: "/certs"
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
- "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $DOCKERHUB_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
- echo $DOCKERHUB_PW | docker login -u $DOCKERHUB_ID --password-stdin
- docker push $DOCKERHUB_ORG/$IMAGE_NAME
- docker logout
rules:
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
# - $JAVA_PROJECT_NAME/**/* # hope this can be simplified soon, see #51
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc1-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc1-kstreams-app"
JAVA_PROJECT_NAME: "uc1-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc1-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc2-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc2-kstreams-app"
JAVA_PROJECT_NAME: "uc2-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc2-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc3-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc3-kstreams-app"
JAVA_PROJECT_NAME: "uc3-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc3-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc4-kstreams-app:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc4-kstreams-app"
JAVA_PROJECT_NAME: "uc4-application"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc4-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc1-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc1-workload-generator"
JAVA_PROJECT_NAME: "uc1-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc1-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc2-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc2-workload-generator"
JAVA_PROJECT_NAME: "uc2-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc2-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc3-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc3-workload-generator"
JAVA_PROJECT_NAME: "uc3-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc3-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-uc4-workload-generator:
extends: .deploy
variables:
IMAGE_NAME: "theodolite-uc4-workload-generator"
JAVA_PROJECT_NAME: "uc4-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc4-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$DOCKERHUB_ORG && $DOCKERHUB_ID && $DOCKERHUB_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
.deploy-ghcr:
stage: deploy
tags:
- exec-dind
# see https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
# for image usage and settings for building with TLS and docker in docker
image: docker:19.03.1
services:
- docker:19.03.1-dind
variables:
DOCKER_TLS_CERTDIR: "/certs"
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
- "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
- echo $GITHUB_CR_TOKEN | docker login ghcr.io -u $GITHUB_CR_USER --password-stdin
- docker push ghcr.io/$GITHUB_CR_ORG/$IMAGE_NAME
- docker logout
rules:
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
# - $JAVA_PROJECT_NAME/**/* # hope this can be simplified soon, see #51
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc1-kstreams-app:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc1-kstreams-app"
JAVA_PROJECT_NAME: "uc1-application"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc1-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc2-kstreams-app:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc2-kstreams-app"
JAVA_PROJECT_NAME: "uc2-application"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc2-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc3-kstreams-app:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc3-kstreams-app"
JAVA_PROJECT_NAME: "uc3-application"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc3-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc4-kstreams-app:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc4-kstreams-app"
JAVA_PROJECT_NAME: "uc4-application"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc4-application/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc1-workload-generator:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc1-workload-generator"
JAVA_PROJECT_NAME: "uc1-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc1-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc2-workload-generator:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc2-workload-generator"
JAVA_PROJECT_NAME: "uc2-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc2-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc3-workload-generator:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc3-workload-generator"
JAVA_PROJECT_NAME: "uc3-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc3-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
deploy-ghcr-uc4-workload-generator:
extends: .deploy-ghcr
variables:
IMAGE_NAME: "theodolite-uc4-workload-generator"
JAVA_PROJECT_NAME: "uc4-workload-generator"
rules: # hope this can be simplified soon, see #51
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- benchmarks/uc4-workload-generator/**/*
- benchmarks/application-kafkastreams-commons/**/*
if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$GITHUB_CR_ORG && $GITHUB_CR_USER && $GITHUB_CR_TOKEN && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
......@@ -9,12 +9,6 @@ public final class ConfigurationKeys {
public static final String APPLICATION_VERSION = "application.version";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
......
package theodolite.commons.kafkastreams;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public abstract class KafkaStreamsBuilder {
// Kafka Streams application specific
protected final String schemaRegistryUrl; // NOPMD for use in subclass
protected final String inputTopic; // NOPMD for use in subclass
private final Configuration config;
private final String applicationName; // NOPMD
private final String applicationVersion; // NOPMD
private final String bootstrapServers; // NOPMD
/**
* Construct a new Build object for a Kafka Streams application.
*
* @param config Contains the key value pairs for configuration.
*/
public KafkaStreamsBuilder(final Configuration config) {
this.config = config;
this.applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
this.applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
this.bootstrapServers = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
this.inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
}
/**
* Checks if the given key is contained in the configurations and sets it in the properties.
*
* @param <T> Type of the value for given key
* @param propBuilder Object where to set this property.
* @param key The key to check and set the property.
* @param valueGetter Method to get the value from with given key.
* @param condition for setting the property.
*/
private <T> void setOptionalProperty(final PropertiesBuilder propBuilder,
final String key,
final Function<String, T> valueGetter,
final Predicate<T> condition) {
if (this.config.containsKey(key)) {
final T value = valueGetter.apply(key);
propBuilder.set(key, value, condition);
}
}
/**
* Build the {@link Properties} for a {@code KafkaStreams} application.
*
* @return A {@code Properties} object.
*/
protected Properties buildProperties() {
// required configuration
final PropertiesBuilder propBuilder = PropertiesBuilder
.bootstrapServers(this.bootstrapServers)
.applicationId(this.applicationName + '-' + this.applicationVersion);
// optional configurations
this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
this.config::getLong,
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG,
this.config::getInt, p -> p > 0);
this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
this.config::getInt,
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
this.config::getInt, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
this.config::getLong,
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG,
this.config::getInt, p -> p >= 1);
this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
this.config::getInt, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG,
this.config::getInt, p -> p > 0);
this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG,
this.config::getLong,
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
this.config::getString, p -> StreamsConfig.AT_LEAST_ONCE.equals(p)
|| StreamsConfig.EXACTLY_ONCE.equals(p) || StreamsConfig.EXACTLY_ONCE_BETA.equals(p));
this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG,
this.config::getInt, p -> p >= 0);
if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION)
&& this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) {
propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
}
return propBuilder.build();
}
/**
* Method to implement a {@link Topology} for a {@code KafkaStreams} application.
*
* @return A {@code Topology} for a {@code KafkaStreams} application.
*/
protected abstract Topology buildTopology();
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
// Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), this.buildProperties());
}
}
......@@ -6,7 +6,7 @@ buildscript {
}
}
dependencies {
classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.3"
classpath "gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.6.0"
}
}
......@@ -65,6 +65,7 @@ configure(useCaseApplications) {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.7.25'
......@@ -100,6 +101,7 @@ configure(commonProjects) {
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0'
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
......@@ -108,7 +110,7 @@ configure(commonProjects) {
// Per default XML reports for SpotBugs are generated
// Include this to generate HTML reports
tasks.withType(com.github.spotbugs.SpotBugsTask) {
tasks.withType(com.github.spotbugs.snom.SpotBugsTask) {
reports {
// Either HTML or XML reports can be activated
html.enabled true
......@@ -165,7 +167,7 @@ subprojects {
reportLevel = "low"
effort = "max"
ignoreFailures = false
toolVersion = '3.1.7'
toolVersion = '4.1.4'
}
}
......
File moved
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment