diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c4b8495224ad81ae2eebe81b0087319422da1969..46096a4d792f7be5a81be7128e4e43a95fb0a9f9 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,9 +1,16 @@ +include: + - template: 'Workflows/Branch-Pipelines.gitlab-ci.yml' + stages: - build - test - check - deploy +default: + tags: + - exec-docker + .dind: tags: - exec-dind @@ -15,15 +22,29 @@ stages: variables: DOCKER_TLS_CERTDIR: "/certs" +.kaniko-push: + image: + name: gcr.io/kaniko-project/executor:debug + entrypoint: [""] + script: + - mkdir -p /kaniko/.docker + - echo "{\"auths\":{\"${CR_HOST}\":{\"auth\":\"$(printf "%s:%s" "${CR_USER}" "${CR_PW}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json + - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') + - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest\"" + - "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA\"" + - "[ $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG\"" + - "[ $DOCKERFILE ] && KANIKO_DOCKERFILE=\"--dockerfile $DOCKERFILE\"" + - /kaniko/executor --context `pwd`/$CONTEXT $KANIKO_DOCKERFILE $KANIKO_D + + # Theodolite Helm Chart lint-helm: stage: check + needs: [] image: name: alpine/helm:3.5.2 entrypoint: [""] - tags: - - exec-docker script: helm lint helm/ @@ -31,8 +52,6 @@ lint-helm: .benchmarks: image: openjdk:11-jdk - tags: - - exec-docker variables: GRADLE_OPTS: "-Dorg.gradle.daemon=false" cache: @@ -42,6 +61,11 @@ lint-helm: before_script: - export GRADLE_USER_HOME=`pwd`/.gradle - cd theodolite-benchmarks + rules: + - changes: + - theodolite-benchmarks/**/* + - when: manual + allow_failure: true build-benchmarks: stage: build @@ -108,24 +132,17 @@ spotbugs-benchmarks: stage: deploy extends: - .benchmarks - - .dind + - .kaniko-push needs: - build-benchmarks - checkstyle-benchmarks - pmd-benchmarks - spotbugs-benchmarks - script: - - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') - - docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME - - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest" - - "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA" - - "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG" - - echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin - - docker push $CR_HOST/$CR_ORG/$IMAGE_NAME - - docker logout + variables: + CONTEXT: "/$JAVA_PROJECT_NAME" + #before_script: + # - cd theodolite-benchmarks/$JAVA_PROJECT_NAME rules: - - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG" - when: always - changes: - theodolite-benchmarks/* - theodolite-benchmarks/$JAVA_PROJECT_NAME/**/* @@ -133,7 +150,6 @@ spotbugs-benchmarks: - theodolite-benchmarks/flink-commons/**/* - theodolite-benchmarks/load-generator-commons/**/* if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME" - when: always - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME" when: manual allow_failure: true @@ -217,8 +233,6 @@ deploy-uc4-load-generator: image: name: ghcr.io/graalvm/native-image:java11-21.1.0 entrypoint: [""] - tags: - - exec-docker variables: GRADLE_OPTS: "-Dorg.gradle.daemon=false" cache: @@ -284,78 +298,72 @@ deploy-theodolite: stage: deploy extends: - .theodolite - - .dind + - .kaniko-push needs: #- build-theodolite-native - build-theodolite-jvm - test-theodolite - script: - - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') - #- docker build -f src/main/docker/Dockerfile.native -t theodolite . - - docker build -f src/main/docker/Dockerfile.jvm -t theodolite . - - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:${DOCKER_TAG_NAME}latest" - - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA" - - "[ $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$CI_COMMIT_TAG" - - echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin - - docker push $CR_HOST/$CR_ORG/theodolite - - docker logout + variables: + IMAGE_NAME: theodolite + DOCKERFILE: src/main/docker/Dockerfile.jvm + #DOCKERFILE: src/main/docker/Dockerfile.native rules: - - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG" - when: always - changes: - theodolite/**/* if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" - when: always - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" when: manual allow_failure: true -# Theodolite SLO Checker: Lag Trend +# Theodolite SLO Checker test-slo-checker-lag-trend: stage: test + needs: [] image: python:3.7-slim - tags: - - exec-docker - script: + before_script: - cd slo-checker/record-lag + script: - pip install -r requirements.txt - cd app - python -m unittest + rules: + - changes: + - slo-checker/record-lag/**/* + - when: manual + allow_failure: true test-slo-checker-dropped-records-kstreams: stage: test + needs: [] image: python:3.7-slim - tags: - - exec-docker - script: + before_script: - cd slo-checker/dropped-records + script: - pip install -r requirements.txt - cd app - python -m unittest + rules: + - changes: + - slo-checker/dropped-records/**/* + - when: manual + allow_failure: true deploy-slo-checker-lag-trend: stage: deploy extends: - - .dind + - .kaniko-push needs: - test-slo-checker-lag-trend - script: - - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') - - docker build --pull -t theodolite-slo-checker-lag-trend slo-checker/record-lag - - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-lag-trend $CR_HOST/$CR_ORG/theodolite-slo-checker-lag-trend:${DOCKER_TAG_NAME}latest" - - "[ $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-lag-trend $CR_HOST/$CR_ORG/theodolite-slo-checker-lag-trend:$CI_COMMIT_TAG" - - echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin - - docker push $CR_HOST/$CR_ORG/theodolite-slo-checker-lag-trend - - docker logout + before_script: + - cd slo-checker/record-lag + variables: + IMAGE_NAME: theodolite-slo-checker-lag-trend rules: - - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG" - when: always - changes: - slo-checker/record-lag/**/* if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" - when: always - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" when: manual allow_failure: true @@ -363,24 +371,17 @@ deploy-slo-checker-lag-trend: deploy-slo-checker-dropped-records-kstreams: stage: deploy extends: - - .dind + - .kaniko-push needs: - test-slo-checker-dropped-records-kstreams - script: - - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') - - docker build --pull -t theodolite-slo-checker-dropped-records-kstreams slo-checker/dropped-records - - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-dropped-records-kstreams $CR_HOST/$CR_ORG/theodolite-slo-checker-dropped-records-kstreams:${DOCKER_TAG_NAME}latest" - - "[ $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-dropped-records-kstreams $CR_HOST/$CR_ORG/theodolite-slo-checker-dropped-records-kstreams:$CI_COMMIT_TAG" - - echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin - - docker push $CR_HOST/$CR_ORG/theodolite-slo-checker-dropped-records-kstreams - - docker logout + before_script: + - cd slo-checker/dropped-records + variables: + IMAGE_NAME: theodolite-slo-checker-dropped-records-kstreams rules: - - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG" - when: always - changes: - slo-checker/dropped-records/**/* if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" - when: always - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" when: manual allow_failure: true @@ -391,23 +392,16 @@ deploy-slo-checker-dropped-records-kstreams: deploy-random-scheduler: stage: deploy extends: - - .dind + - .kaniko-push needs: [] - script: - - DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//') - - docker build --pull -t theodolite-random-scheduler execution/infrastructure/random-scheduler - - "[ ! $CI_COMMIT_TAG ] && docker tag theodolite-random-scheduler $CR_HOST/$CR_ORG/theodolite-random-scheduler:${DOCKER_TAG_NAME}latest" - - "[ $CI_COMMIT_TAG ] && docker tag theodolite-random-scheduler $CR_HOST/$CR_ORG/theodolite-random-scheduler:$CI_COMMIT_TAG" - - echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin - - docker push $CR_HOST/$CR_ORG/theodolite-random-scheduler - - docker logout + before_script: + - cd execution/infrastructure/random-scheduler + variables: + IMAGE_NAME: theodolite-random-scheduler rules: - - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG" - when: always - changes: - execution/infrastructure/random-scheduler/**/* if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" - when: always - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW" when: manual allow_failure: true diff --git a/docs/CNAME b/docs/CNAME new file mode 100644 index 0000000000000000000000000000000000000000..b1c7ffdbcd7523245c451869092ff0498bd7b8db --- /dev/null +++ b/docs/CNAME @@ -0,0 +1 @@ +www.theodolite.rocks \ No newline at end of file diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle index d6d5217667a73a1529d73ac59260bcf47d8cf2e1..333a87bf55bcd0051be05ca91dfe8dc9a2e9e8fa 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.flink.gradle @@ -48,19 +48,19 @@ dependencies { implementation 'com.google.guava:guava:24.1-jre' implementation 'com.google.code.gson:gson:2.8.2' implementation 'org.slf4j:slf4j-simple:1.6.1' - compile project(':flink-commons') + implementation project(':flink-commons') //compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0" - compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-table-api-java-bridge_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-table-planner-blink_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}" + implementation "org.apache.flink:flink-java:${flinkVersion}" + implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" + implementation "org.apache.flink:flink-table-api-java-bridge_${scalaBinaryVersion}:${flinkVersion}" + implementation "org.apache.flink:flink-table-planner-blink_${scalaBinaryVersion}:${flinkVersion}" + implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}" implementation "org.apache.flink:flink-avro:${flinkVersion}" implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-runtime-web_${scalaBinaryVersion}", version: "${flinkVersion}" // TODO: remove after development - compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-metrics-prometheus_${scalaBinaryVersion}", version: "${flinkVersion}" + implementation "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}" // For debugging + implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}" + implementation "org.apache.flink:flink-metrics-prometheus_${scalaBinaryVersion}:${flinkVersion}" // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle index 773872648edfd4b30218a99d307b6e7c45ed3470..5b0e2a8a1211653428b296b11b14c1531e40e46b 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.java-conventions.gradle @@ -50,7 +50,7 @@ pmd { ruleSets = [] // Gradle requires to clean the rule sets first ruleSetFiles = files("$rootProject.projectDir/config/pmd.xml") ignoreFailures = false - toolVersion = "6.7.0" + toolVersion = "6.13.0" } checkstyle { @@ -58,7 +58,7 @@ checkstyle { configFile = file("$rootProject.projectDir/config/checkstyle.xml") maxWarnings = 0 ignoreFailures = false - toolVersion = "8.12" + toolVersion = "8.19" } spotbugs { diff --git a/theodolite-benchmarks/docker-test/README.md b/theodolite-benchmarks/docker-test/README.md new file mode 100644 index 0000000000000000000000000000000000000000..fd1e9bf4730f897273be45a022ad2adeae1b7e6e --- /dev/null +++ b/theodolite-benchmarks/docker-test/README.md @@ -0,0 +1,38 @@ +# Docker Compose Files for Testing + +This directory contains Docker Compose files, which help testing Benchmark implementations. +For each stream processing engine (Kafka Streams and Flink) and Benchmark (UC1-4), a Docker Compose file is provided +in the corresponding subdirectory. + +## Full Dockerized Testing + +Running the load generator, the benchmark and all required infrastructure (Kafka etc.) is easy. Simply, `cd` into the +directory of the benchmark implementation Compose file and *up* it. +For example: + +```sh +cd uc1-kstreams-docker-compose/ +docker-compose up -d +``` + +On less powerful hardware, starting all containers together might fail. In such cases, it usually helps to first *up* +Kafka and ZooKeeper (`docker-compose up -d kafka zookeeper`), then after some delay the Schema Registry +(`docker-compose up -d schema-registry`) and finally, after some more further delay, the rest (`docker-compose up -d`). + +To tear down the entire Docker Compose configuration: + +```sh +docker-compose down +``` + +## Benchmark (+ Load Generator) on Host, Infrastructure in Docker + +For development and debugging purposes, it is often required to run the benchmark and/or the load generator directly on +the host, for example, from the IDE or Gradle. In such cases, the following adjustments have to be made to the +`docker-compose.yaml` file: + +1. Comment out the services that you intend to run locally. +2. Uncomment the `ports` block in the Kafka and the Schema Registry services. + +You can now connect to Kafka from your host system with bootstrap server `localhost:19092` and contact the Schema +Registry via `localhost:8081`. **Pay attention to the Kafka port, which is *19092* instead of the default one *9092*.** diff --git a/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml index aa35ac2d1dee01cdf25d2eb2ac77bd056865479a..419c9cfb741578cccd91845c8164d4e5554d2ab6 100755 --- a/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml @@ -3,7 +3,7 @@ services: zookeeper: image: confluentinc/cp-zookeeper expose: - - "9092" + - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: @@ -37,7 +37,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml index 403becacff5a386eddfaa8e59fe7873d2adb006c..cebf3676e92f8ececa5b6707df156f9f22f3be38 100755 --- a/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml @@ -3,7 +3,7 @@ services: zookeeper: image: confluentinc/cp-zookeeper expose: - - "9092" + - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: @@ -45,7 +45,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml index a8bf56d52c1be7fea3f172d86f6deac27fcc24f7..c4265702b6f2b833e7b3792a787e3d8a67486ac7 100755 --- a/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml @@ -1,10 +1,9 @@ version: '2' services: zookeeper: - #image: wurstmeister/zookeeper image: confluentinc/cp-zookeeper - ports: - - "2181:2181" + expose: + - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: @@ -38,7 +37,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml index 20d2c62dac13af29ec50439670308f2911f0d57a..b520611e4855f6e942fab62b02d27d5f360860d1 100755 --- a/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml @@ -1,10 +1,9 @@ version: '2' services: zookeeper: - #image: wurstmeister/zookeeper image: confluentinc/cp-zookeeper - ports: - - "2181:2181" + expose: + - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: @@ -47,7 +46,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml index 9999caf046e844d066200ecfbf15d3351c167d31..2c69a659c6ed1e83c149e699484ec148196806c5 100755 --- a/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml @@ -1,10 +1,9 @@ version: '2' services: zookeeper: - #image: wurstmeister/zookeeper image: confluentinc/cp-zookeeper - ports: - - "2181:2181" + expose: + - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: @@ -38,7 +37,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml index ef16b858536b0d133dc49d002d16cf6c04193297..5ed8e7a673afd825b2e1426fa018db3e00848296 100755 --- a/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml @@ -1,10 +1,9 @@ version: '2' services: zookeeper: - #image: wurstmeister/zookeeper image: confluentinc/cp-zookeeper - ports: - - "2181:2181" + expose: + - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: @@ -32,7 +31,7 @@ services: environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' - benchmark: + benchmark: image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest depends_on: - schema-registry @@ -46,7 +45,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml index 80720063991100bae2c8c148f14cd6f1a32bb0ff..b6bb905e2a950e23970392f256f16935a7777fed 100755 --- a/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml @@ -37,7 +37,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml index 5e4cb94469f2f6cc8c48694a7ea6c885f066622d..68264b244c16f1a1be7b370bb4e78052d3a8518f 100755 --- a/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml @@ -45,7 +45,7 @@ services: - schema-registry - kafka environment: - BOOTSTRAP_SERVER: uc-wg:5701 + BOOTSTRAP_SERVER: load-generator:5701 PORT: 5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 diff --git a/theodolite-benchmarks/flink-commons/build.gradle b/theodolite-benchmarks/flink-commons/build.gradle index 1b0b9359a406bf2ab16fbbe52631877cf360df2a..0da7c6f93f4e77e1376f5f2d006ec0bf0f398ec8 100644 --- a/theodolite-benchmarks/flink-commons/build.gradle +++ b/theodolite-benchmarks/flink-commons/build.gradle @@ -22,13 +22,14 @@ dependencies { implementation('org.industrial-devops:titan-ccp-common:0.1.0-flink-ready-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.guava:guava:30.1-jre' - compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-runtime_${scalaBinaryVersion}", version: "${flinkVersion}" - compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}" - compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}" + + implementation "org.apache.flink:flink-java:${flinkVersion}" + implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}" implementation "org.apache.flink:flink-avro:${flinkVersion}" implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}" + implementation "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}" // For debugging + implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}" + implementation "org.apache.flink:flink-metrics-prometheus_${scalaBinaryVersion}:${flinkVersion}" // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java index f1f9870fda73ccec0fc25c5c70665759ab07d893..fe74fbe4b9dcb6ce89d10131de1336bfff40a919 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/theodolite/commons/flink/serialization/StatsSerializer.java @@ -5,7 +5,6 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.google.common.math.Stats; - import java.io.Serializable; /** @@ -13,7 +12,7 @@ import java.io.Serializable; */ public class StatsSerializer extends Serializer<Stats> implements Serializable { - private static final long serialVersionUID = -1276866176534267373L; //NOPMD + private static final long serialVersionUID = -1276866176534267373L; // NOPMD @Override public void write(final Kryo kryo, final Output output, final Stats object) { diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java new file mode 100644 index 0000000000000000000000000000000000000000..11a9cbf2d96bc3a02f3972ba23f2167af06a2ec3 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/GeneratorAction.java @@ -0,0 +1,18 @@ +package theodolite.commons.workloadgeneration; + +/** + * Interface representing a record generator action consisting of generating a record and sending + * it. + */ +@FunctionalInterface +interface GeneratorAction { + + void generate(final String key); + + public static <T> GeneratorAction from( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender) { + return key -> sender.send(generator.generate(key)); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index 6e4a43271fbf1e0193c2d39569a0814d1f7935cd..ded7c347c8d6b057581dc63b691df5bb60997791 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender avroSerdeFactory.<T>forKeys().serializer()); } + /** + * Write the passed monitoring record to Kafka. + */ + public void write(final T monitoringRecord) { + final ProducerRecord<String, T> record = + new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), + this.keyAccessor.apply(monitoringRecord), monitoringRecord); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } + } + + public void terminate() { + this.producer.close(); + } + + @Override + public void send(final T message) { + this.write(message); + } + public static <T extends SpecificRecord> Builder<T> builder( final String bootstrapServers, final String topic, @@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender } } - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - try { - this.producer.send(record); - } catch (final SerializationException e) { - LOGGER.warn( - "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS - e); - } - } - - public void terminate() { - this.producer.close(); - } - - @Override - public void send(final T message) { - this.write(message); - } - } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index a9a1ce65ac32e3508299c99a38ecd21e4c9461cf..73f064d1ce44ff8a613f9ce0a7b9a64d4bac6c38 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -91,12 +91,11 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), Duration.ofMillis(PERIOD_MS_DEFAULT))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanMessageGeneratorFactory - .withKafkaConfig( - KAFKA_BOOTSTRAP_SERVERS_DEFAULT, - KAFKA_TOPIC_DEFAULT, - SCHEMA_REGISTRY_URL_DEFAULT) - .forConstantValue(VALUE_DEFAULT))); + TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT), + TitanKafkaSenderFactory.forKafkaConfig( + KAFKA_BOOTSTRAP_SERVERS_DEFAULT, + KAFKA_TOPIC_DEFAULT, + SCHEMA_REGISTRY_URL_DEFAULT))); } /** @@ -170,13 +169,11 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), Duration.ofMillis(periodMs))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanMessageGeneratorFactory - .withKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl, - kafkaProperties) - .forConstantValue(value))) + TitanRecordGeneratorFactory.forConstantValue(value), + TitanKafkaSenderFactory.forKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl))) .withThreads(threads); } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java index 2e907d8e90172288099bc6a1776777c37ae90fff..4b5fea3e4670315ef47d94669b42a3cca4b5d0ae 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorConfig.java @@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration; */ public class LoadGeneratorConfig { - private final MessageGenerator messageGenerator; + private final GeneratorAction messageGenerator; private BeforeAction beforeAction = BeforeAction.doNothing(); private int threads = 1; - public LoadGeneratorConfig(final MessageGenerator messageGenerator) { - this.messageGenerator = messageGenerator; + public <T> LoadGeneratorConfig( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender) { + this.messageGenerator = GeneratorAction.from(generator, sender); } - public LoadGeneratorConfig( - final MessageGenerator messageGenerator, + public <T> LoadGeneratorConfig( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender, final int threads) { - this.messageGenerator = messageGenerator; + this(generator, sender); this.threads = threads; } - public LoadGeneratorExecution buildLoadGeneratorExecution( - final WorkloadDefinition workloadDefinition) { - return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads); - } - - public BeforeAction getBeforeAction() { - return this.beforeAction; - } - public void setThreads(final int threads) { this.threads = threads; } @@ -37,6 +31,13 @@ public class LoadGeneratorConfig { this.beforeAction = beforeAction; } + public BeforeAction getBeforeAction() { + return this.beforeAction; + } + public LoadGeneratorExecution buildLoadGeneratorExecution( + final WorkloadDefinition workloadDefinition) { + return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads); + } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java index 3934c3d3499215b37ce96391ff5ae1d5cc135f84..e1a2a7e1bea964b5c69a6cd34374d7b0932bac03 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorExecution.java @@ -8,25 +8,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be + * A {@link LoadGeneratorExecution} represents the execution of a load generator, i.e., it can be * started and stopped. */ -public class LoadGeneratorExecution { +class LoadGeneratorExecution { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class); private final Random random = new Random(); private final WorkloadDefinition workloadDefinition; - private final MessageGenerator messageGenerator; + private final GeneratorAction messageGenerator; private final ScheduledExecutorService executor; /** * Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a - * {@link MessageGenerator}. Load is generated by the given number of threads. + * {@link GeneratorAction}. Load is generated by the given number of threads. */ public LoadGeneratorExecution( final WorkloadDefinition workloadDefinition, - final MessageGenerator messageGenerator, + final GeneratorAction messageGenerator, final int threads) { this.workloadDefinition = workloadDefinition; this.messageGenerator = messageGenerator; diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java deleted file mode 100644 index c369f16557d60dae50e22ec7ad820c6a0ab4d137..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/MessageGenerator.java +++ /dev/null @@ -1,18 +0,0 @@ -package theodolite.commons.workloadgeneration; - -/** - * Interface representing a message generator, which sends messages for given keys to some - * destination. - */ -@FunctionalInterface -public interface MessageGenerator { - - void generate(final String key); - - public static <T> MessageGenerator from( - final RecordGenerator<T> generator, - final RecordSender<T> sender) { - return key -> sender.send(generator.generate(key)); - } - -} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..0cdf8d91ea01cc16df5dcd55d77b08c3f4986442 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanKafkaSenderFactory.java @@ -0,0 +1,42 @@ +package theodolite.commons.workloadgeneration; + +import java.util.Properties; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link KafkaRecordSender}s that sends Titan {@link ActivePowerRecord}s. + */ +public final class TitanKafkaSenderFactory { + + private TitanKafkaSenderFactory() {} + + /** + * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka + * configuration. + */ + public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl) { + return forKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); + } + + /** + * Create a new KafkaRecordSender for {@link ActivePowerRecord}s for the given Kafka + * configuration. + */ + public static KafkaRecordSender<ActivePowerRecord> forKafkaConfig( + final String bootstrapServers, + final String topic, + final String schemaRegistryUrl, + final Properties properties) { + return KafkaRecordSender + .<ActivePowerRecord>builder( + bootstrapServers, + topic, + schemaRegistryUrl) + .keyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java deleted file mode 100644 index bd0b41d4e6e004d024ed2fd179eddcf6af50438f..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanMessageGeneratorFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -package theodolite.commons.workloadgeneration; - -import java.util.Properties; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * A factory for creating {@link MessageGenerator}s that creates Titan {@link ActivePowerRecord}s - * and sends them via Kafka. - */ -public final class TitanMessageGeneratorFactory { - - private final RecordSender<ActivePowerRecord> recordSender; - - private TitanMessageGeneratorFactory(final RecordSender<ActivePowerRecord> recordSender) { - this.recordSender = recordSender; - } - - /** - * Create a {@link MessageGenerator} that generates Titan {@link ActivePowerRecord}s with a - * constant value. - */ - public MessageGenerator forConstantValue(final double value) { - return MessageGenerator.from( - sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value), - this.recordSender); - } - - /** - * Create a new TitanMessageGeneratorFactory for the given Kafka configuration. - */ - public static TitanMessageGeneratorFactory withKafkaConfig( - final String bootstrapServers, - final String topic, - final String schemaRegistryUrl) { - return withKafkaConfig(bootstrapServers, topic, schemaRegistryUrl, new Properties()); - } - - /** - * Create a new TitanMessageGeneratorFactory for the given Kafka configuration. - */ - public static TitanMessageGeneratorFactory withKafkaConfig( - final String bootstrapServers, - final String topic, - final String schemaRegistryUrl, - final Properties properties) { - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = KafkaRecordSender - .<ActivePowerRecord>builder( - bootstrapServers, - topic, - schemaRegistryUrl) - .keyAccessor(r -> r.getIdentifier()) - .timestampAccessor(r -> r.getTimestamp()) - .build(); - return new TitanMessageGeneratorFactory(kafkaRecordSender); - } - -} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..4e1c10071eff28d77514dbc121e30bead3f6fa74 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java @@ -0,0 +1,21 @@ +package theodolite.commons.workloadgeneration; + +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s. + */ +public final class TitanRecordGeneratorFactory { + + + private TitanRecordGeneratorFactory() {} + + /** + * Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a + * constant value. + */ + public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) { + return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value); + } + +}