Commit 8c913c0d authored by Lorenz Boguhn


Merge branch 'master' of git.se.informatik.uni-kiel.de:she/theodolite into feature/281-Add-Beam-Kubernetes-Benchmark-Definitions
parents db456730 2d87de45
1 merge request: !218 Add Beam Kubernetes Benchmark Definitions
Pipeline #6485 passed
Showing with 493 additions and 146 deletions
workflow:
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
-   - if: '$CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS && $CI_PIPELINE_SOURCE == "push"'
+   - if: '$CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS && $CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_REF_PROTECTED != "true"'
      when: never
    - when: always
@@ -10,6 +10,7 @@ stages:
  - test
  - check
  - deploy
+ - smoketest
default:
  tags:

@@ -20,9 +21,9 @@ default:
    - exec-dind
  # see https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
  # for image usage and settings for building with TLS and docker in docker
- image: docker:19.03.1
+ image: docker:20.10.12
  services:
-   - docker:19.03.1-dind
+   - docker:20.10.12-dind
  variables:
    DOCKER_TLS_CERTDIR: "/certs"
@@ -33,12 +34,22 @@ default:
  script:
    - mkdir -p /kaniko/.docker
    - echo "{\"auths\":{\"${CR_HOST}\":{\"auth\":\"$(printf "%s:%s" "${CR_USER}" "${CR_PW}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
-   - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
-   - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest\""
-   - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA\""
-   - "[ $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG\""
+   - >
+     if [ $IMAGE_TAG ]; then
+       KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$IMAGE_TAG"
+     elif [ $CI_COMMIT_TAG ]; then
+       KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
+     else
+       DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
+       KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
+       KANIKO_D="$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
+     fi
    - "[ $DOCKERFILE ] && KANIKO_DOCKERFILE=\"--dockerfile $DOCKERFILE\""
    - /kaniko/executor --context `pwd`/$CONTEXT $KANIKO_DOCKERFILE $KANIKO_D
- echo "PUBLISHED_IMAGE_TAG=${CI_COMMIT_TAG-$CI_COMMIT_SHORT_SHA}" >> build.env
artifacts:
reports:
dotenv: build.env
# Theodolite Docs

@@ -73,26 +84,37 @@ test-docs-links:
    - build-docs
  script: bundle exec htmlproofer --assume-extension --allow_hash_href ./_site
build-docs-crds:
stage: build
image:
name: ghcr.io/fybrik/crdoc:0.6.1
entrypoint: [""]
script: /crdoc --resources theodolite/crd/ --template docs/api-reference/crds.tmpl --output docs/api-reference/crds.ref.md
artifacts:
paths:
- docs/api-reference/crds.ref.md
expire_in: 1 week
rules:
- changes:
- docs/api-reference/crds.tmpl
- theodolite/crd/**/*
- when: manual
allow_failure: true
test-docs-crds-regression:
  stage: test
- image: golang
+ needs:
+   - build-docs-crds
+ image: alpine:3.15
  before_script:
    - cd docs
-   - go install fybrik.io/crdoc@latest
  script:
-   - crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --output api-reference/crds.ref.md
    - cmp api-reference/crds.md api-reference/crds.ref.md
  artifacts:
    when: on_failure
    paths:
      - docs/api-reference/crds.ref.md
    expire_in: 1 week
rules:
- changes:
- docs/api-reference/crds.tmpl
- theodolite/crd/**/*
- when: manual
allow_failure: true
# Theodolite Helm Chart

@@ -104,6 +126,11 @@ lint-helm:
    name: alpine/helm:3.5.2
    entrypoint: [""]
  script: helm lint helm/
rules:
- changes:
- helm/*
- when: manual
allow_failure: true
# Theodolite Benchmarks

@@ -351,6 +378,166 @@ deploy-uc4-load-generator:
    JAVA_PROJECT_NAME: "uc4-load-generator"
    JAVA_PROJECT_DEPS: "load-generator-commons"
.smoketest-benchmarks:
stage: smoketest
extends:
- .dind
image: ghcr.io/cau-se/theodolite-build-docker-compose-jq:20.10.12
before_script:
- cd theodolite-benchmarks/docker-test
# variables:
# TEST_LOG_FILE: "test.log"
script:
- export THEODOLITE_TAG=$PUBLISHED_IMAGE_TAG
- ./smoketest-runner.sh ./$DOCKER_COMPOSE_DIR
# - cat test.log
after_script:
- cd ./$DOCKER_COMPOSE_DIR
- docker-compose down
rules:
- changes:
- theodolite-benchmarks/*
- theodolite-benchmarks/{$JAVA_PROJECT_DEPS}/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $DOCKER_COMPOSE_DIR && $JAVA_PROJECT_DEPS"
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $DOCKER_COMPOSE_DIR && $JAVA_PROJECT_DEPS"
when: manual
allow_failure: true
smoketest-uc1-kstreams:
extends: .smoketest-benchmarks
needs:
- deploy-uc1-kstreams
- deploy-uc1-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc1-kstreams"
JAVA_PROJECT_DEPS: "uc1-kstreams,kstreams-commons,uc1-load-generator,load-generator-commons"
smoketest-uc1-flink:
extends: .smoketest-benchmarks
needs:
- deploy-uc1-flink
- deploy-uc1-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc1-flink"
JAVA_PROJECT_DEPS: "uc1-flink,flink-commons,uc1-load-generator,load-generator-commons"
smoketest-uc1-beam-flink:
extends: .smoketest-benchmarks
needs:
- deploy-uc1-beam-flink
- deploy-uc1-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc1-beam-flink"
JAVA_PROJECT_DEPS: "uc1-beam-flink,uc1-beam,beam-commons,uc1-load-generator,load-generator-commons"
smoketest-uc1-beam-samza:
extends: .smoketest-benchmarks
needs:
- deploy-uc1-beam-samza
- deploy-uc1-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc1-beam-samza"
JAVA_PROJECT_DEPS: "uc1-beam-samza,uc1-beam,beam-commons,uc1-load-generator,load-generator-commons"
smoketest-uc2-kstreams:
extends: .smoketest-benchmarks
needs:
- deploy-uc2-kstreams
- deploy-uc2-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc2-kstreams"
JAVA_PROJECT_DEPS: "uc2-kstreams,kstreams-commons,uc2-load-generator,load-generator-commons"
smoketest-uc2-flink:
extends: .smoketest-benchmarks
needs:
- deploy-uc2-flink
- deploy-uc2-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc2-flink"
JAVA_PROJECT_DEPS: "uc2-flink,flink-commons,uc2-load-generator,load-generator-commons"
smoketest-uc2-beam-flink:
extends: .smoketest-benchmarks
needs:
- deploy-uc2-beam-flink
- deploy-uc2-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc2-beam-flink"
JAVA_PROJECT_DEPS: "uc2-beam-flink,uc2-beam,beam-commons,uc2-load-generator,load-generator-commons"
smoketest-uc2-beam-samza:
extends: .smoketest-benchmarks
needs:
- deploy-uc2-beam-samza
- deploy-uc2-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc2-beam-samza"
JAVA_PROJECT_DEPS: "uc2-beam-samza,uc2-beam,beam-commons,uc2-load-generator,load-generator-commons"
smoketest-uc3-kstreams:
extends: .smoketest-benchmarks
needs:
- deploy-uc3-kstreams
- deploy-uc3-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc3-kstreams"
JAVA_PROJECT_DEPS: "uc3-kstreams,kstreams-commons,uc3-load-generator,load-generator-commons"
smoketest-uc3-beam-flink:
extends: .smoketest-benchmarks
needs:
- deploy-uc3-beam-flink
- deploy-uc3-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc3-beam-flink"
JAVA_PROJECT_DEPS: "uc3-beam-flink,uc3-beam,beam-commons,uc3-load-generator,load-generator-commons"
smoketest-uc3-beam-samza:
extends: .smoketest-benchmarks
needs:
- deploy-uc3-beam-samza
- deploy-uc3-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc3-beam-samza"
JAVA_PROJECT_DEPS: "uc3-beam-samza,uc3-beam,beam-commons,uc3-load-generator,load-generator-commons"
smoketest-uc4-kstreams:
extends: .smoketest-benchmarks
needs:
- deploy-uc4-kstreams
- deploy-uc4-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc4-kstreams"
JAVA_PROJECT_DEPS: "uc4-kstreams,kstreams-commons,uc4-load-generator,load-generator-commons"
smoketest-uc4-flink:
extends: .smoketest-benchmarks
needs:
- deploy-uc4-flink
- deploy-uc4-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc4-flink"
JAVA_PROJECT_DEPS: "uc4-flink,flink-commons,uc4-load-generator,load-generator-commons"
smoketest-uc4-beam-flink:
extends: .smoketest-benchmarks
needs:
- deploy-uc4-beam-flink
- deploy-uc4-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc4-beam-flink"
JAVA_PROJECT_DEPS: "uc4-beam-flink,uc4-beam,beam-commons,uc4-load-generator,load-generator-commons"
smoketest-uc4-beam-samza:
extends: .smoketest-benchmarks
needs:
- deploy-uc4-beam-samza
- deploy-uc4-load-generator
variables:
DOCKER_COMPOSE_DIR: "uc4-beam-samza"
JAVA_PROJECT_DEPS: "uc4-beam-samza,uc4-beam,beam-commons,uc4-load-generator,load-generator-commons"
# Theodolite Framework

@@ -367,6 +554,11 @@ deploy-uc4-load-generator:
  before_script:
    - export GRADLE_USER_HOME=`pwd`/.gradle
    - cd theodolite
rules:
- changes:
- theodolite/**/*
- when: manual
allow_failure: true
build-theodolite-jvm:
  stage: build

@@ -568,3 +760,21 @@ deploy-random-scheduler:
      when: manual
  allow_failure: true
deploy-buildimage-docker-compose-jq:
stage: deploy
extends:
- .kaniko-push
needs: []
variables:
DOCKER_VERSION: 20.10.12
IMAGE_NAME: theodolite-build-docker-compose-jq
IMAGE_TAG: $DOCKER_VERSION
before_script:
- cd buildimages/docker-compose-jq
rules:
- changes:
- buildimages/docker-compose-jq/Dockerfile
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_PIPELINE_SOURCE == 'web'"
when: manual
allow_failure: true
FROM docker:${DOCKER_VERSION:-latest}
RUN apk update && \
apk add jq && \
apk add py-pip python3-dev libffi-dev openssl-dev gcc libc-dev rust cargo make && \
pip install docker-compose
@@ -39,5 +39,5 @@ crdoc --resources ../theodolite/crd/ --template api-reference/crds.tmpl --outpu

With the following command, crdoc is executed in Docker:

```sh
- docker run --rm -v "`pwd`/../theodolite/crd/":/crd -u $UID -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.0 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
+ docker run --rm -v "`pwd`/../theodolite/crd/":/crd -v "`pwd`/api-reference":/api-reference ghcr.io/fybrik/crdoc:0.6.1 --resources /crd/ --template /api-reference/crds.tmpl --output /api-reference/crds.md
```
@@ -115,15 +115,13 @@ If a benchmark is [executed by an Execution](running-benchmarks), these patchers

## Kafka Configuration

- Theodolite allows to automatically create and remove Kafka topics for each SLO experiment.
- Use the `removeOnly: True` property for topics which are created automatically by the SUT.
- For those topics, also wildcards are allowed in the topic name.
- If no Kafka topics should be created, simply set:
- ```yaml
- kafkaConfig: []
- ```
+ Theodolite can automatically create and remove Kafka topics for each SLO experiment when a `kafkaConfig` is set.
+ Its `bootstrapServer` needs to point to your Kafka cluster, and `topics` configures the list of Kafka topics to be created and removed.
+ For each topic, you configure its name, the number of partitions, and the replication factor.
+ With the `removeOnly: True` property, you can also instruct Theodolite to only remove topics and not create them.
+ This is useful when benchmarking SUTs that create topics on their own (e.g., Kafka Streams and Samza applications).
+ For those topics, wildcards are allowed in the topic name and, consequently, no partition count or replication factor must be provided.
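To make this concrete, a `kafkaConfig` could look as follows (the bootstrap server address, topic names, and sizing are purely illustrative):

```yaml
kafkaConfig:
  bootstrapServer: "theodolite-kafka-kafka-bootstrap:9092"
  topics:
    # Created before and removed after each SLO experiment
    - name: "input"
      numPartitions: 40
      replicationFactor: 1
    # Only removed after each experiment; created by the SUT itself (wildcard allowed)
    - name: "theodolite-.*"
      removeOnly: True
```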
<!-- Further information: API Reference -->

...
@@ -17,7 +17,7 @@ For each benchmark, we provide a [load generator as OCI container image](https:/

You can simply run a load generator container, for example, for benchmark UC1 with:

```sh
- docker run ghcr.io/cau-se/theodolite-uc1-workload-generator
+ docker run -it ghcr.io/cau-se/theodolite-uc1-workload-generator
```

### Message format

...
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
\ No newline at end of file
@@ -13,21 +13,19 @@ repositories {
}

dependencies {
- // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
- implementation 'com.google.code.gson:gson:2.8.2'
- implementation 'com.google.guava:guava:24.1-jre'
+ implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.35.0'
- implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
+ implementation('org.apache.beam:beam-sdks-java-io-kafka:2.35.0'){
    exclude group: 'org.apache.kafka', module: 'kafka-clients'
  }
- implementation ('io.confluent:kafka-streams-avro-serde:5.3.2')
  implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
- implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
  runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
  runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'
- // Use JUnit test framework
  testImplementation 'junit:junit:4.12'
}
@@ -12,6 +12,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 */
public class AbstractPipeline extends Pipeline {

+ private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD
+ private static final String KAFKA_CONFIG_SCHEMA_REGISTRY_URL = "schema.registry.url"; // NOPMD
+
  protected final String inputTopic;
  protected final String bootstrapServer;
  // Application Configurations

@@ -21,8 +24,8 @@ public class AbstractPipeline extends Pipeline {
    super(options);
    this.config = config;
-   inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
-   bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+   this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
+   this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
  }

  /**
@@ -32,19 +35,37 @@ public class AbstractPipeline extends Pipeline {
   */
  public Map<String, Object> buildConsumerConfig() {
    final Map<String, Object> consumerConfig = new HashMap<>();
-   consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-       config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-   consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-       config
-           .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-   consumerConfig.put("schema.registry.url",
-       config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
-   consumerConfig.put("specific.avro.reader",
-       config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
-   final String applicationName = config.getString(ConfigurationKeys.APPLICATION_NAME);
-   consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
+   consumerConfig.put(
+       ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+       this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+   consumerConfig.put(
+       ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+       this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+   consumerConfig.put(
+       KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
+       this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+   consumerConfig.put(
+       KAFKA_CONFIG_SPECIFIC_AVRO_READER,
+       this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+   consumerConfig.put(
+       ConsumerConfig.GROUP_ID_CONFIG,
+       this.config.getString(ConfigurationKeys.APPLICATION_NAME));
    return consumerConfig;
  }
/**
* Builds a simple configuration for a Kafka producer transformation.
*
* @return the build configuration.
*/
public Map<String, Object> buildProducerConfig() {
final Map<String, Object> config = new HashMap<>();
config.put(
KAFKA_CONFIG_SCHEMA_REGISTRY_URL,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
config.put(
KAFKA_CONFIG_SPECIFIC_AVRO_READER,
this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
return config;
}
}
package theodolite.commons.beam.kafka;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A Kafka {@link Deserializer} for typed Schema Registry {@link ActivePowerRecord}.
*/
public class ActivePowerRecordDeserializer extends SpecificAvroDeserializer<ActivePowerRecord> {
}
package theodolite.commons.beam.kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
*/
public class KafkaActivePowerRecordReader extends
PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
final Map<String, Object> consumerConfig) {
super();
if (bootstrapServer == null) {
throw new IllegalArgumentException("bootstrapServer is null");
}
if (inputTopic == null) {
throw new IllegalArgumentException("inputTopic is null");
}
// Check if boostrap server and inputTopic are defined
if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(ActivePowerRecord.class))
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();
}
@Override
public PCollection<KV<String, ActivePowerRecord>> expand(final PBegin input) {
return input.apply(this.reader);
}
}
package theodolite.commons.beam.kafka;

- import io.confluent.kafka.serializers.KafkaAvroDeserializer;
  import java.util.Map;
  import org.apache.beam.sdk.coders.AvroCoder;
  import org.apache.beam.sdk.io.kafka.KafkaIO;

@@ -12,39 +11,36 @@ import org.apache.kafka.common.serialization.StringDeserializer;
  import titan.ccp.model.records.ActivePowerRecord;

  /**
-  * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
-  * Has additional a TimestampPolicy.
+  * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO} with event time.
   */
- public class KafkaActivePowerTimestampReader extends
-     PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {
+ public class KafkaActivePowerTimestampReader
+     extends PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> {

  private static final long serialVersionUID = 2603286150183186115L;
  private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;

  /**
   * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
   */
- public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic,
+ public KafkaActivePowerTimestampReader(
+     final String bootstrapServer,
+     final String inputTopic,
      final Map<String, Object> consumerConfig) {
    super();
-   // Check if boostrap server and inputTopic are defined
+   // Check if bootstrap server and inputTopic are defined
    if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
      throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
    }

-   reader =
-       KafkaIO.<String, ActivePowerRecord>read()
-           .withBootstrapServers(bootstrapServer)
-           .withTopic(inputTopic)
-           .withKeyDeserializer(StringDeserializer.class)
-           .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
+   this.reader = KafkaIO.<String, ActivePowerRecord>read().withBootstrapServers(bootstrapServer)
+       .withTopic(inputTopic).withKeyDeserializer(StringDeserializer.class)
+       .withValueDeserializerAndCoder(
+           ActivePowerRecordDeserializer.class,
            AvroCoder.of(ActivePowerRecord.class))
        .withConsumerConfigUpdates(consumerConfig)
-       // Set TimeStampPolicy for event time
        .withTimestampPolicyFactory(
-           (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
+           (tp, previousWatermark) -> new EventTimePolicy(previousWatermark))
        .withoutMetadata();
  }

...
package theodolite.commons.beam.kafka;

+ import java.util.Map;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.KV;

@@ -9,23 +10,35 @@ import org.apache.kafka.common.serialization.Serializer;
  import org.apache.kafka.common.serialization.StringSerializer;

  /**
-  * Wrapper for a Kafka writing Transformation
-  * where the value type can be generic.
+  * Wrapper for a Kafka writing Transformation where the value type can be generic.
+  *
   * @param <T> type of the value.
   */
- public class KafkaWriterTransformation<T> extends
-     PTransform<PCollection<KV<String, T>>, PDone> {
+ public class KafkaWriterTransformation<T> extends PTransform<PCollection<KV<String, T>>, PDone> {

  private static final long serialVersionUID = 3171423303843174723L;
  private final PTransform<PCollection<KV<String, T>>, PDone> writer;

  /**
-  * Creates a new kafka writer transformation.
+  * Creates a new Kafka writer transformation.
   */
- public KafkaWriterTransformation(final String bootstrapServer, final String outputTopic,
+ public KafkaWriterTransformation(
+     final String bootstrapServer,
+     final String outputTopic,
      final Class<? extends Serializer<T>> valueSerializer) {
+   this(bootstrapServer, outputTopic, valueSerializer, Map.of());
+ }
+
+ /**
+  * Creates a new Kafka writer transformation.
+  */
+ public KafkaWriterTransformation(
+     final String bootstrapServer,
+     final String outputTopic,
+     final Class<? extends Serializer<T>> valueSerializer,
+     final Map<String, Object> producerConfig) {
    super();
-   // Check if boostrap server and outputTopic are defined
+   // Check if bootstrap server and outputTopic are defined
    if (bootstrapServer.isEmpty() || outputTopic.isEmpty()) {
      throw new IllegalArgumentException("bootstrapServer or outputTopic missing");
    }

@@ -34,7 +47,8 @@ public class KafkaWriterTransformation<T> extends
        .withBootstrapServers(bootstrapServer)
        .withTopic(outputTopic)
        .withKeySerializer(StringSerializer.class)
-       .withValueSerializer(valueSerializer);
+       .withValueSerializer(valueSerializer)
+       .withProducerConfigUpdates(producerConfig);
  }

...
@@ -3,6 +3,6 @@ plugins {
}

dependencies {
- implementation group: 'org.apache.beam', name: 'beam-runners-flink-1.12', version: '2.27.0'
- implementation group: 'org.apache.flink', name: 'flink-statebackend-rocksdb_2.11', version: '1.12.0'
+ implementation group: 'org.apache.beam', name: 'beam-runners-flink-1.13', version: '2.35.0'
+ implementation group: 'org.apache.flink', name: 'flink-statebackend-rocksdb_2.11', version: '1.13.0'
}
\ No newline at end of file
@@ -18,7 +18,7 @@ repositories {
  }
}

- def apacheBeamVersion = '2.22.0' //'2.27.0' // '2.34.0'
+ def apacheBeamVersion = '2.35.0'

dependencies {
  // These dependencies are used internally, and not exposed to consumers on their own compile classpath.

...
@@ -3,7 +3,8 @@ plugins {
}

dependencies {
- implementation('org.apache.beam:beam-runners-samza:2.22.0') {
+ implementation('org.apache.beam:beam-runners-samza:2.35.0') {
    exclude group: 'org.apache.samza', module: 'samza-yarn_2.11'
  }
+ implementation 'org.apache.samza:samza-kafka_2.11:1.5.0'
}
\ No newline at end of file
@@ -20,7 +20,7 @@ shadowJar {
tasks.distZip.enabled = false

ext {
- flinkVersion = '1.12.2'
+ flinkVersion = '1.13.5'
  scalaBinaryVersion = '2.12'
}

...
@@ -22,7 +22,7 @@ dependencies {
  // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
- implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers
+ implementation 'org.apache.kafka:kafka-streams:3.1.0'
  implementation 'com.google.code.gson:gson:2.8.2'
  implementation 'com.google.guava:guava:24.1-jre'
  implementation 'org.slf4j:slf4j-simple:1.7.25'

...
@@ -5,4 +5,7 @@
  <Rank value="16" />
</Match>

+ <!-- Temporarily disabled due to a potential false positive reported in https://github.com/spotbugs/spotbugs/issues/1923. -->
+ <Bug code="NP" />
</FindBugsFilter>
\ No newline at end of file
@@ -36,3 +36,19 @@ the host, for example, from the IDE or Gradle. In such cases, the following adju

You can now connect to Kafka from your host system with bootstrap server `localhost:19092` and contact the Schema
Registry via `localhost:8081`. **Pay attention to the Kafka port, which is *19092* instead of the default one *9092*.**
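For a quick connectivity check from the host, a generic Kafka client such as `kcat` can query the cluster through the exposed port; this is only an illustration and neither tool is part of this repository:

```sh
# List broker and topic metadata via the host-mapped Kafka listener
kcat -b localhost:19092 -L

# List the subjects registered in the Schema Registry
curl localhost:8081/subjects
```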
## Running Smoke Tests
The `smoketest-runner.sh` script can be used to run a simple test for a specific Docker Compose file. You can call it with
```sh
./smoketest-runner.sh <docker-compose-dir>
```
where `<docker-compose-dir>` is the directory of a Docker Compose file, for example, `uc2-beam-samza`. The script exits with a zero exit code in case of success and a non-zero exit code otherwise, as shown in the sketch below.
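Because success is signaled purely via the exit code, the runner can be wrapped in ordinary shell conditionals, for example in a CI step. A minimal sketch (the directory name is just an example):

```sh
if ./smoketest-runner.sh uc2-beam-samza; then
  echo "smoke test passed"
else
  echo "smoke test failed" >&2
  exit 1
fi
```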
You can also run the set of all smoke tests with:
```sh
./smoketest-runner-all.sh
```
#!/bin/sh
find . -name 'test.sh' -type f -exec dirname {} \; |
sort |
xargs -I %s sh -c "./smoketest-runner.sh %s 1>&2; echo $?" |
sort |
awk 'BEGIN {count[0]=0; count[1]=0} {count[$1!=0]++} END {print count[0] " tests successful, " count[1] " test failed."; exit count[1]}'