diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 48d3681dab13b31ade355d9a1f13704cdc2e9c2e..daa2cd332cbedd388114f316492f6b4eaa93d307 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -71,7 +71,7 @@ test-docs-links: extends: .docs needs: - build-docs - script: bundle exec htmlproofer --assume-extension --allow_hash_href --url-ignore "/favicon.ico" ./_site + script: bundle exec htmlproofer --assume-extension --allow_hash_href ./_site test-docs-crds-regression: stage: test @@ -374,8 +374,10 @@ build-theodolite-jvm: script: ./gradlew --build-cache assemble artifacts: paths: - - "theodolite/build/lib/*" - - "theodolite/build/*-runner.jar" + - "theodolite/build/quarkus-app/lib/" + - "theodolite/build/quarkus-app/*.jar" + - "theodolite/build/quarkus-app/app/" + - "theodolite/build/quarkus-app/quarkus/" expire_in: 6 hours build-theodolite-native: diff --git a/docs/creating-a-benchmark.md b/docs/creating-a-benchmark.md index 122f43b645c9702d16722a3061bfde8bec7c94c6..2b1d93bbec0afb3a8897a77439388538fc75c4a6 100644 --- a/docs/creating-a-benchmark.md +++ b/docs/creating-a-benchmark.md @@ -108,6 +108,9 @@ Suppose the resources needed by your benchmark are defined as YAML files, locate Benchmarks need to specify at least one supported load and resource type for which scalability can be benchmarked. Load and resource types are described by a name (used for reference from an Execution) and a list of patchers. +Patchers can be seen as functions, which take a value as input and modify a Kubernetes resource in a patcher-specific way. Examples of patchers are the *ReplicaPatcher*, which modifies the replica specification of a deployment, or the *EnvVarPatcher*, which modifies an environment variable. +See the [patcher API reference](api-reference/patchers) for an overview of available patchers. + If a benchmark is [executed by an Execution](running-benchmarks), these patchers are used to configure SUT and load generator according to the [load and resource values](creating-an-execution) set in the Execution. ## Kafka Configuration @@ -116,6 +119,12 @@ Theodolite allows to automatically create and remove Kafka topics for each SLO e Use the `removeOnly: True` property for topics which are created automatically by the SUT. For those topics, also wildcards are allowed in the topic name. +If no Kafka topics should be created, simply set: + +```yaml +kafkaConfig: [] +``` + <!-- Further information: API Reference --> <!-- Further information: How to deploy --> diff --git a/docs/favicon.ico b/docs/favicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..81062e21501bd98a29505433c1e3b43965f5c17d Binary files /dev/null and b/docs/favicon.ico differ diff --git a/docs/running-benchmarks.md b/docs/running-benchmarks.md index 7da1c7e5f8385a2818ae587b4c3ab3715a6c2bb2..0a76316c0515233f9445b363f941d60ab7aa0e06 100644 --- a/docs/running-benchmarks.md +++ b/docs/running-benchmarks.md @@ -143,7 +143,7 @@ The easiest way to use them is at MyBinder: Alternatively, you can also [run these notebook locally](https://github.com/cau-se/theodolite/tree/master/analysis), for example, with Docker or Visual Studio Code. -The notebooks allow to compute a scalability function using its *demand* metric and to visualize multiple such functions in plots: +The notebooks allow to compute a scalability function using Theodolite's *demand* metric and to visualize multiple such functions in plots: ### Computing the *demand* metric with `demand-metric.ipynb` (optional) diff --git a/docs/theodolite-benchmarks/index.md b/docs/theodolite-benchmarks/index.md index 9b08e6f5f2fe049c17dce819b7c4d9b83fcbc12e..30b8e816ef1b48e770c8e42be1d599a71431c976 100644 --- a/docs/theodolite-benchmarks/index.md +++ b/docs/theodolite-benchmarks/index.md @@ -1,12 +1,12 @@ --- title: Available Benchmarks -has_children: false +has_children: true nav_order: 7 --- # Theodolite Benchmarks -Theodolite comes with 4 application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding load generator is provided. Currently, Theodolite provides benchmark implementations for Apache Kafka Streams and Apache Flink. +Theodolite comes with 4 application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding [load generator](load-generator) is provided. Currently, Theodolite provides benchmark implementations for Apache Kafka Streams and Apache Flink. Theodolite's benchmarks are based on typical use cases for stream processing within microservices. Specifically, all benchmarks represent some sort of microservice doing Industrial Internet of Things data analytics. diff --git a/docs/theodolite-benchmarks/load-generator.md b/docs/theodolite-benchmarks/load-generator.md new file mode 100644 index 0000000000000000000000000000000000000000..6d42ea06d9cb008a9aeddcc8145a2868c8d916b1 --- /dev/null +++ b/docs/theodolite-benchmarks/load-generator.md @@ -0,0 +1,87 @@ +--- +title: Load Generators +parent: Available Benchmarks +has_children: false +nav_order: 1 +--- + +# Load Generator Framework + +Theodolite's benchmarks come with a flexible load generator framework. It is used to create load on the [4 Theodolite benchmarks](#prebuilt-container-images), but can also be applied to create [custom load generators](#creating-a-custom-load-generator). +It is particularly designed for scalability: Just spin up multiple instances of the load generator and the instances automatically divide the load to be generated among themselves. + +## Prebuilt container images + +For each benchmark, we provide a [load generator as OCI container image](https://github.com/orgs/cau-se/packages?tab=packages&q=workload-generator). These load generators simulate smart power meters in an industrial facility, which generate measurement records at a fixed rate. Records are published to an Apache Kafka topic (default) or sent as POST requests to an HTTP endpoint. + +You can simply run a load generator container, for example, for benchmark UC1 with: + +```sh +docker run ghcr.io/cau-se/theodolite-uc1-workload-generator +``` + +### Message format + +Messages generated by the load generators represent a single measurement of [active power](https://en.wikipedia.org/wiki/AC_power#Active,_reactive,_apparent,_and_complex_power_in_sinusoidal_steady-state). The corresponding message type is specified as [`ActivePowerRecords`](https://github.com/cau-se/titan-ccp-common/blob/master/src/main/avro/ActivePower.avdl) +defined with Avro. It consists of an identifier for simulated power sensor, a timestamp in epoch milliseconds and the actual measured (simulated) value in watts. + +When sending generated records via Apache Kafka, these records are serialized with the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry). +If the load generator is configured to send records as HTTP POST requests, records are serialized as JSON according to the following format: + +```json +{ + "identifier": "sensor-id", + "timestamp": 1645564942000, + "valueInW": 1234.56 +} +``` + +### Configuration + +The prebuilt container images can be configured with the following environment variables: + +| Environment Variable | Description | Default | +|:----|:----|:----| +| `BOOTSTRAP_SERVER` | Address (`hostname:port`) of another load generator instance to form a cluster with. Can also be this instance. | `localhost:5701` | +| `KUBERNETES_DNS_NAME` | Kubernetes service name to discover other load generators to form a cluster with. Must be a fully qualified domain name (FQDN), e.g., something like `<service>.<namespace>.svc.cluster.local`. * Requires `BOOTSTRAP_SERVER` not to be set. | | +| `PORT` | Port used for for coordination among load generator instances. | 5701 | +| `PORT_AUTO_INCREMENT` | If set to true and the specified PORT is already used, use the next higher one. Useful if multiple instances should run on the same host, without configuring each instance individually. | true | +| `CLUSTER_NAME_PREFIX` | Only required if unrelated load generators form a cluster. | theodolite-load-generation | +| `TARGET` | The target system the load generator send messages to. Valid values are: `kafka`, `http`. | `kafka` | +| `KAFKA_BOOTSTRAP_SERVERS` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. See [Kafka producer config: `bootstrap.servers`](https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers) for more information. Only used if Kafka is set as `TARGET`. | `localhost:9092` | +| `KAFKA_INPUT_TOPIC` | Name of the Kafka topic, which should receive the generated messages. Only used if Kafka is set as `TARGET`. | input | +| `SCHEMA_REGISTRY_URL` | URL of the [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry). | `http://localhost:8081` | +| `KAFKA_BATCH_SIZE` | Value for the Kafka producer configuration: [`batch.size`](https://kafka.apache.org/documentation/#producerconfigs_batch.size). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`batch.size`](https://kafka.apache.org/documentation/#producerconfigs_batch.size) | +| `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) | +| `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) | +| `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | | +| `NUM_SENSORS` | The amount of simulated sensors. | 10 | +| `PERIOD_MS` | The time in milliseconds between generating two messages for the same sensor. With our Theodolite benchmarks, we apply an [open workload model](https://www.usenix.org/legacy/event/nsdi06/tech/full_papers/schroeder/schroeder.pdf) in which new messages are generated at a fixed rate, without considering the think time of the target server nor the time required for generating a message. | 1000 | +| `VALUE` | The constant `valueInW` of an `ActivePowerRecord`. | 10 | +| `THREADS` | Number of worker threads used to generate the load. | 4 | + +Please note that there are some additional configuration options for benchmark [UC4's load generator](https://github.com/cau-se/theodolite/blob/master/theodolite-benchmarks/uc4-load-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java). + +## Creating a custom load generator + +To create a custom load generator, you need to import the [load-generator-commons](https://github.com/cau-se/theodolite/tree/master/theodolite-benchmarks/load-generator-commons) project. You can then create an instance of the `LoadGenerator` object and call its `run` method: + +```java +LoadGenerator loadGenerator = new LoadGenerator() + .setClusterConfig(clusterConfig) + .setLoadDefinition(new WorkloadDefinition( + new KeySpace(key_prefix, numSensors), + duration)) + .setGeneratorConfig(new LoadGeneratorConfig( + recordGenerator, + recordSender)) + .withThreads(threads); +loadGenerator.run(); +``` + +Alternatively, you can also start with a load generator populated with a default configuration or created from environment variables and then adjust the `LoadGenerator` as desired: + +```java +LoadGenerator loadGeneratorFromDefaults = LoadGenerator.fromDefaults() +LoadGenerator loadGeneratorFromEnv = LoadGenerator.fromEnvironment(); +``` diff --git a/theodolite-benchmarks/docker-test/uc1-beam-flink/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-beam-flink/docker-compose.yml index d8a7b946a9d5e407032ce02838b3ad02892eae73..1b683b4ca65a2582aa6a4d68444c4bbef7895b73 100644 --- a/theodolite-benchmarks/docker-test/uc1-beam-flink/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc1-beam-flink/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc1-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -44,7 +44,7 @@ services: SCHEMA_REGISTRY_URL: http://schema-registry:8081 NUM_SENSORS: 10 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc1-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc1-beam-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: > @@ -62,7 +62,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc1-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc1-beam-flink:${THEODOLITE_TAG:-latest} scale: 1 command: taskmanager environment: diff --git a/theodolite-benchmarks/docker-test/uc1-beam-samza/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-beam-samza/docker-compose.yml index 11cf0c345b417fdda7cedba2f9db1342d2b64634..f5213799daa2d51eea53e794becdffc151a4da56 100644 --- a/theodolite-benchmarks/docker-test/uc1-beam-samza/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc1-beam-samza/docker-compose.yml @@ -21,7 +21,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -35,7 +35,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc1-beam-samza:latest + image: ghcr.io/cau-se/theodolite-uc1-beam-samza:${THEODOLITE_TAG:-latest} scale: 1 depends_on: - schema-registry @@ -47,7 +47,7 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 load-generator: - image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc1-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml index 5a252f07e23205cf20390230ec956240ad2dc7a6..6c661bb49cb4173357acc89c06783fe5e0a2ce49 100755 --- a/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc1-flink-docker-compose/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc1-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -44,7 +44,7 @@ services: SCHEMA_REGISTRY_URL: http://schema-registry:8081 NUM_SENSORS: 10 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc1-flink:latest + image: ghcr.io/cau-se/theodolite-uc1-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: standalone-job --job-classname theodolite.uc1.application.HistoryServiceFlinkJob @@ -59,7 +59,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc1-flink:latest + image: ghcr.io/cau-se/theodolite-uc1-flink:${THEODOLITE_TAG:-latest} command: taskmanager environment: - | diff --git a/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml index 88ffadfcf3ce7e372fad1e3cbf28cc3aa847756d..25c19e35e5dae1807ef46fb8ade4e888dff0c2d8 100755 --- a/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc1-kstreams-docker-compose/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc1-kstreams-app:latest + image: ghcr.io/cau-se/theodolite-uc1-kstreams-app:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -41,7 +41,7 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 load-generator: - image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc1-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc2-beam-flink/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-beam-flink/docker-compose.yml index f8bdfae935a55c8cb60e3fb22b19c471832ca9f4..8427161e43faa920e011973d76f32f9cc9d62f8c 100644 --- a/theodolite-benchmarks/docker-test/uc2-beam-flink/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-beam-flink/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -44,7 +44,7 @@ services: SCHEMA_REGISTRY_URL: http://schema-registry:8081 NUM_SENSORS: 10 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc2-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc2-beam-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: > @@ -62,7 +62,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc2-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc2-beam-flink:${THEODOLITE_TAG:-latest} scale: 1 command: taskmanager environment: diff --git a/theodolite-benchmarks/docker-test/uc2-beam-samza/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-beam-samza/docker-compose.yml index 67a5997b66833e33696592285dffe24b03b3d210..820bb25dd370cf6c7410b20fbdbdb1d4281f47d3 100644 --- a/theodolite-benchmarks/docker-test/uc2-beam-samza/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-beam-samza/docker-compose.yml @@ -21,7 +21,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -35,7 +35,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc2-beam-samza:latest + image: ghcr.io/cau-se/theodolite-uc2-beam-samza:${THEODOLITE_TAG:-latest} scale: 1 depends_on: - schema-registry @@ -47,7 +47,7 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 load-generator: - image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml index f7047a7af7d0b613dd128b9d8d4d9fffd22b4692..e51c544bfd64c6f540f5a84a5f617de745a1a6b6 100755 --- a/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-flink-docker-compose/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -44,7 +44,7 @@ services: SCHEMA_REGISTRY_URL: http://schema-registry:8081 NUM_SENSORS: 10 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc2-flink:latest + image: ghcr.io/cau-se/theodolite-uc2-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: standalone-job --job-classname theodolite.uc2.application.HistoryServiceFlinkJob @@ -59,7 +59,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc2-flink:latest + image: ghcr.io/cau-se/theodolite-uc2-flink:${THEODOLITE_TAG:-latest} command: taskmanager environment: - | diff --git a/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml index 89f2633b390b08a3a18128e98f261cc264e2b41d..4cc4978d844a8b361af4061ab41d1adc4e7a1813 100755 --- a/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-kstreams-docker-compose/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc2-kstreams-app:latest + image: ghcr.io/cau-se/theodolite-uc2-kstreams-app:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -42,7 +42,7 @@ services: SCHEMA_REGISTRY_URL: http://schema-registry:8081 KAFKA_WINDOW_DURATION_MINUTES: 60 load-generator: - image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc3-beam-flink/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-beam-flink/docker-compose.yml index 9a18ab364463a985b40cd691f6232b9b47ae412e..b9e934b61f9742ad4e83601ff13707be0641c875 100644 --- a/theodolite-benchmarks/docker-test/uc3-beam-flink/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc3-beam-flink/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc3-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -44,7 +44,7 @@ services: SCHEMA_REGISTRY_URL: http://schema-registry:8081 NUM_SENSORS: 10 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc3-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc3-beam-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: > @@ -64,7 +64,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc3-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc3-beam-flink:${THEODOLITE_TAG:-latest} scale: 1 command: taskmanager environment: diff --git a/theodolite-benchmarks/docker-test/uc3-beam-samza/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-beam-samza/docker-compose.yml index a50b32bd8f78678d63f06688821d6dfb5f133138..3ae3a2faa7c0849b76ea838b09ba0254bdfea936 100644 --- a/theodolite-benchmarks/docker-test/uc3-beam-samza/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc3-beam-samza/docker-compose.yml @@ -21,7 +21,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -35,7 +35,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc3-beam-samza:latest + image: ghcr.io/cau-se/theodolite-uc3-beam-samza:${THEODOLITE_TAG:-latest} scale: 1 depends_on: - schema-registry @@ -47,7 +47,7 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 load-generator: - image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc3-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml index c2b8d7ad436301138acdf8dfae1654e2feb9b9bb..42c55950fbea022f00019699ae72678fbe88b8cb 100755 --- a/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc3-flink-docker-compose/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc3-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -44,7 +44,7 @@ services: SCHEMA_REGISTRY_URL: http://schema-registry:8081 NUM_SENSORS: 10 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc3-flink:latest + image: ghcr.io/cau-se/theodolite-uc3-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: standalone-job --job-classname theodolite.uc3.application.HistoryServiceFlinkJob @@ -59,7 +59,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc3-flink:latest + image: ghcr.io/cau-se/theodolite-uc3-flink:${THEODOLITE_TAG:-latest} command: taskmanager environment: - | diff --git a/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml index 65b0a3467e123a84d0e719d8702749ed33773aea..f943de372b75bf64df0028679d56950fedfaec48 100755 --- a/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc3-kstreams-docker-compose/docker-compose.yml @@ -19,7 +19,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1" + KAFKA_CREATE_TOPICS: "input:3:1,output:3:1" schema-registry: image: confluentinc/cp-schema-registry:5.3.1 depends_on: @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest + image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -41,7 +41,7 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 load-generator: - image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc3-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml index 5169ac551952f992d98c74f7d65d5378ecdcc2a5..b0d18dfe34c23847b3cc0b04d788fdc4b3552e68 100644 --- a/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -45,7 +45,7 @@ services: NUM_SENSORS: 4 NUM_NESTED_GROUPS: 4 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc4-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc4-beam-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: > @@ -66,7 +66,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc4-beam-flink:latest + image: ghcr.io/cau-se/theodolite-uc4-beam-flink:${THEODOLITE_TAG:-latest} scale: 1 command: taskmanager environment: diff --git a/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml index bded9d5d227d0f62cb6cb3f9edac3df383ea3e8a..b9e4142cb0be2701f7772cff5725b524aaffcbc7 100644 --- a/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml @@ -35,7 +35,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc4-beam-samza:latest + image: ghcr.io/cau-se/theodolite-uc4-beam-samza:${THEODOLITE_TAG:-latest} scale: 1 depends_on: - schema-registry @@ -47,7 +47,7 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 load-generator: - image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml index 0f7e4e656dede1aad3342fb79816e3ebf88e84d8..5a5a5924ef13cc9691b267ce6169bdb85a1e267b 100755 --- a/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-flink-docker-compose/docker-compose.yml @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' load-generator: - image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -45,7 +45,7 @@ services: NUM_SENSORS: 4 NUM_NESTED_GROUPS: 4 benchmark-jobmanager: - image: ghcr.io/cau-se/theodolite-uc4-flink:latest + image: ghcr.io/cau-se/theodolite-uc4-flink:${THEODOLITE_TAG:-latest} #ports: # - "8080:8081" command: standalone-job --job-classname theodolite.uc4.application.AggregationServiceFlinkJob @@ -60,7 +60,7 @@ services: - schema-registry - kafka benchmark-taskmanager: - image: ghcr.io/cau-se/theodolite-uc4-flink:latest + image: ghcr.io/cau-se/theodolite-uc4-flink:${THEODOLITE_TAG:-latest} command: taskmanager environment: - | diff --git a/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml index 5fca44708006d1fae3ae2f9f46b5c42f6431fc3a..1818505787f662d57f1a96c8c79b03d61cf9da2a 100755 --- a/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-kstreams-docker-compose/docker-compose.yml @@ -33,7 +33,7 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' benchmark: - image: ghcr.io/cau-se/theodolite-uc4-kstreams-app:latest + image: ghcr.io/cau-se/theodolite-uc4-kstreams-app:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka @@ -41,7 +41,7 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 load-generator: - image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest + image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/load-generator-commons/build.gradle b/theodolite-benchmarks/load-generator-commons/build.gradle index f2aa10b079f4be80d19d9ac5d822b7bdab0b6d78..2d8f77b5154b5b788e0729da69122b443740ce75 100644 --- a/theodolite-benchmarks/load-generator-commons/build.gradle +++ b/theodolite-benchmarks/load-generator-commons/build.gradle @@ -13,14 +13,16 @@ repositories { } dependencies { - implementation 'com.google.guava:guava:30.1-jre' implementation 'com.hazelcast:hazelcast:4.1.1' implementation 'com.hazelcast:hazelcast-kubernetes:2.2.1' implementation 'org.slf4j:slf4j-simple:1.7.25' + implementation 'com.google.guava:guava:30.1-jre' + implementation 'com.google.code.gson:gson:2.8.2' implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required? // Use JUnit test framework testImplementation 'junit:junit:4.12' + testImplementation 'com.github.tomakehurst:wiremock-jre8:2.32.0' } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index 45ac1d5bb9c21a1b6303de2f248d08b69c02fc28..7a60e271f04e396b2e0c69b1fcfee1d8a1ca8a7d 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -23,6 +23,8 @@ public final class ConfigurationKeys { public static final String THREADS = "THREADS"; + public static final String TARGET = "TARGET"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS"; public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL"; @@ -35,6 +37,8 @@ public final class ConfigurationKeys { public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY"; + public static final String HTTP_URL = "HTTP_URL"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java new file mode 100644 index 0000000000000000000000000000000000000000..6b7a5db067c8117f046aa0ff1c6f5d56c35c4321 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/HttpRecordSender.java @@ -0,0 +1,93 @@ +package theodolite.commons.workloadgeneration; + +import com.google.gson.Gson; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodyHandlers; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.avro.specific.SpecificRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends monitoring records via HTTP. + * + * @param <T> {@link SpecificRecord} to send + */ +public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<T> { + + private static final int HTTP_OK = 200; + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); + + private final Gson gson = new Gson(); + + private final HttpClient httpClient = HttpClient.newBuilder().build(); + + private final URI uri; + + private final boolean async; + + private final List<Integer> validStatusCodes; + + /** + * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to + */ + public HttpRecordSender(final URI uri) { + this(uri, true, List.of(HTTP_OK)); + } + + /** + * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to + * @param async whether HTTP requests should be sent asynchronous + * @param validStatusCodes a list of HTTP status codes which are considered as successful + */ + public HttpRecordSender(final URI uri, final boolean async, + final List<Integer> validStatusCodes) { + this.uri = uri; + this.async = async; + this.validStatusCodes = validStatusCodes; + } + + @Override + public void send(final T message) { + final String json = this.gson.toJson(message); + final HttpRequest request = HttpRequest.newBuilder() + .uri(this.uri) + .POST(HttpRequest.BodyPublishers.ofString(json)) + .build(); + final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); + // final BodyHandler<String> bodyHandler = BodyHandlers.ofString(); + + final CompletableFuture<HttpResponse<Void>> result = + this.httpClient.sendAsync(request, bodyHandler) + .whenComplete((response, exception) -> { + if (exception != null) { // NOPMD + LOGGER.warn("Couldn't send request to {}.", this.uri, exception); // NOPMD false-p. + } else if (!this.validStatusCodes.contains(response.statusCode())) { // NOPMD + LOGGER.warn("Received status code {} for request to {}.", response.statusCode(), + this.uri); + } else { + LOGGER.debug("Sucessfully sent request to {} (status={}).", this.uri, + response.statusCode()); + } + }); + if (this.async) { + try { + result.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Couldn't get result for request to {}.", this.uri, e); + } + } + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java index ded7c347c8d6b057581dc63b691df5bb60997791..44ff8a92afd5356b4bb2af203899a61f7af48b2d 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/KafkaRecordSender.java @@ -15,7 +15,7 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** * Sends monitoring records to Kafka. * - * @param <T> {@link IMonitoringRecord} to send + * @param <T> {@link SpecificRecord} to send */ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java index 3f5d14c2e7dccb94e4aacde1f531ec2e9d1fb8db..6453ef0bd3b6d5a3b5f7f2b77fa20da8f79cb35f 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGenerator.java @@ -1,11 +1,13 @@ package theodolite.commons.workloadgeneration; +import java.net.URI; import java.time.Duration; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import titan.ccp.model.records.ActivePowerRecord; /** * A Theodolite load generator. @@ -20,9 +22,11 @@ public final class LoadGenerator { private static final int PERIOD_MS_DEFAULT = 1000; private static final int VALUE_DEFAULT = 10; private static final int THREADS_DEFAULT = 4; + private static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; private static final String KAFKA_TOPIC_DEFAULT = "input"; private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD + private static final String HTTP_URI_DEFAULT = "http://localhost:8080"; private ClusterConfig clusterConfig; private WorkloadDefinition loadDefinition; @@ -91,7 +95,7 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), Duration.ofMillis(PERIOD_MS_DEFAULT))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT), + TitanRecordGenerator.forConstantValue(VALUE_DEFAULT), TitanKafkaSenderFactory.forKafkaConfig( KAFKA_BOOTSTRAP_SERVERS_DEFAULT, KAFKA_TOPIC_DEFAULT, @@ -134,6 +138,47 @@ public final class LoadGenerator { clusterConfig.setClusterNamePrefix(portAutoIncrement); } + final LoadGeneratorTarget target = LoadGeneratorTarget.from( + Objects.requireNonNullElse(System.getenv(ConfigurationKeys.TARGET), + TARGET_DEFAULT.getValue())); + + final RecordSender<ActivePowerRecord> recordSender; // NOPMD + if (target == LoadGeneratorTarget.KAFKA) { + final String kafkaBootstrapServers = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), + KAFKA_BOOTSTRAP_SERVERS_DEFAULT); + final String kafkaInputTopic = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), + KAFKA_TOPIC_DEFAULT); + final String schemaRegistryUrl = Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), + SCHEMA_REGISTRY_URL_DEFAULT); + final Properties kafkaProperties = new Properties(); + kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); + kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); + kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, + (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); + recordSender = TitanKafkaSenderFactory.forKafkaConfig( + kafkaBootstrapServers, + kafkaInputTopic, + schemaRegistryUrl); + LOGGER.info( + "Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS + kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic); + } else if (target == LoadGeneratorTarget.HTTP) { + final URI url = URI.create( + Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_URL), + HTTP_URI_DEFAULT)); + recordSender = new HttpRecordSender<>(url); + LOGGER.info("Use HTTP server as target with url '{}'.", url); + } else { + // Should never happen + throw new IllegalStateException("Target " + target + " is not handled yet."); + } + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.NUM_SENSORS), Integer.toString(NUMBER_OF_KEYS_DEFAULT))); @@ -146,22 +191,6 @@ public final class LoadGenerator { final int threads = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.THREADS), Integer.toString(THREADS_DEFAULT))); - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - KAFKA_BOOTSTRAP_SERVERS_DEFAULT); - final String kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - KAFKA_TOPIC_DEFAULT); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - SCHEMA_REGISTRY_URL_DEFAULT); - final Properties kafkaProperties = new Properties(); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BATCH_SIZE)); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_LINGER_MS)); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, - (k, v) -> System.getenv(ConfigurationKeys.KAFKA_BUFFER_MEMORY)); return new LoadGenerator() .setClusterConfig(clusterConfig) @@ -169,11 +198,8 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors), Duration.ofMillis(periodMs))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanRecordGeneratorFactory.forConstantValue(value), - TitanKafkaSenderFactory.forKafkaConfig( - kafkaBootstrapServers, - kafkaInputTopic, - schemaRegistryUrl))) + TitanRecordGenerator.forConstantValue(value), + recordSender)) .withThreads(threads); } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java new file mode 100644 index 0000000000000000000000000000000000000000..086e4de36301693c6873016122a47709b858a0d4 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/LoadGeneratorTarget.java @@ -0,0 +1,26 @@ +package theodolite.commons.workloadgeneration; + +import java.util.stream.Stream; + +enum LoadGeneratorTarget { + + KAFKA("kafka"), HTTP("http"); + + private final String value; + + LoadGeneratorTarget(final String value) { + this.value = value; + } + + String getValue() { + return this.value; + } + + static LoadGeneratorTarget from(final String value) { + return Stream.of(LoadGeneratorTarget.values()) + .filter(t -> t.value.equals(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Target '" + value + "' does not exist.")); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..cebdacaee9a8e7d05787fdf3f846d49914574828 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGenerator.java @@ -0,0 +1,38 @@ +package theodolite.commons.workloadgeneration; + +import java.time.Clock; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s. + */ +public final class TitanRecordGenerator implements RecordGenerator<ActivePowerRecord> { + + private final Clock clock; + + private final double constantValue; + + private TitanRecordGenerator(final double constantValue) { + this.constantValue = constantValue; + this.clock = Clock.systemUTC(); + } + + /* default */ TitanRecordGenerator(final double constantValue, final Clock clock) { + this.constantValue = constantValue; + this.clock = clock; + } + + /** + * Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a + * constant value. + */ + public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) { + return new TitanRecordGenerator(value); + } + + @Override + public ActivePowerRecord generate(final String key) { + return new ActivePowerRecord(key, this.clock.millis(), this.constantValue); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java deleted file mode 100644 index 4e1c10071eff28d77514dbc121e30bead3f6fa74..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorFactory.java +++ /dev/null @@ -1,21 +0,0 @@ -package theodolite.commons.workloadgeneration; - -import titan.ccp.model.records.ActivePowerRecord; - -/** - * A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s. - */ -public final class TitanRecordGeneratorFactory { - - - private TitanRecordGeneratorFactory() {} - - /** - * Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a - * constant value. - */ - public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) { - return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value); - } - -} diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6d908d34b7c6b87254782b6ae8b0b8dc2a6d036e --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/HttpRecordSenderTest.java @@ -0,0 +1,53 @@ +package theodolite.commons.workloadgeneration; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.exactly; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.google.gson.Gson; +import java.net.URI; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import titan.ccp.model.records.ActivePowerRecord; + +public class HttpRecordSenderTest { + + private HttpRecordSender<ActivePowerRecord> httpRecordSender; + + private Gson gson; + + @Rule + public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort()); + + @Before + public void setup() { + this.httpRecordSender = + new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port())); + this.gson = new Gson(); + } + + @Test + public void testValidUri() { + this.wireMockRule.stubFor( + post(urlPathEqualTo("/")) + .willReturn( + aResponse() + .withStatus(200) + .withBody("received"))); + + final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34); + this.httpRecordSender.send(record); + + final String expectedJson = "{\"identifier\":\"my-id\",\"timestamp\":12345,\"valueInW\":12.34}"; + verify(exactly(1), postRequestedFor(urlEqualTo("/")) + .withRequestBody(equalTo(expectedJson))); // toJson + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java index 20c094ddcc7ff110a25aaffa494766e89d4d2475..49004839a9c8fd280aba5006a1f08c2acb3c3136 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/KeySpaceTest.java @@ -2,7 +2,6 @@ package theodolite.commons.workloadgeneration; import org.junit.Assert; import org.junit.Test; -import theodolite.commons.workloadgeneration.KeySpace; public class KeySpaceTest { diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java new file mode 100644 index 0000000000000000000000000000000000000000..644ffad9a4d2732f72ac307294d1311eba3a9ce8 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/LoadGeneratorTargetTest.java @@ -0,0 +1,26 @@ +package theodolite.commons.workloadgeneration; + +import org.junit.Assert; +import org.junit.Test; + +public class LoadGeneratorTargetTest { + + @Test + public void testFromKafka() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from("kafka"); + Assert.assertEquals(LoadGeneratorTarget.KAFKA, target); + } + + @Test + public void testFromHttp() { + final LoadGeneratorTarget target = LoadGeneratorTarget.from("http"); + Assert.assertEquals(LoadGeneratorTarget.HTTP, target); + } + + @Test(expected = IllegalArgumentException.class) + public void testFromInvalidTarget() { + LoadGeneratorTarget.from("<invalid-target>"); + } + + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..04ba38b9c8fcd41df46d3d3070a6308acfd72cb7 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/theodolite/commons/workloadgeneration/TitanRecordGeneratorTest.java @@ -0,0 +1,40 @@ +package theodolite.commons.workloadgeneration; + +import java.time.Clock; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import org.junit.Assert; +import org.junit.Test; +import titan.ccp.model.records.ActivePowerRecord; + +public class TitanRecordGeneratorTest { + + @Test + public void testGenerate() { + final ZoneId zoneId = ZoneOffset.UTC; + final LocalDateTime dateTime = LocalDateTime.of(2022, 1, 17, 14, 2, 42); + final Instant instant = dateTime.atZone(zoneId).toInstant(); + final TitanRecordGenerator generator = + new TitanRecordGenerator(42.0, Clock.fixed(instant, zoneId)); + + final ActivePowerRecord activePowerRecord = generator.generate("my-identifier"); + Assert.assertEquals("my-identifier", activePowerRecord.getIdentifier()); + Assert.assertEquals(instant.toEpochMilli(), activePowerRecord.getTimestamp()); + Assert.assertEquals(42.0, activePowerRecord.getValueInW(), 0.001); + } + + @Test + public void testTimestampForArbitraryClockTimeZone() { + final LocalDateTime dateTime = LocalDateTime.of(2022, 1, 17, 14, 2, 42); + final Instant instant = dateTime.atZone(ZoneId.of("Europe/Paris")).toInstant(); + // Setting of ZoneId should have no impact on result as we request epoch millis + final Clock clock = Clock.fixed(instant, ZoneId.of("America/Sao_Paulo")); + final TitanRecordGenerator generator = new TitanRecordGenerator(42.0, clock); + + final ActivePowerRecord activePowerRecord = generator.generate("my-identifier"); + Assert.assertEquals(instant.toEpochMilli(), activePowerRecord.getTimestamp()); + } + +} diff --git a/theodolite/build.gradle b/theodolite/build.gradle index 06d451cc24395824650e88d2fe516eb4015a266e..0ec113cbba893bc1f2f44a60c270f7cb67688803 100644 --- a/theodolite/build.gradle +++ b/theodolite/build.gradle @@ -26,19 +26,13 @@ dependencies { implementation 'com.google.code.gson:gson:2.8.9' implementation 'org.slf4j:slf4j-simple:1.7.32' implementation 'io.github.microutils:kotlin-logging:2.1.16' - //implementation('io.fabric8:kubernetes-client:5.4.1'){force = true} - //implementation('io.fabric8:kubernetes-model-core:5.4.1'){force = true} - //implementation('io.fabric8:kubernetes-model-common:5.4.1'){force = true} implementation 'org.apache.kafka:kafka-clients:2.8.0' implementation 'khttp:khttp:1.0.0' - // compile 'junit:junit:4.12' - testImplementation 'io.quarkus:quarkus-junit5' testImplementation 'io.quarkus:quarkus-test-kubernetes-client' testImplementation 'io.rest-assured:rest-assured' testImplementation 'org.junit-pioneer:junit-pioneer:1.5.0' - //testImplementation 'io.fabric8:kubernetes-server-mock:5.10.1' testImplementation "org.mockito.kotlin:mockito-kotlin:4.0.0" }