Commit f643036f authored by Benedikt Wetzel

Merge branch 'master' of git.se.informatik.uni-kiel.de:she/theodolite into introduce-actionCommands

Parents: 276d9ebb 7c2acb21
1 merge request: !201 Introduce action commands
Showing 222 additions and 177 deletions
include:
- template: 'Workflows/Branch-Pipelines.gitlab-ci.yml'
stages:
- build
- test
- check
- deploy
default:
tags:
- exec-docker
.dind:
tags:
- exec-dind
@@ -15,15 +22,29 @@ stages:
variables:
DOCKER_TLS_CERTDIR: "/certs"
.kaniko-push:
image:
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
script:
- mkdir -p /kaniko/.docker
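# Write registry credentials for Kaniko (CR_HOST, CR_USER, CR_PW are provided as CI variables)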
- echo "{\"auths\":{\"${CR_HOST}\":{\"auth\":\"$(printf "%s:%s" "${CR_USER}" "${CR_PW}" | base64 | tr -d '\n')\"}}}" > /kaniko/.docker/config.json
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest\""
- "[ ! $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA\""
- "[ $CI_COMMIT_TAG ] && KANIKO_D=\"$KANIKO_D -d $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG\""
- "[ $DOCKERFILE ] && KANIKO_DOCKERFILE=\"--dockerfile $DOCKERFILE\""
- /kaniko/executor --context `pwd`/$CONTEXT $KANIKO_DOCKERFILE $KANIKO_D
# Theodolite Helm Chart
lint-helm:
stage: check
needs: []
image:
name: alpine/helm:3.5.2
entrypoint: [""]
tags:
- exec-docker
script: helm lint helm/
@@ -31,8 +52,6 @@ lint-helm:
.benchmarks:
image: openjdk:11-jdk
tags:
- exec-docker
variables:
GRADLE_OPTS: "-Dorg.gradle.daemon=false"
cache:
@@ -42,6 +61,11 @@ lint-helm:
before_script:
- export GRADLE_USER_HOME=`pwd`/.gradle
- cd theodolite-benchmarks
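# Run automatically when benchmark sources change; otherwise only on manual trigger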
rules:
- changes:
- theodolite-benchmarks/**/*
- when: manual
allow_failure: true
build-benchmarks:
stage: build
@@ -108,24 +132,17 @@ spotbugs-benchmarks:
stage: deploy
extends:
- .benchmarks
- .dind
- .kaniko-push
needs:
- build-benchmarks
- checkstyle-benchmarks
- pmd-benchmarks
- spotbugs-benchmarks
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t $IMAGE_NAME ./$JAVA_PROJECT_NAME
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:${DOCKER_TAG_NAME}latest"
- "[ ! $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
- "[ $CI_COMMIT_TAG ] && docker tag $IMAGE_NAME $CR_HOST/$CR_ORG/$IMAGE_NAME:$CI_COMMIT_TAG"
- echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin
- docker push $CR_HOST/$CR_ORG/$IMAGE_NAME
- docker logout
variables:
CONTEXT: "/$JAVA_PROJECT_NAME"
#before_script:
# - cd theodolite-benchmarks/$JAVA_PROJECT_NAME
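# Deploy only when registry credentials are configured: always for Git tags,
# on relevant source changes for branches, and manually otherwise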
rules:
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $CI_COMMIT_TAG"
when: always
- changes:
- theodolite-benchmarks/*
- theodolite-benchmarks/$JAVA_PROJECT_NAME/**/*
@@ -133,7 +150,6 @@ spotbugs-benchmarks:
- theodolite-benchmarks/flink-commons/**/*
- theodolite-benchmarks/load-generator-commons/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: always
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual
allow_failure: true
@@ -217,8 +233,6 @@ deploy-uc4-load-generator:
image:
name: ghcr.io/graalvm/native-image:java11-21.1.0
entrypoint: [""]
tags:
- exec-docker
variables:
GRADLE_OPTS: "-Dorg.gradle.daemon=false"
cache:
@@ -284,78 +298,72 @@ deploy-theodolite:
stage: deploy
extends:
- .theodolite
- .dind
- .kaniko-push
needs:
#- build-theodolite-native
- build-theodolite-jvm
- test-theodolite
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
#- docker build -f src/main/docker/Dockerfile.native -t theodolite .
- docker build -f src/main/docker/Dockerfile.jvm -t theodolite .
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:${DOCKER_TAG_NAME}latest"
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$DOCKER_TAG_NAME$CI_COMMIT_SHORT_SHA"
- "[ $CI_COMMIT_TAG ] && docker tag theodolite $CR_HOST/$CR_ORG/theodolite:$CI_COMMIT_TAG"
- echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin
- docker push $CR_HOST/$CR_ORG/theodolite
- docker logout
variables:
IMAGE_NAME: theodolite
DOCKERFILE: src/main/docker/Dockerfile.jvm
#DOCKERFILE: src/main/docker/Dockerfile.native
rules:
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG"
when: always
- changes:
- theodolite/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: always
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: manual
allow_failure: true
# Theodolite SLO Checker: Lag Trend
# Theodolite SLO Checker
test-slo-checker-lag-trend:
stage: test
needs: []
image: python:3.7-slim
tags:
- exec-docker
script:
before_script:
- cd slo-checker/record-lag
script:
- pip install -r requirements.txt
- cd app
- python -m unittest
rules:
- changes:
- slo-checker/record-lag/**/*
- when: manual
allow_failure: true
test-slo-checker-dropped-records-kstreams:
stage: test
needs: []
image: python:3.7-slim
tags:
- exec-docker
script:
before_script:
- cd slo-checker/dropped-records
script:
- pip install -r requirements.txt
- cd app
- python -m unittest
rules:
- changes:
- slo-checker/dropped-records/**/*
- when: manual
allow_failure: true
deploy-slo-checker-lag-trend:
stage: deploy
extends:
- .dind
- .kaniko-push
needs:
- test-slo-checker-lag-trend
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t theodolite-slo-checker-lag-trend slo-checker/record-lag
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-lag-trend $CR_HOST/$CR_ORG/theodolite-slo-checker-lag-trend:${DOCKER_TAG_NAME}latest"
- "[ $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-lag-trend $CR_HOST/$CR_ORG/theodolite-slo-checker-lag-trend:$CI_COMMIT_TAG"
- echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin
- docker push $CR_HOST/$CR_ORG/theodolite-slo-checker-lag-trend
- docker logout
before_script:
- cd slo-checker/record-lag
variables:
IMAGE_NAME: theodolite-slo-checker-lag-trend
rules:
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG"
when: always
- changes:
- slo-checker/record-lag/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: always
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: manual
allow_failure: true
@@ -363,24 +371,17 @@ deploy-slo-checker-lag-trend:
deploy-slo-checker-dropped-records-kstreams:
stage: deploy
extends:
- .dind
- .kaniko-push
needs:
- test-slo-checker-dropped-records-kstreams
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t theodolite-slo-checker-dropped-records-kstreams slo-checker/dropped-records
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-dropped-records-kstreams $CR_HOST/$CR_ORG/theodolite-slo-checker-dropped-records-kstreams:${DOCKER_TAG_NAME}latest"
- "[ $CI_COMMIT_TAG ] && docker tag theodolite-slo-checker-dropped-records-kstreams $CR_HOST/$CR_ORG/theodolite-slo-checker-dropped-records-kstreams:$CI_COMMIT_TAG"
- echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin
- docker push $CR_HOST/$CR_ORG/theodolite-slo-checker-dropped-records-kstreams
- docker logout
before_script:
- cd slo-checker/dropped-records
variables:
IMAGE_NAME: theodolite-slo-checker-dropped-records-kstreams
rules:
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG"
when: always
- changes:
- slo-checker/dropped-records/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: always
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: manual
allow_failure: true
@@ -391,23 +392,16 @@ deploy-slo-checker-dropped-records-kstreams:
deploy-random-scheduler:
stage: deploy
extends:
- .dind
- .kaniko-push
needs: []
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t theodolite-random-scheduler execution/infrastructure/random-scheduler
- "[ ! $CI_COMMIT_TAG ] && docker tag theodolite-random-scheduler $CR_HOST/$CR_ORG/theodolite-random-scheduler:${DOCKER_TAG_NAME}latest"
- "[ $CI_COMMIT_TAG ] && docker tag theodolite-random-scheduler $CR_HOST/$CR_ORG/theodolite-random-scheduler:$CI_COMMIT_TAG"
- echo $CR_PW | docker login $CR_HOST -u $CR_USER --password-stdin
- docker push $CR_HOST/$CR_ORG/theodolite-random-scheduler
- docker logout
before_script:
- cd execution/infrastructure/random-scheduler
variables:
IMAGE_NAME: theodolite-random-scheduler
rules:
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $CI_COMMIT_TAG"
when: always
- changes:
- execution/infrastructure/random-scheduler/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: always
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: manual
allow_failure: true
www.theodolite.rocks
\ No newline at end of file
@@ -48,19 +48,19 @@ dependencies {
implementation 'com.google.guava:guava:24.1-jre'
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'org.slf4j:slf4j-simple:1.6.1'
compile project(':flink-commons')
implementation project(':flink-commons')
//compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0"
compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-table-api-java-bridge_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-table-planner-blink_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
implementation "org.apache.flink:flink-java:${flinkVersion}"
implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-table-api-java-bridge_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-table-planner-blink_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-avro:${flinkVersion}"
implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-runtime-web_${scalaBinaryVersion}", version: "${flinkVersion}" // TODO: remove after development
compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-metrics-prometheus_${scalaBinaryVersion}", version: "${flinkVersion}"
implementation "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}" // For debugging
implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-metrics-prometheus_${scalaBinaryVersion}:${flinkVersion}"
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
......
@@ -50,7 +50,7 @@ pmd {
ruleSets = [] // Gradle requires to clean the rule sets first
ruleSetFiles = files("$rootProject.projectDir/config/pmd.xml")
ignoreFailures = false
toolVersion = "6.7.0"
toolVersion = "6.13.0"
}
checkstyle {
@@ -58,7 +58,7 @@ checkstyle {
configFile = file("$rootProject.projectDir/config/checkstyle.xml")
maxWarnings = 0
ignoreFailures = false
toolVersion = "8.12"
toolVersion = "8.19"
}
spotbugs {
......
# Docker Compose Files for Testing
This directory contains Docker Compose files that help with testing benchmark implementations. For each stream processing engine (Kafka Streams and Flink) and benchmark (UC1-4), a Docker Compose file is provided in the corresponding subdirectory.
## Full Dockerized Testing
Running the load generator, the benchmark, and all required infrastructure (Kafka etc.) is easy: simply `cd` into the directory of the benchmark implementation's Compose file and *up* it. For example:
```sh
cd uc1-kstreams-docker-compose/
docker-compose up -d
```
On less powerful hardware, starting all containers at once might fail. In such cases, it usually helps to first *up* Kafka and ZooKeeper (`docker-compose up -d kafka zookeeper`), then, after some delay, the Schema Registry (`docker-compose up -d schema-registry`), and finally, after some further delay, the rest (`docker-compose up -d`), as in the sketch below.
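For instance, a staged startup could look like this (the delays are illustrative and may need tuning for your machine):

```sh
# Bring up the infrastructure in stages to avoid overload on startup
docker-compose up -d kafka zookeeper   # coordination and brokers first
sleep 30                               # give Kafka some time to become ready
docker-compose up -d schema-registry   # then the Schema Registry
sleep 10                               # a further short delay
docker-compose up -d                   # finally, all remaining services
```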
To tear down the entire Docker Compose configuration:
```sh
docker-compose down
```
## Benchmark (+ Load Generator) on Host, Infrastructure in Docker
For development and debugging purposes, it is often necessary to run the benchmark and/or the load generator directly on the host, for example, from the IDE or via Gradle. In such cases, the following adjustments have to be made to the `docker-compose.yaml` file:
1. Comment out the services that you intend to run locally.
2. Uncomment the `ports` block in the Kafka and the Schema Registry services.
You can now connect to Kafka from your host system with bootstrap server `localhost:19092` and contact the Schema Registry via `localhost:8081`. **Note that the Kafka port is *19092* instead of the default *9092*.**
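To check the connection from the host, you can, for example, list Kafka topics and query the Schema Registry (this assumes the Kafka command-line tools are installed on the host):

```sh
# List topics via the host-mapped Kafka port (19092, not the default 9092)
kafka-topics.sh --bootstrap-server localhost:19092 --list

# List the subjects registered in the Schema Registry
curl http://localhost:8081/subjects
```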
@@ -3,7 +3,7 @@ services:
zookeeper:
image: confluentinc/cp-zookeeper
expose:
- "9092"
- "2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
@@ -37,7 +37,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
@@ -3,7 +3,7 @@ services:
zookeeper:
image: confluentinc/cp-zookeeper
expose:
- "9092"
- "2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
@@ -45,7 +45,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
version: '2'
services:
zookeeper:
#image: wurstmeister/zookeeper
image: confluentinc/cp-zookeeper
ports:
- "2181:2181"
expose:
- "2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
@@ -38,7 +37,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
version: '2'
services:
zookeeper:
#image: wurstmeister/zookeeper
image: confluentinc/cp-zookeeper
ports:
- "2181:2181"
expose:
- "2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
@@ -47,7 +46,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
version: '2'
services:
zookeeper:
#image: wurstmeister/zookeeper
image: confluentinc/cp-zookeeper
ports:
- "2181:2181"
expose:
- "2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
@@ -38,7 +37,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
version: '2'
services:
zookeeper:
#image: wurstmeister/zookeeper
image: confluentinc/cp-zookeeper
ports:
- "2181:2181"
expose:
- "2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
@@ -32,7 +31,7 @@ services:
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
benchmark:
benchmark:
image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest
depends_on:
- schema-registry
@@ -46,7 +45,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
@@ -37,7 +37,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
@@ -45,7 +45,7 @@ services:
- schema-registry
- kafka
environment:
BOOTSTRAP_SERVER: uc-wg:5701
BOOTSTRAP_SERVER: load-generator:5701
PORT: 5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
......
@@ -22,13 +22,14 @@ dependencies {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-flink-ready-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:30.1-jre'
compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-runtime_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}"
implementation "org.apache.flink:flink-java:${flinkVersion}"
implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-avro:${flinkVersion}"
implementation "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}"
implementation "org.apache.flink:flink-runtime-web_${scalaBinaryVersion}:${flinkVersion}" // For debugging
implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-metrics-prometheus_${scalaBinaryVersion}:${flinkVersion}"
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
......
@@ -5,7 +5,6 @@ import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.math.Stats;
import java.io.Serializable;
/**
@@ -13,7 +12,7 @@ import java.io.Serializable;
*/
public class StatsSerializer extends Serializer<Stats> implements Serializable {
private static final long serialVersionUID = -1276866176534267373L; //NOPMD
private static final long serialVersionUID = -1276866176534267373L; // NOPMD
@Override
public void write(final Kryo kryo, final Output output, final Stats object) {
......
package theodolite.commons.workloadgeneration;
/**
* Interface representing a message generator, which sends messages for given keys to some
* destination.
* Interface representing a record generator action consisting of generating a record and sending
* it.
*/
@FunctionalInterface
public interface MessageGenerator {
interface GeneratorAction {
void generate(final String key);
public static <T> MessageGenerator from(
final RecordGenerator<T> generator,
final RecordSender<T> sender) {
public static <T> GeneratorAction from(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
return key -> sender.send(generator.generate(key));
}
......
@@ -53,6 +53,33 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
avroSerdeFactory.<T>forKeys().serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
public static <T extends SpecificRecord> Builder<T> builder(
final String bootstrapServers,
final String topic,
@@ -108,31 +135,4 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
}
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
}
@@ -91,12 +91,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT)
.forConstantValue(VALUE_DEFAULT)));
TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT),
TitanKafkaSenderFactory.forKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
SCHEMA_REGISTRY_URL_DEFAULT)));
}
/**
@@ -170,13 +169,11 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanMessageGeneratorFactory
.withKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl,
kafkaProperties)
.forConstantValue(value)))
TitanRecordGeneratorFactory.forConstantValue(value),
TitanKafkaSenderFactory.forKafkaConfig(
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl)))
.withThreads(threads);
}
......
@@ -5,30 +5,24 @@ package theodolite.commons.workloadgeneration;
*/
public class LoadGeneratorConfig {
private final MessageGenerator messageGenerator;
private final GeneratorAction messageGenerator;
private BeforeAction beforeAction = BeforeAction.doNothing();
private int threads = 1;
public LoadGeneratorConfig(final MessageGenerator messageGenerator) {
this.messageGenerator = messageGenerator;
public <T> LoadGeneratorConfig(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
this.messageGenerator = GeneratorAction.from(generator, sender);
}
public LoadGeneratorConfig(
final MessageGenerator messageGenerator,
public <T> LoadGeneratorConfig(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender,
final int threads) {
this.messageGenerator = messageGenerator;
this(generator, sender);
this.threads = threads;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public void setThreads(final int threads) {
this.threads = threads;
}
@@ -37,6 +31,13 @@ public class LoadGeneratorConfig {
this.beforeAction = beforeAction;
}
public BeforeAction getBeforeAction() {
return this.beforeAction;
}
public LoadGeneratorExecution buildLoadGeneratorExecution(
final WorkloadDefinition workloadDefinition) {
return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads);
}
}
@@ -8,25 +8,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link LoadGeneratorExecution} represents the execution of load generator, i.e., it can be
* A {@link LoadGeneratorExecution} represents the execution of a load generator, i.e., it can be
* started and stopped.
*/
public class LoadGeneratorExecution {
class LoadGeneratorExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGeneratorExecution.class);
private final Random random = new Random();
private final WorkloadDefinition workloadDefinition;
private final MessageGenerator messageGenerator;
private final GeneratorAction messageGenerator;
private final ScheduledExecutorService executor;
/**
* Create a new {@link LoadGeneratorExecution} for a given {@link WorkloadDefinition} and a
* {@link MessageGenerator}. Load is generated by the given number of threads.
* {@link GeneratorAction}. Load is generated by the given number of threads.
*/
public LoadGeneratorExecution(
final WorkloadDefinition workloadDefinition,
final MessageGenerator messageGenerator,
final GeneratorAction messageGenerator,
final int threads) {
this.workloadDefinition = workloadDefinition;
this.messageGenerator = messageGenerator;
......