diff --git a/CITATION.cff b/CITATION.cff index 0ce67d554c2e47b5875e4aefe89b1d315469b2ec..ef59a12fafa54dab086407897ec79dc986292973 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -11,7 +11,7 @@ title: Theodolite version: "0.8.2" repository-code: "https://github.com/cau-se/theodolite" license: "Apache-2.0" -doi: "10.1016/j.bdr.2021.100209" +doi: "10.1007/s10664-022-10162-1" preferred-citation: type: article authors: @@ -21,9 +21,9 @@ preferred-citation: - family-names: Hasselbring given-names: Wilhelm orcid: "https://orcid.org/0000-0001-6625-4335" - doi: "10.1016/j.bdr.2021.100209" - journal: "Big Data Research" - month: 7 - title: "Theodolite: Scalability Benchmarking of Distributed Stream Processing Engines in Microservice Architectures" - volume: 25 - year: 2021 + doi: "10.1007/s10664-022-10162-1" + journal: "Empirical Software Engineering" + month: 8 + title: "A Configurable Method for Benchmarking Scalability of Cloud-Native Applications" + volume: 27 + year: 2022 diff --git a/README.md b/README.md index a93be9469000b3771d39f72a8c920dce28d1c919..91a3205745794c96d30cfe17d4938597069f2c6a 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ > A theodolite is a precision optical instrument for measuring angles between designated visible points in the horizontal and vertical planes. -- <cite>[Wikipedia](https://en.wikipedia.org/wiki/Theodolite)</cite> -Theodolite is a framework for benchmarking the horizontal and vertical scalability of cloud-native applications. +Theodolite is a framework for benchmarking the horizontal and vertical scalability of cloud-native applications in Kubernetes. ## Quickstart @@ -23,8 +23,21 @@ Documentation on Theodolite itself as well as regarding its benchmarking method * The source code of Theodolite's SLO checkers are located in [`slo-checker`](slo-checker). * The documentation, which is hosted on [theodolite.rocks](https://www.theodolite.rocks), is located in [`docs`](docs). 
+## Contributing + +We are happy to accept any kind of contributions to Theodolite. +This includes reporting any issues you find using Theodolite, bug fixes and improvements as well as integrating your research within the project. + +See our website to [start contributing](https://www.theodolite.rocks/development/). + ## How to Cite If you use Theodolite, please cite -> Sören Henning and Wilhelm Hasselbring. (2021). Theodolite: Scalability Benchmarking of Distributed Stream Processing Engines in Microservice Architectures. Big Data Research, Volume 25. DOI: [10.1016/j.bdr.2021.100209](https://doi.org/10.1016/j.bdr.2021.100209). arXiv:[2009.00304](https://arxiv.org/abs/2009.00304). +> Sören Henning and Wilhelm Hasselbring. “A Configurable Method for Benchmarking Scalability of Cloud-Native Applications”. In: *Empirical Software Engineering* 27. 2022. DOI: [10.1007/s10664-022-10162-1](https://doi.org/10.1007/s10664-022-10162-1). + +When referring to our stream processing benchmarks, please cite + +> Sören Henning and Wilhelm Hasselbring. “Theodolite: Scalability Benchmarking of Distributed Stream Processing Engines in Microservice Architectures”. In: *Big Data Research* 25. 2021. DOI: [10.1016/j.bdr.2021.100209](https://doi.org/10.1016/j.bdr.2021.100209). arXiv:[2009.00304](https://arxiv.org/abs/2009.00304). + +See our website for a [list of publications](https://www.theodolite.rocks/publications.html) directly related to Theodolite. \ No newline at end of file diff --git a/buildimages/README.md b/buildimages/README.md new file mode 100644 index 0000000000000000000000000000000000000000..f158050bf2ae7e42dc1292a29086054d89384f3a --- /dev/null +++ b/buildimages/README.md @@ -0,0 +1,3 @@ +# Theodolite Build Images + +This directory contains some Dockerfiles for images required for Theodolite build infrastructure. 
diff --git a/codemeta.json b/codemeta.json index caf403217071cb4375fdbf039f51efd7152dd8ca..4767108624bfa905c1be5b3af3dc2a2f680325c5 100644 --- a/codemeta.json +++ b/codemeta.json @@ -14,7 +14,7 @@ "relatedLink": [ "https://www.theodolite.rocks" ], - "referencePublication": "https://doi.org/10.1016/j.bdr.2021.100209", + "referencePublication": "https://doi.org/10.1007/s10664-022-10162-1", "programmingLanguage": [ "Kotlin", "Java", diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock index 43f0c8583ce3afbaa42cc58eceea19634cad7917..f5bd189a43be738778d162b05f9c47a96dad6745 100644 --- a/docs/Gemfile.lock +++ b/docs/Gemfile.lock @@ -239,7 +239,7 @@ GEM jekyll-seo-tag (~> 2.1) minitest (5.15.0) multipart-post (2.1.1) - nokogiri (1.13.9-x86_64-linux) + nokogiri (1.13.10-x86_64-linux) racc (~> 1.4) octokit (4.22.0) faraday (>= 0.9) @@ -248,7 +248,7 @@ GEM pathutil (0.16.2) forwardable-extended (~> 2.6) public_suffix (4.0.7) - racc (1.6.0) + racc (1.6.1) rainbow (3.1.1) rb-fsevent (0.11.1) rb-inotify (0.10.1) diff --git a/docs/development/index.md b/docs/development/index.md index a427c98d6cff655e74bb7a42a92c9cb4e46b3404..155b3eb8b7aa6a18b8953132edf75d35abfb75d4 100644 --- a/docs/development/index.md +++ b/docs/development/index.md @@ -1,5 +1,29 @@ --- -title: Development +title: Contributing has_children: true nav_order: 10 ---- \ No newline at end of file +--- + +# Contributing + +Theodolite is open-source research software. We welcome everyone to contribute to this project. 
+Contributions are not limited to code contributions, instead we welcome and recognize everything concerning: + +* Raising issues, questions and suggestions for using Theodolite +* Fixing bugs or implementing new features +* Improving the documentation +* Using Theodolite as part of your (not necessarily scientific) research +* Reporting on your scalability evaluations with Theodolite + +## Start Contributing + +If you have bug reports, feature requests, questions or suggestions, you may create a [GitHub issue](https://github.com/cau-se/theodolite/issues) or directly [contact Theodolite's maintainers](../project-info). +You can also create a [GitHub pull request](https://github.com/cau-se/theodolite/pulls) if you have already implemented bug fixes and improvements. + +If you would like to get more involved in Theodolite's project development and maintenance, you may contact us as well so we can set you up an account for [our internal GitLab](../project-info#project-management). + +## Internal Project Structure + +Theodolite is organized as a monorepo containing multiple largely independent modules in subdirectories. +See the project's [`README.md`](https://github.com/cau-se/theodolite/blob/main/README.md#project-structure) for an overview of all modules. +Each module directory provides a dedicated `README.md` file describing how to build, test, package,... the corresponding module. 
diff --git a/docs/development/release-process.md b/docs/development/release-process.md index c7ef78ad6b2a2485fb715afea2dd0a5021e1b7a0..576bee3861679d056d2290b33ae4b3b9ce90209d 100644 --- a/docs/development/release-process.md +++ b/docs/development/release-process.md @@ -1,7 +1,7 @@ --- title: Release Process has_children: false -parent: Development +parent: Contributing nav_order: 1 --- diff --git a/docs/project-info.md b/docs/project-info.md index 3fdb921965ef2771805d015a2b7c6723267ad068..bb02abcd1ae8cdb42deb85d17490fbaf9f02ea8b 100644 --- a/docs/project-info.md +++ b/docs/project-info.md @@ -15,7 +15,20 @@ You might also want to raise an issue on [GitHub](http://github.com/cau-se/theod ## Project Management -Theodolite's internal development including issue boards, merge requests and extensive CI pipelines is tracked in our [internal GitLab](https://git.se.informatik.uni-kiel.de/she/theodolite). We provide a public mirror on GitHub, [cau-se/theodolite](http://github.com/cau-se/theodolite), where we are also happy to welcome issues and pull requests. +Theodolite's internal development including issue boards, merge requests and extensive CI pipelines is tracked in our [internal GitLab](https://git.se.informatik.uni-kiel.de/she/theodolite). +While all internal development is publicly accessible, contributing requires an account to be set up. +To ease contribution, we provide a public mirror on GitHub, [cau-se/theodolite](http://github.com/cau-se/theodolite), where we are also happy to welcome issues and pull requests. +Also releases are published via GitHub. 
See the following table for an overview: + +| Project management | Public GitHub | Internal GitLab | +|:---|:---|:---| +| Source code | [GitHub](https://github.com/cau-se/theodolite) | [GitLab](https://git.se.informatik.uni-kiel.de/she/theodolite) | +| Issue Tracking | [GitHub Issues](https://github.com/cau-se/theodolite/issues) | [GitLab Issues](https://git.se.informatik.uni-kiel.de/she/theodolite/-/issues) | +| Pull/Merge requests | [GitHub Pull requests](https://github.com/cau-se/theodolite/pulls) | [GitLab Merge requests](https://git.se.informatik.uni-kiel.de/she/theodolite/-/merge_requests) | +| Roadmap | | [GitLab Milestones](https://git.se.informatik.uni-kiel.de/she/theodolite/-/milestones) | +| CI/CD pipelines | | [GitLab CI/CD](https://git.se.informatik.uni-kiel.de/she/theodolite/-/pipelines) | +| Releases | [GitHub Releases](https://github.com/cau-se/theodolite/releases) | [GitLab Releases](https://git.se.informatik.uni-kiel.de/she/theodolite/-/releases) | +| Container images | [GitHub Packages](https://github.com/orgs/cau-se/packages?repo_name=theodolite) | | ## Contributors diff --git a/theodolite-benchmarks/README.md b/theodolite-benchmarks/README.md index d2a69992637cc8621d26653e78a38a6f9a6f55e1..3764537229e752293aa70ffecfc95df770952ec8 100644 --- a/theodolite-benchmarks/README.md +++ b/theodolite-benchmarks/README.md @@ -4,6 +4,16 @@ Theodolite comes with a set of 4 benchmarks for event-driven microservices, whic The benchmarks are based on typical use cases for stream processing and named: UC1, UC2, UC3 and UC4. Additionally, we include a load generator for each benchmark. +## Project organization + +All benchmark implementations are organized in a Gradle multi-project. See the [`settings.gradle`](settings.gradle) file for an overview of subprojects and how they are organized. +We also use Gradle convention plugins, organized in [`buildSrc`](buildSrc), for sharing build configuration among subprojects. 
+ +Additionally, this directory contains: + +* *Theodolite* Benchmark definitions for all benchmarks in [`definitions`](definitions). +* Docker Compose files to assist in local development and to run smoke tests in [`docker-test`](docker-test). + ## Building and packaging the benchmarks All benchmarks can be built with: diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java index a63b5f4939566134a0aeec765fe084ea5bcc41ff..582e1ecc4b41e83e62d390da17ff3a7a2e64be42 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java @@ -10,8 +10,8 @@ import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; /** * TimeStampPolicy to use event time based on the timestamp of the record value. 
*/ -public class EventTimePolicy - extends TimestampPolicy<String, ActivePowerRecord> { +public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> { + protected Instant currentWatermark; public EventTimePolicy(final Optional<Instant> previousWatermark) { @@ -19,7 +19,6 @@ public class EventTimePolicy this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); } - @Override public Instant getTimestampForRecord(final PartitionContext ctx, final KafkaRecord<String, ActivePowerRecord> record) { diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java index 000ddcdccd90cf3bc4f0cdaabe004ce74bef5dec..577019659f14fd6c9057868db4acbbc0c7e1447f 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java @@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.kafka.common.serialization.Deserializer; /** - * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. + * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO}. * * @param <K> Type of the Key. * @param <V> Type of the Value. @@ -17,10 +17,11 @@ import org.apache.kafka.common.serialization.Deserializer; public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { private static final long serialVersionUID = 2603286150183186115L; + private final PTransform<PBegin, PCollection<KV<K, V>>> reader; /** - * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. + * Instantiates a {@link PTransform} that reads from Kafka with the given configuration. 
*/ public KafkaGenericReader( final String bootstrapServer, @@ -30,19 +31,18 @@ public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV< final Class<? extends Deserializer<V>> valueDeserializer) { super(); - // Check if boostrap server and inputTopic are defined + // Check if the bootstrap server and the input topic are defined if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } - this.reader = - KafkaIO.<K, V>read() - .withBootstrapServers(bootstrapServer) - .withTopic(inputTopic) - .withKeyDeserializer(keyDeserializer) - .withValueDeserializer(valueDeserializer) - .withConsumerConfigUpdates(consumerConfig) - .withoutMetadata(); + this.reader = KafkaIO.<K, V>read() + .withBootstrapServers(bootstrapServer) + .withTopic(inputTopic) + .withKeyDeserializer(keyDeserializer) + .withValueDeserializer(valueDeserializer) + .withConsumerConfigUpdates(consumerConfig) + .withoutMetadata(); } @Override diff --git a/theodolite-benchmarks/definitions/uc3-beam-flink/uc3-beam-flink-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc3-beam-flink/uc3-beam-flink-benchmark-operator.yaml index 0610ed0c9d7d72b9943618116380b9d37a5acea4..f0f96ac8943063b4161593ec4c84717d73d2c848 100644 --- a/theodolite-benchmarks/definitions/uc3-beam-flink/uc3-beam-flink-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc3-beam-flink/uc3-beam-flink-benchmark-operator.yaml @@ -81,6 +81,13 @@ spec: resource: "uc3-load-generator-deployment.yaml" properties: loadGenMaxRecords: "150000" + - typeName: "AggregationDurationDays" + patchers: + - type: "EnvVarPatcher" + resource: "jobmanager-deployment.yaml" + properties: + container: "jobmanager" + variableName: "AGGREGATION_DURATION_DAYS" slos: - name: "lag trend" sloType: "lag trend" diff --git a/theodolite-benchmarks/definitions/uc3-beam-samza/uc3-beam-samza-benchmark-operator.yaml
b/theodolite-benchmarks/definitions/uc3-beam-samza/uc3-beam-samza-benchmark-operator.yaml index b61b82b59d1ef61e61ffbdbe7e79c277139f63a1..f5f6e7222370d18897b4964de6ff941d53b3e98a 100644 --- a/theodolite-benchmarks/definitions/uc3-beam-samza/uc3-beam-samza-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc3-beam-samza/uc3-beam-samza-benchmark-operator.yaml @@ -70,6 +70,13 @@ spec: resource: "uc3-load-generator-deployment.yaml" properties: loadGenMaxRecords: "150000" + - typeName: "AggregationDurationDays" + patchers: + - type: "EnvVarPatcher" + resource: "uc3-beam-samza-deployment.yaml" + properties: + container: "uc3-beam-samza" + variableName: "AGGREGATION_DURATION_DAYS" slos: - name: "lag trend" sloType: "lag trend" diff --git a/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml index e77316ca8fc85197c63ccc9ce1f1d6da53decf3e..87b5c9dc62c8e695b0cc8646cd3e2c0636e58653 100644 --- a/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc3-flink/uc3-flink-benchmark-operator.yaml @@ -81,6 +81,13 @@ spec: resource: "uc3-load-generator-deployment.yaml" properties: loadGenMaxRecords: "150000" + - typeName: "AggregationDurationDays" + patchers: + - type: "EnvVarPatcher" + resource: "jobmanager-deployment.yaml" + properties: + container: "jobmanager" + variableName: "AGGREGATION_DURATION_DAYS" slos: - name: "lag trend" sloType: "lag trend" diff --git a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-benchmark-operator.yaml index 9855cefbbdc72b7d021e76ece6045209627581d4..2c04bbbe62622fd9220a1f13ff2511757c770fc3 100644 --- a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-benchmark-operator.yaml +++ 
b/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-benchmark-operator.yaml @@ -56,6 +56,13 @@ spec: resource: "uc3-load-generator-deployment.yaml" properties: loadGenMaxRecords: "150000" + - typeName: "AggregationDurationDays" + patchers: + - type: "EnvVarPatcher" + resource: "uc3-hazelcastjet-deployment.yaml" + properties: + container: "uc-application" + variableName: "AGGREGATION_DURATION_DAYS" slos: - name: "lag trend" sloType: "lag trend" diff --git a/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml index 62051755085cd5dd6870f670d2c12cf81fd29639..821316847f24144275b166209e735a87216cb9a4 100644 --- a/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc3-kstreams/uc3-kstreams-benchmark-operator.yaml @@ -61,6 +61,13 @@ spec: resource: "uc3-load-generator-deployment.yaml" properties: loadGenMaxRecords: "150000" + - typeName: "AggregationDurationDays" + patchers: + - type: "EnvVarPatcher" + resource: "uc3-kstreams-deployment.yaml" + properties: + container: "uc-application" + variableName: "AGGREGATION_DURATION_DAYS" slos: - name: "lag trend" sloType: "lag trend" diff --git a/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml index 465fcb6ee6a2b2121dfe359140c259e7a3eb763a..0e3b886e82fb91e2b7ea1c2b78252ad4d78807ee 100644 --- a/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-beam-flink/docker-compose.yml @@ -67,6 +67,8 @@ services: parallelism.default: 1 state.backend: rocksdb state.checkpoints.dir: file:///data/flink/checkpoints + - GRACE_PERIOD_MS=5000 + - TRIGGER_INTERVAL_SECONDS=1 depends_on: - schema-registry - kafka diff --git a/theodolite-benchmarks/docker-test/uc4-beam-flink/test.sh 
b/theodolite-benchmarks/docker-test/uc4-beam-flink/test.sh index 6a4c6dbf4f583e7598baefae8f48136bb2113630..a659365f94e5bdd42cad8bc646b5dab563f59563 100755 --- a/theodolite-benchmarks/docker-test/uc4-beam-flink/test.sh +++ b/theodolite-benchmarks/docker-test/uc4-beam-flink/test.sh @@ -2,7 +2,7 @@ until docker-compose exec -T kcat kcat -L -b kafka:9092 -t output -J | jq -r '.topics[0].partitions | length' | grep "\b3\b"; do sleep 5s; done -docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 2000 | +docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 300 | tee /dev/stderr | awk -F ':' '!/^%/ {print $1}' | sort | diff --git a/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml index 51011d2d2645c8542724d7f84f29c9cdae970e8e..563dbf5acdb9f7b5dae3e49676de232b3afacada 100644 --- a/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-beam-samza/docker-compose.yml @@ -52,6 +52,8 @@ services: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 ENABLE_METRICS: "false" + GRACE_PERIOD_MS: 5000 + TRIGGER_INTERVAL_SECONDS: 1 load-generator: image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} depends_on: diff --git a/theodolite-benchmarks/docker-test/uc4-beam-samza/test.sh b/theodolite-benchmarks/docker-test/uc4-beam-samza/test.sh index 6a4c6dbf4f583e7598baefae8f48136bb2113630..a659365f94e5bdd42cad8bc646b5dab563f59563 100755 --- a/theodolite-benchmarks/docker-test/uc4-beam-samza/test.sh +++ b/theodolite-benchmarks/docker-test/uc4-beam-samza/test.sh @@ -2,7 +2,7 @@ until docker-compose exec -T kcat kcat -L -b kafka:9092 -t output -J | jq -r '.topics[0].partitions | length' | grep "\b3\b"; do sleep 5s; done -docker-compose 
exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 2000 | +docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 300 | tee /dev/stderr | awk -F ':' '!/^%/ {print $1}' | sort | diff --git a/theodolite-benchmarks/docker-test/uc4-kstreams/docker-compose.yml b/theodolite-benchmarks/docker-test/uc4-kstreams/docker-compose.yml index 6aaa02990841547edb6059e4e2fbf3b28b50985c..000214d8fda2823c6fccc9257baaa2f500cc76c5 100755 --- a/theodolite-benchmarks/docker-test/uc4-kstreams/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc4-kstreams/docker-compose.yml @@ -45,6 +45,8 @@ services: environment: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 + COMMIT_INTERVAL_MS: 1000 + GRACE_PERIOD_MS: 5000 load-generator: image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} depends_on: diff --git a/theodolite-benchmarks/docker-test/uc4-kstreams/test.sh b/theodolite-benchmarks/docker-test/uc4-kstreams/test.sh index 9b9dee7dc78e7a587b9f2e5b778066e5bc099755..a659365f94e5bdd42cad8bc646b5dab563f59563 100755 --- a/theodolite-benchmarks/docker-test/uc4-kstreams/test.sh +++ b/theodolite-benchmarks/docker-test/uc4-kstreams/test.sh @@ -2,10 +2,10 @@ until docker-compose exec -T kcat kcat -L -b kafka:9092 -t output -J | jq -r '.topics[0].partitions | length' | grep "\b3\b"; do sleep 5s; done -docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 32 | +docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 300 | tee /dev/stderr | awk -F ':' '!/^%/ {print $1}' | sort | uniq | wc -l | - grep "\b16\b" + grep "\b21\b" diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java 
b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java index 4d1c2241eefa8706c29d08256304ecec8313e478..09d59a83b1529613e71d26d5b562a27d7b882de8 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/AggregatedToActive.java @@ -16,7 +16,11 @@ public class AggregatedToActive @Override public KV<String, ActivePowerRecord> apply( final KV<String, AggregatedActivePowerRecord> kv) { - return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(), - kv.getValue().getTimestamp(), kv.getValue().getSumInW())); + return KV.of( + kv.getKey(), + new ActivePowerRecord( + kv.getValue().getIdentifier(), + kv.getValue().getTimestamp(), + kv.getValue().getSumInW())); } } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java index 0f314f1497708f73c7bc00337438f7a53d081731..f34f7014135833aef4022ac1e07dce2271e4faf7 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java @@ -15,8 +15,6 @@ public final class Uc4ConfigurationKeys { public static final String GRACE_PERIOD_MS = "grace.period.ms"; - // public static final String TRIGGER_ENABLE = "trigger.enable"; - public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; private Uc4ConfigurationKeys() {} diff --git a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties index 
654d7d94b70ff03cab6152fef67c55a073a58704..e761c26461667d09b95b579b5a6452b6c833b342 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-beam/src/main/resources/META-INF/application.properties @@ -10,9 +10,8 @@ kafka.feedback.topic=aggregation-feedback schema.registry.url=http://localhost:8081 emit.period.ms=5000 -#trigger.enable=true -trigger.interval.seconds=15 -grace.period.ms=270000 +trigger.interval.seconds=1 +grace.period.ms=5000 specific.avro.reader=true diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index abdb9aaed5d5209c2932d15039d9fecb687327b5..e0c2a2a0035704fd6331fb2c8d94022e5e686e8e 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -10,6 +10,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; @@ -61,17 +62,20 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { } @Override - protected void buildPipeline() { + protected void buildPipeline() { // NOPMD // Get configurations final String kafkaBroker = 
this.config.getString(Uc4ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(Uc4ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final Time windowSize = Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); final Duration windowGrace = Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); + final Time triggerDuration = + Time.seconds(this.config.getLong(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); final String configurationTopic = this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); @@ -84,9 +88,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class); // TODO Watermarks? 
- // Source from output topic with AggregatedPowerRecords + // Source from feedback topic with AggregatedPowerRecords final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = - kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class); + kafkaConnector.createConsumer(feedbackTopic, AggregatedActivePowerRecord.class); final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource = kafkaConnector.createConsumer( @@ -104,6 +108,14 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); + // Sink to feedback topic with SensorId, AggregatedActivePowerRecord + final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaFeedbackSink = + kafkaConnector.createProducer( + feedbackTopic, + Serdes::String, + () -> new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl).forValues(), + Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); + // Build input stream final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource) .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS @@ -131,7 +143,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { || tuple.f0 == Event.SENSOR_REGISTRY_STATUS) .name("[Filter] SensorRegistry changed") .map(tuple -> SensorRegistry.fromJson(tuple.f1)).name("[Map] JSON -> SensorRegistry") - .keyBy(sr -> 1) + .keyBy(sr -> 1) // The following flatMap is stateful so we need a key .flatMap(new ChildParentsFlatMapFunction()) .name("[FlatMap] SensorRegistry -> (ChildSensor, ParentSensor[])"); @@ -147,15 +159,18 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace)) .keyBy(t -> t.f0.getParent()) 
.window(TumblingEventTimeWindows.of(windowSize)) + .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration)) .process(new RecordAggregationProcessWindowFunction()) .name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord"); // add Kafka Sink - aggregationStream + final DataStream<Tuple2<String, AggregatedActivePowerRecord>> results = aggregationStream .map(value -> new Tuple2<>(value.getIdentifier(), value)) .name("[Map] AggregatedActivePowerRecord -> (Sensor, AggregatedActivePowerRecord)") - .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))) - .addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic); + .returns(Types.TUPLE(Types.STRING, TypeInformation.of(AggregatedActivePowerRecord.class))); + + results.addSink(kafkaAggregationSink).name("[Kafka Producer] Topic: " + outputTopic); // NOCS + results.addSink(kafkaFeedbackSink).name("[Kafka Producer] Topic: " + feedbackTopic); // NOCS } public static void main(final String[] args) { diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java index 6fd2b0fa0ec50febd213fb3f7d24463d2bd6f51c..49c38ae66184bf7c987e770bf08da80b5b02e5c1 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java @@ -11,12 +11,16 @@ public final class Uc4ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String EMIT_PERIOD_MS = "emit.period.ms"; public static final String GRACE_PERIOD_MS = 
"grace.period.ms"; + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; + private Uc4ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties index 9250ec02fc36ad04d080204fb254e59ae3231cec..bb26b56fa72a896ca588376e9a98b8b0251e3ec5 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-flink/src/main/resources/META-INF/application.properties @@ -1,17 +1,17 @@ application.name=theodolite-uc4-application application.version=0.0.1 -configuration.host=localhost -configuration.port=8082 configuration.kafka.topic=configuration kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +kafka.feedback.topic=aggregation-feedback schema.registry.url=http://localhost:8081 emit.period.ms=5000 -grace.period.ms=0 +trigger.interval.seconds=1 +grace.period.ms=5000 # Flink configuration checkpointing.interval.ms=1000 diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java index b9b599ed89b243d9a1f6c84e765751e4ebe21ecd..8e1f9f7058dced1ee50a5b1c1737e6846cf6525e 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/ChildParentsTransformer.java @@ -1,16 +1,9 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.kafka.streams.KeyValue; -import 
org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import rocks.theodolite.benchmarks.commons.configuration.events.Event; import rocks.theodolite.benchmarks.commons.model.sensorregistry.AggregatedSensor; import rocks.theodolite.benchmarks.commons.model.sensorregistry.Sensor; import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry; @@ -23,46 +16,7 @@ import rocks.theodolite.benchmarks.commons.model.sensorregistry.SensorRegistry; * does not longer exists in the sensor registry. * */ -public class ChildParentsTransformer implements - Transformer<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> { - - private final String stateStoreName; - // private ProcessorContext context; - private KeyValueStore<String, Set<String>> state; - - public ChildParentsTransformer(final String stateStoreName) { - this.stateStoreName = stateStoreName; - } - - @Override - @SuppressWarnings("unchecked") - public void init(final ProcessorContext context) { - // this.context = context; - this.state = (KeyValueStore<String, Set<String>>) context.getStateStore(this.stateStoreName); - } - - @Override - public Iterable<KeyValue<String, Optional<Set<String>>>> transform(final Event event, - final SensorRegistry registry) { - - // Values may later be null for deleting a sensor - final Map<String, Set<String>> childParentsPairs = this.constructChildParentsPairs(registry); - - this.updateChildParentsPairs(childParentsPairs); - - this.updateState(childParentsPairs); - - return childParentsPairs - .entrySet() - .stream() - .map(e -> KeyValue.pair(e.getKey(), Optional.ofNullable(e.getValue()))) - .collect(Collectors.toList()); - } - - @Override - public void close() { - // Do nothing - } +public class ChildParentsTransformer { /** * Constructs a map of keys to their set of parents out of a 
SensorRegistry. @@ -87,33 +41,4 @@ public class ChildParentsTransformer implements : Stream.empty())); } - private void updateChildParentsPairs(final Map<String, Set<String>> childParentsPairs) { - final KeyValueIterator<String, Set<String>> oldChildParentsPairs = this.state.all(); - while (oldChildParentsPairs.hasNext()) { - final KeyValue<String, Set<String>> oldChildParentPair = oldChildParentsPairs.next(); - final String identifier = oldChildParentPair.key; - final Set<String> oldParents = oldChildParentPair.value; - final Set<String> newParents = childParentsPairs.get(identifier); // null if not exists - if (newParents == null) { - // Sensor was deleted - childParentsPairs.put(identifier, null); - } else if (newParents.equals(oldParents)) { - // No changes - childParentsPairs.remove(identifier); - } - // Else: Later Perhaps: Mark changed parents - } - oldChildParentsPairs.close(); - } - - private void updateState(final Map<String, Set<String>> childParentsPairs) { - for (final Map.Entry<String, Set<String>> childParentPair : childParentsPairs.entrySet()) { - if (childParentPair.getValue() == null) { - this.state.delete(childParentPair.getKey()); - } else { - this.state.put(childParentPair.getKey(), childParentPair.getValue()); - } - } - } - } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java deleted file mode 100644 index 61910850bf3f66025866acb93d92b24b4b71d692..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HashMapSupplier.java +++ /dev/null @@ -1,24 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.hazelcastjet; - -import com.hazelcast.function.SupplierEx; -import java.util.HashMap; -import java.util.Set; - -/** - * Supplies a {@link HashMap} and implements 
{@link SupplierEx}. - */ -public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>> { - - private static final long serialVersionUID = -6247504592403610702L; // NOPMD - - @Override - public HashMap<String, Set<String>> get() { - return new HashMap<>(); - } - - @Override - public HashMap<String, Set<String>> getEx() throws Exception { - return this.get(); - } - -} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index 97ea33eda56f34d5f1a2f8e5def8373c259540d0..5790d5e3dc697d6799ff34a39227a750c77b9003 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -53,16 +53,24 @@ public class HistoryService extends HazelcastJetService { final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); - final Duration windowSize = Duration.ofMillis( + final Duration emitPeriod = Duration.ofMillis( this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); + final Duration gracePeriod = Duration.ofMillis( + this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); + + final Duration triggerPeriod = Duration.ofSeconds( + this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); + this.pipelineFactory = new Uc4PipelineFactory( kafkaProps, kafkaConfigReadProps, kafkaAggregationReadProps, kafkaWriteProps, this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic, - windowSize); + emitPeriod, + gracePeriod, + triggerPeriod); } @Override diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java 
b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java index 6c4c63396208cafc68a83835f29609b8582370ca..3eedfc0bc6a58130cb60b44b1aee45ad75ee6479 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java @@ -12,6 +12,9 @@ public class Uc4ConfigurationKeys { public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; public static final String EMIT_PERIOD_MS = "emit.period.ms"; - // public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index 59b5941fb9f0090074869b00d49ad26c68e40165..d4a7657b968372d5b32a3a07a3878b74498bb5cb 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -16,9 +16,7 @@ import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStageWithKey; import com.hazelcast.jet.pipeline.WindowDefinition; import java.time.Duration; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; @@ -45,6 +43,10 @@ public class Uc4PipelineFactory extends PipelineFactory { private final Duration emitPeriod; + private final 
Duration gracePeriod; + + private final Duration triggerPeriod; + /** * Builds a pipeline which can be used for stream processing using Hazelcast Jet. @@ -61,7 +63,7 @@ public class Uc4PipelineFactory extends PipelineFactory { * @param kafkaOutputTopic The name of the output topic used for the pipeline. * @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline. * @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline. - * @param windowSize The window size of the tumbling window used in this pipeline. + * @param emitPeriod The window size of the tumbling window used in this pipeline. */ public Uc4PipelineFactory(final Properties kafkaInputReadPropsForPipeline, // NOPMD final Properties kafkaConfigPropsForPipeline, @@ -71,7 +73,9 @@ public class Uc4PipelineFactory extends PipelineFactory { final String kafkaOutputTopic, final String kafkaConfigurationTopic, final String kafkaFeedbackTopic, - final Duration windowSize) { + final Duration emitPeriod, + final Duration gracePeriod, + final Duration triggerPeriod) { super(kafkaInputReadPropsForPipeline, kafkaInputTopic, kafkaWritePropsForPipeline, kafkaOutputTopic); @@ -79,7 +83,9 @@ public class Uc4PipelineFactory extends PipelineFactory { this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline; this.kafkaConfigurationTopic = kafkaConfigurationTopic; this.kafkaFeedbackTopic = kafkaFeedbackTopic; - this.emitPeriod = windowSize; + this.emitPeriod = emitPeriod; + this.gracePeriod = gracePeriod; + this.triggerPeriod = triggerPeriod; } /** @@ -155,13 +161,13 @@ public class Uc4PipelineFactory extends PipelineFactory { ////////////////////////////////// // (1) Configuration Stream this.pipe.readFrom(configurationSource) - .withNativeTimestamps(0) + .withNativeTimestamps(this.gracePeriod.toMillis()) .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED || entry.getKey() == Event.SENSOR_REGISTRY_STATUS) .map(data -> Util.entry(data.getKey(), 
SensorRegistry.fromJson(data.getValue()))) .flatMapStateful(HashMap::new, new ConfigFlatMap()) .writeTo(Sinks.mapWithUpdating( - SENSOR_PARENT_MAP_NAME, // The addressed IMAP + SENSOR_PARENT_MAP_NAME, // The addressed IMap Entry::getKey, // The key to look for (oldValue, newEntry) -> newEntry.getValue())); @@ -169,13 +175,13 @@ public class Uc4PipelineFactory extends PipelineFactory { // (1) Sensor Input Stream final StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe .readFrom(inputSource) - .withNativeTimestamps(0); + .withNativeTimestamps(this.gracePeriod.toMillis()); ////////////////////////////////// // (1) Aggregation Stream final StreamStage<Entry<String, ActivePowerRecord>> aggregations = this.pipe .readFrom(aggregationSource) - .withNativeTimestamps(0) + .withNativeTimestamps(this.gracePeriod.toMillis()) .map(entry -> { // Map Aggregated to ActivePowerRecord final AggregatedActivePowerRecord agg = entry.getValue(); final ActivePowerRecord record = new ActivePowerRecord( @@ -214,24 +220,20 @@ public class Uc4PipelineFactory extends PipelineFactory { final ActivePowerRecord record = entry.getValue().getRecord(); final Set<String> groups = entry.getValue().getGroups(); - // Transformed Data - final String[] groupList = groups.toArray(String[]::new); - final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length]; - final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>(); - for (int i = 0; i < groupList.length; i++) { - newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]); - newEntryList.add(Util.entry(newKeyList[i], record)); - } - // Return traversable list of new entry elements - return Traversers.traverseIterable(newEntryList); + return Traversers.traverseStream( + groups + .stream() + .map(group -> Util.entry(new SensorGroupKey(keyGroupId, group), record))); }); ////////////////////////////////// // (5) UC4 Last Value Map // Table with tumbling window differentiation [ (sensorKey,Group) , 
value ],Time final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues = - dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.emitPeriod.toMillis())); + dupliAsFlatmappedStage.window(WindowDefinition + .tumbling(this.emitPeriod.toMillis()) + .setEarlyResultsPeriod(this.triggerPeriod.toMillis())); final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS AggregateOperation @@ -253,7 +255,8 @@ public class Uc4PipelineFactory extends PipelineFactory { return windowedLastValues .groupingKey(entry -> entry.getKey().getGroup()) - .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue())); + .aggregate(aggrOp) + .map(agg -> Util.entry(agg.getKey(), agg.getValue())); } @@ -270,7 +273,7 @@ public class Uc4PipelineFactory extends PipelineFactory { final Map<String, Set<String>> flatMapStage, final Entry<Event, SensorRegistry> eventItem) { // Transform new Input - final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); + final ChildParentsTransformer transformer = new ChildParentsTransformer(); final Map<String, Set<String>> mapFromRegistry = transformer.constructChildParentsPairs(eventItem.getValue()); @@ -286,11 +289,8 @@ public class Uc4PipelineFactory extends PipelineFactory { } } - // Create a updates list to pass onto the next pipeline stage- - final List<Entry<String, Set<String>>> updatesList = new ArrayList<>(updates.entrySet()); - // Return traverser with updates list. 
- return Traversers.traverseIterable(updatesList) + return Traversers.traverseIterable(updates.entrySet()) .map(e -> Util.entry(e.getKey(), e.getValue())); } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties index af877044b6e17665b6a18af41ec72ab6cedf0f91..af55655bb773635ff0fb116e7ad7c491a770bd09 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/resources/META-INF/application.properties @@ -10,4 +10,5 @@ kafka.feedback.topic=aggregation-feedback schema.registry.url=http://localhost:8081 emit.period.ms=5000 -#grace.period.ms=0 +trigger.interval.seconds=1 +grace.period.ms=5000 diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java index 29a561d1bd039f70b2540014f970a03094418532..44646d908da24b8ae201aa2fedf267a79584377c 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineTest.java @@ -120,7 +120,7 @@ public class Uc4PipelineTest extends JetTestSupport { final Properties properties = new Properties(); final Uc4PipelineFactory factory = new Uc4PipelineFactory( properties, properties, properties, properties, "", "", - "", "", testWindowSize); + "", "", testWindowSize, Duration.ofSeconds(1), Duration.ofMillis(0)); this.uc4Topology = factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource); diff --git a/theodolite-benchmarks/uc4-kstreams/src/main/resources/META-INF/application.properties 
b/theodolite-benchmarks/uc4-kstreams/src/main/resources/META-INF/application.properties index 8a1d86ae3ecc419badae62d62c102ec8fafb4730..3a6048099c554e665c96bebc521e2ba40bea3b1b 100644 --- a/theodolite-benchmarks/uc4-kstreams/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc4-kstreams/src/main/resources/META-INF/application.properties @@ -10,7 +10,7 @@ kafka.output.topic=output schema.registry.url=http://localhost:8081 emit.period.ms=5000 -grace.period.ms=0 +grace.period.ms=5000 # Kafka Streams Config -commit.interval.ms=5000 +commit.interval.ms=1000 diff --git a/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java index c9fb98a70481a511f8ee0e171093e8d0454e10f8..8a725b691d8e39c3e1abd33cbdcb6f92a8222075 100644 --- a/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java +++ b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java @@ -9,7 +9,6 @@ import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.common.serialization.Serde; import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; - public class MockedSchemaRegistrySerdes extends SchemaRegistryAvroSerdeFactory { private static final String URL_KEY = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; diff --git a/theodolite/build.gradle b/theodolite/build.gradle index fec37943df2546a4b675085db2f064bf54585cd2..50f2d6861210ee1c7dc06fa6d99dba8c43765bc8 100644 --- a/theodolite/build.gradle +++ b/theodolite/build.gradle @@ -28,6 +28,7 @@ dependencies { implementation 'io.github.microutils:kotlin-logging:2.1.16' implementation 'org.apache.kafka:kafka-clients:2.8.0' implementation 'khttp:khttp:1.0.0' + implementation 
'net.objecthunter:exp4j:0.4.8' testImplementation 'io.quarkus:quarkus-junit5' testImplementation 'io.quarkus:quarkus-test-kubernetes-client' diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/KubernetesBenchmarkDeploymentBuilder.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/KubernetesBenchmarkDeploymentBuilder.kt index 67fe92afb8aa4c9edda2474fc6307c16c21a41f6..810999a71cdbd17554cb3fe99ef96e0bd45ac598 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/KubernetesBenchmarkDeploymentBuilder.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/KubernetesBenchmarkDeploymentBuilder.kt @@ -40,14 +40,25 @@ class KubernetesBenchmarkDeploymentBuilder (val kubernetesBenchmark: KubernetesB val appResources = loadKubernetesResources(kubernetesBenchmark.sut.resources, this.client).toResourceMap() val loadGenResources = loadKubernetesResources(kubernetesBenchmark.loadGenerator.resources, this.client).toResourceMap() - // patch the load dimension the resources + // patch the load dimension loadPatcherDefinitions.forEach { patcherDefinition -> - loadGenResources[patcherDefinition.resource] = - PatchHandler.patchResource(loadGenResources, patcherDefinition, load.toString()) + if (appResources.keys.contains(patcherDefinition.resource)) { + appResources[patcherDefinition.resource] = + PatchHandler.patchResource(appResources, patcherDefinition, load.toString()) + } else { + loadGenResources[patcherDefinition.resource] = + PatchHandler.patchResource(loadGenResources, patcherDefinition, load.toString()) + } } + // patch the resource dimension resourcePatcherDefinitions.forEach { patcherDefinition -> - appResources[patcherDefinition.resource] = - PatchHandler.patchResource(appResources, patcherDefinition, resource.toString()) + if (appResources.keys.contains(patcherDefinition.resource)) { + appResources[patcherDefinition.resource] = + PatchHandler.patchResource(appResources, patcherDefinition, resource.toString()) + } else { + 
loadGenResources[patcherDefinition.resource] = + PatchHandler.patchResource(loadGenResources, patcherDefinition, resource.toString()) + } } // Patch the given overrides diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapPropertiesPatcher.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapPropertiesPatcher.kt new file mode 100644 index 0000000000000000000000000000000000000000..b1c168987d894bd33b848de10c67476628fde3e0 --- /dev/null +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapPropertiesPatcher.kt @@ -0,0 +1,37 @@ +package rocks.theodolite.kubernetes.patcher + +import io.fabric8.kubernetes.api.model.ConfigMap +import io.fabric8.kubernetes.api.model.HasMetadata +import java.io.StringReader +import java.io.StringWriter +import java.util.Properties + +/** + * The ConfigMapPropertiesPatcher allows to add/modify a key-value pair in a .properties file of a ConfigMap + * + * @property fileName of the .properties file in the ConfigMap that should be modified. + * @property variableName Name of the property to be patched. 
+ */ +class ConfigMapPropertiesPatcher( + private val fileName: String, + private val variableName: String, +) : AbstractStringPatcher() { + + override fun patchSingleResource(resource: HasMetadata, value: String): HasMetadata { + if (resource is ConfigMap) { + val propertiesFile = resource.data[fileName] + + // Read properties string + val properties = Properties().also { it.load(StringReader(propertiesFile)) } + + // Change value + properties.setProperty(this.variableName, value) + + // Convert back to String and set in Kubernetes resource + val writer = StringWriter() + properties.store(writer, null) + resource.data[fileName] = writer.toString() + } + return resource + } +} diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/PatcherFactory.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/PatcherFactory.kt index 4b75999e70197f775e7fa610eeb6700545bf6869..f469eaee39e5e2eb9e5aa0ae2bd8346eabf3006d 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/PatcherFactory.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/PatcherFactory.kt @@ -78,6 +78,15 @@ class PatcherFactory { suffix = patcher.properties["suffix"], factor = patcher.properties["factor"]?.toInt(), ) + "ConfigMapPropertiesPatcher" -> DecoratingPatcher( + ConfigMapPropertiesPatcher( + fileName = patcher.properties["fileName"] ?: throwInvalid(patcher), + variableName = patcher.properties["variableName"] ?: throwInvalid(patcher) + ), + prefix = patcher.properties["prefix"], + suffix = patcher.properties["suffix"], + factor = patcher.properties["factor"]?.toInt(), + ) "NamePatcher" -> NamePatcher() "ServiceSelectorPatcher" -> ServiceSelectorPatcher( variableName = patcher.properties["label"] ?: throwInvalid(patcher) diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/VolumesConfigMapPatcher.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/VolumesConfigMapPatcher.kt index 
54ac12be7629da792cbb72b2f1d409ba0f4ba93c..a0483a5d4576831848a62dcecb5c987c8ce8656a 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/VolumesConfigMapPatcher.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/patcher/VolumesConfigMapPatcher.kt @@ -7,37 +7,28 @@ import io.fabric8.kubernetes.api.model.apps.StatefulSet class VolumesConfigMapPatcher(private var volumeName: String) : AbstractStringPatcher() { override fun patchSingleResource(resource: HasMetadata, value: String): HasMetadata { - if (resource is Deployment) { - if (resource.spec.template.spec.volumes == null) { - resource.spec.template.spec.volumes = mutableListOf() + val volumeMounts = when(resource) { + is Deployment -> { + if (resource.spec.template.spec.volumes == null) { + resource.spec.template.spec.volumes = mutableListOf() + } + resource.spec.template.spec.volumes } - val volumeMounts = resource.spec.template.spec.volumes - - for (mount in volumeMounts) { - try { - if (mount.configMap.name == volumeName) { - mount.configMap.name = value - } - } catch (_: NullPointerException) { + is StatefulSet -> { + if (resource.spec.template.spec.volumes == null) { + resource.spec.template.spec.volumes = mutableListOf() } + resource.spec.template.spec.volumes } + else -> emptyList() // No volumes to patch } - if (resource is StatefulSet) { - if (resource.spec.template.spec.volumes == null) { - resource.spec.template.spec.volumes = mutableListOf() - } - val volumeMounts = resource.spec.template.spec.volumes - for (mount in volumeMounts) { - try { - if (mount.configMap.name == volumeName) { - mount.configMap.name = value - } - } catch (_: NullPointerException) { - } + for (mount in volumeMounts) { + // Find ConfigMap volume with requested name + if (mount.configMap?.name?.equals(volumeName) == true) { + mount.configMap.name = value } } - return resource } } \ No newline at end of file diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/AnalysisExecutor.kt 
b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/AnalysisExecutor.kt index 96c5a43b85c0db5600d813f9e799903927a53ec3..b7cd32d735fe2cdc0888df4a563499ca76b60886 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/AnalysisExecutor.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/AnalysisExecutor.kt @@ -60,7 +60,7 @@ class AnalysisExecutor( sloType = slo.sloType, properties = slo.properties, load = load, - resource = resource, + resources = resource, metric = metric ) diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactory.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactory.kt index a789050a106f1b95be7c1d55043cc9d46a15ffbf..cc0e92965679adb0fd1d02655b3ff683559df4be 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactory.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactory.kt @@ -1,6 +1,8 @@ package rocks.theodolite.kubernetes.slo +import net.objecthunter.exp4j.ExpressionBuilder import rocks.theodolite.core.strategies.Metric +import kotlin.math.pow /** @@ -34,7 +36,9 @@ class SloCheckerFactory { * * @param sloType Type of the [SloChecker]. * @param properties map of properties to use for the SLO checker creation. - * @param load that is executed in the experiment. + * @param load Load that is generated in the experiment. + * @param resources Resources that are used in the experiment. + * @param metric Metric used in the benchmark execution. * * @return A [SloChecker] * @throws IllegalArgumentException If [sloType] not supported. 
@@ -43,10 +47,10 @@ class SloCheckerFactory { sloType: String, properties: Map<String, String>, load: Int, - resource: Int, + resources: Int, metric: Metric - ): SloChecker { - return when (SloTypes.from(sloType)) { + ): SloChecker = + when (SloTypes.from(sloType)) { SloTypes.GENERIC -> ExternalSloChecker( externalSlopeURL = properties["externalSloUrl"] ?: throw IllegalArgumentException("externalSloUrl expected"), @@ -58,8 +62,11 @@ class SloCheckerFactory { "repetitionAggregation" to (properties["repetitionAggregation"] ?: throw IllegalArgumentException("repetitionAggregation expected")), "operator" to (properties["operator"] ?: throw IllegalArgumentException("operator expected")), - "threshold" to (properties["threshold"]?.toDouble() - ?: throw IllegalArgumentException("threshold expected")) + "threshold" to (properties["threshold"]?.toDoubleOrNull() + ?: properties["thresholdRelToLoad"]?.toDoubleOrNull()?.times(load) + ?: properties["thresholdRelToResources"]?.toDoubleOrNull()?.times(resources) + ?: properties["thresholdFromExpression"]?.let { this.eval(it, load, resources) } + ?: throw IllegalArgumentException("'threshold', 'thresholdRelToLoad' or 'thresholdRelToResources' or 'thresholdFromExpression' expected")) ) ) SloTypes.LAG_TREND, SloTypes.DROPPED_RECORDS -> ExternalSloChecker( @@ -67,8 +74,11 @@ class SloCheckerFactory { ?: throw IllegalArgumentException("externalSloUrl expected"), metadata = mapOf( "warmup" to (properties["warmup"]?.toInt() ?: throw IllegalArgumentException("warmup expected")), - "threshold" to (properties["threshold"]?.toDouble() - ?: throw IllegalArgumentException("threshold expected")) + "threshold" to (properties["threshold"]?.toDoubleOrNull() + ?: properties["thresholdRelToLoad"]?.toDoubleOrNull()?.times(load) + ?: properties["thresholdRelToResources"]?.toDoubleOrNull()?.times(resources) + ?: properties["thresholdFromExpression"]?.let { this.eval(it, load, resources) } + ?: throw IllegalArgumentException("Valid 'threshold', 
'thresholdRelToLoad' or 'thresholdRelToResources' or 'thresholdFromExpression' expected")) ) ) SloTypes.LAG_TREND_RATIO, SloTypes.DROPPED_RECORDS_RATIO -> { @@ -91,6 +101,15 @@ class SloCheckerFactory { ) ) } - } + + } + + private fun eval(expression: String, load: Int, resources: Int): Double { + return ExpressionBuilder(expression) + .variables("L", "R") + .build() + .setVariable("L", load.toDouble()) + .setVariable("R", resources.toDouble()) + .evaluate() } } diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloFactory.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloFactory.kt index 047f8a657de8aba6f032d36e8b84d7046d5e0209..ef67348c0c848226b5b4127a34880fcd5a6bc1e9 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloFactory.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloFactory.kt @@ -7,18 +7,15 @@ import rocks.theodolite.kubernetes.model.KubernetesBenchmark.Slo class SloFactory { fun createSlos(execution: BenchmarkExecution, benchmark: KubernetesBenchmark): List<Slo> { - var benchmarkSlos = benchmark.slos - var executionSlos = execution.slos + val resultSlos = benchmark.slos.toMutableList() - for(executionSlo in executionSlos) { - for(i in 0 until benchmarkSlos.size) { - if(executionSlo.name == benchmarkSlos[i].name && executionSlo.properties != null) { - for (executionProperty in executionSlo.properties!!) { - benchmarkSlos[i].properties[executionProperty.key] = executionProperty.value - } + for (executionSlo in execution.slos) { + for (resultSlo in resultSlos) { + if (executionSlo.name == resultSlo.name && executionSlo.properties != null) { + resultSlo.properties.putAll(executionSlo.properties!!) 
} } } - return benchmarkSlos + return resultSlos } } \ No newline at end of file diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloTypes.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloTypes.kt index 07cbcd634ec7b46bd0e66a52f62989660575765f..496600f2bf20446568025b4377a633803b4fcbc5 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloTypes.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/slo/SloTypes.kt @@ -3,9 +3,9 @@ package rocks.theodolite.kubernetes.slo enum class SloTypes(val value: String) { GENERIC("generic"), LAG_TREND("lag trend"), - LAG_TREND_RATIO("lag trend ratio"), - DROPPED_RECORDS("dropped records"), - DROPPED_RECORDS_RATIO("dropped records ratio"); + @Deprecated("Use LAG_TREND with relative threshold instead.") LAG_TREND_RATIO("lag trend ratio"), + @Deprecated("Use GENERIC instead.") DROPPED_RECORDS("dropped records"), + @Deprecated("Use GENERIC with relative threshold instead.") DROPPED_RECORDS_RATIO("dropped records ratio"); companion object { fun from(type: String): SloTypes = diff --git a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapPropertiesPatcherTest.kt b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapPropertiesPatcherTest.kt new file mode 100644 index 0000000000000000000000000000000000000000..493343baf77350c09fb7c16ca93bb1215660d3db --- /dev/null +++ b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapPropertiesPatcherTest.kt @@ -0,0 +1,50 @@ +package rocks.theodolite.kubernetes.patcher + +import io.fabric8.kubernetes.api.model.ConfigMap +import io.fabric8.kubernetes.api.model.ConfigMapBuilder +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +internal class ConfigMapPropertiesPatcherTest { + + private lateinit var configMap: ConfigMap + + private 
val patcher = ConfigMapPropertiesPatcher("some-file.properties", "second.prop.value") + + @BeforeEach + fun setUp() { + val data = mapOf( + "some-file.properties" to """ + first.properties.value = some-test + second.prop.value = 1 + third = 1234 + """.trimIndent() + ) + + this.configMap = ConfigMapBuilder() + .withNewMetadata() + .withName("example") + .endMetadata() + .addToData(data) + .build() + } + + @ParameterizedTest + @ValueSource(strings = ["some-string", "42", "42.42", "true"]) + fun setSettingString(inputValue: String) { + val patched = patcher.patchSingleResource(this.configMap, inputValue) + assertTrue(patched is ConfigMap) + //patched.let { it as ConfigMap }.data + patched as ConfigMap + val properties = patched.data["some-file.properties"] + assertTrue(properties != null) + val matchLines = properties!!.lines().filter { it.startsWith("second.prop.value") } + assertEquals(1, matchLines.size) + val value = matchLines[0].split("=").getOrNull(1) + assertEquals(inputValue, value) + } + + +} \ No newline at end of file diff --git a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapYamlPatcherTest.kt b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapYamlPatcherTest.kt index 474080b5cea34a621d387e6f0a805a9cb5e1820c..38e807dc4fd06c1974c0ef3b2f9d10dc7f6bbab1 100644 --- a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapYamlPatcherTest.kt +++ b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/patcher/ConfigMapYamlPatcherTest.kt @@ -20,6 +20,7 @@ internal class ConfigMapYamlPatcherTest { val data = mapOf( "some-file.yaml" to """ first: some-test + # some comment: with colon second: 1 third: 1234 """.trimIndent() diff --git a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactoryTest.kt b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactoryTest.kt index a61119bbe0e5be180ccf3ca2c54ac2829eb4558d..5aaac2ecb7439d71417ce80df056e01aa8b8b8f8 100644 --- 
a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactoryTest.kt +++ b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloCheckerFactoryTest.kt @@ -9,11 +9,12 @@ import rocks.theodolite.core.strategies.Metric @QuarkusTest internal class SloCheckerFactoryTest { + private val factory = SloCheckerFactory() + @Test fun testCreateGenericSloWithoutUrl() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.GENERIC.value, mapOf( "warmup" to "60", @@ -31,9 +32,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateGenericSloWithoutWarmup() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.GENERIC.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -51,9 +51,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateGenericSloWithoutQueryAggregation() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.GENERIC.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -71,9 +70,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateGenericSloWithoutRepetitionAggregation() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.GENERIC.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -91,9 +89,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateGenericSloWithoutOperator() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.GENERIC.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -111,9 +108,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateGenericSloWithoutThreshold() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + 
this.factory.create( SloTypes.GENERIC.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -131,8 +127,7 @@ internal class SloCheckerFactoryTest { @Test fun testCreateGenericSloFloatThreshold() { - val factory = SloCheckerFactory() - val sloChecker = factory.create( + val sloChecker = this.factory.create( SloTypes.GENERIC.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -152,11 +147,143 @@ internal class SloCheckerFactoryTest { assertEquals(12.34, threshold as Double, 0.01) } + @Test + fun testCreateGenericSloWithThresholdRelToLoad() { + val sloChecker = this.factory.create( + SloTypes.GENERIC.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "queryAggregation" to "median", + "repetitionAggregation" to "median", + "operator" to "lte", + "thresholdRelToLoad" to "0.1" + ), + 100, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(10.0, computedThreshold as Double, 0.001) + } + + @Test + fun testCreateGenericSloWithThresholdRelToLoadAndInvalidThreshold() { + val sloChecker = this.factory.create( + SloTypes.GENERIC.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "queryAggregation" to "median", + "repetitionAggregation" to "median", + "operator" to "lte", + "threshold" to "", + "thresholdRelToLoad" to "0.1" + ), + 100, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(10.0, computedThreshold as Double, 0.001) + } + + @Test + fun testCreateGenericSloWithThresholdRelToResources() { + val sloChecker = this.factory.create( + SloTypes.GENERIC.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "queryAggregation" to "median", + 
"repetitionAggregation" to "median", + "operator" to "lte", + "thresholdRelToResources" to "0.1" + ), + 100, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(0.5, computedThreshold as Double, 0.001) + } + + @Test + fun testCreateGenericSloWithConstantThresholdFromExpression() { + val sloChecker = this.factory.create( + SloTypes.GENERIC.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "queryAggregation" to "median", + "repetitionAggregation" to "median", + "operator" to "lte", + "thresholdFromExpression" to "1111" + ), + 8, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(1111.0, computedThreshold as Double, 0.001) + } + + @Test + fun testCreateGenericSloWithSimpleThresholdFromExpression() { + val sloChecker = this.factory.create( + SloTypes.GENERIC.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "queryAggregation" to "median", + "repetitionAggregation" to "median", + "operator" to "lte", + "thresholdFromExpression" to "L*5" + ), + 8, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(40.0, computedThreshold as Double, 0.001) + } + + @Test + fun testCreateGenericSloWithComplexThresholdFromExpression() { + val sloChecker = this.factory.create( + SloTypes.GENERIC.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "queryAggregation" to "median", + "repetitionAggregation" to "median", + "operator" to "lte", + "thresholdFromExpression" to "R*((2^L+4)-60)+111" + ), + 8, + 5, + 
Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(1111.0, computedThreshold as Double, 0.001) + } + @Test fun testCreateLagTrendSloWithoutUrl() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.LAG_TREND.value, mapOf( "warmup" to "60", @@ -171,9 +298,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateLagTrendSloWithoutWarmup() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.LAG_TREND.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -189,9 +315,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateLagTrendSloWithoutThreshold() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.LAG_TREND.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -206,8 +331,7 @@ internal class SloCheckerFactoryTest { @Test fun testCreateLagTrendSloFloatThreshold() { - val factory = SloCheckerFactory() - val sloChecker = factory.create( + val sloChecker = this.factory.create( SloTypes.LAG_TREND.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -224,11 +348,68 @@ internal class SloCheckerFactoryTest { assertEquals(12.34, threshold as Double, 0.01) } + @Test + fun testCreateLagTrendSloWithThresholdRelToLoad() { + val sloChecker = this.factory.create( + SloTypes.LAG_TREND.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "thresholdRelToLoad" to "0.1" + ), + 100, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(10.0, computedThreshold as Double, 0.001) + } + 
+ @Test + fun testCreateLagTrendSloWithThresholdRelToLoadAndInvalidThreshold() { + val sloChecker = this.factory.create( + SloTypes.LAG_TREND.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "threshold" to "", + "thresholdRelToLoad" to "0.1" + ), + 100, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(10.0, computedThreshold as Double, 0.001) + } + + @Test + fun testCreateLagTrendSloWithThresholdRelToResources() { + val sloChecker = this.factory.create( + SloTypes.LAG_TREND.value, + mapOf( + "externalSloUrl" to "http://localhost:1234", + "warmup" to "60", + "thresholdRelToResources" to "0.1" + ), + 100, + 5, + Metric.DEMAND + ) + assertTrue(sloChecker is ExternalSloChecker) + val computedThreshold = (sloChecker as ExternalSloChecker).metadata["threshold"] + assertTrue(computedThreshold is Double) + assertEquals(0.5, computedThreshold as Double, 0.001) + } + @Test fun testCreateLagTrendRatioSloWithoutUrl() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.LAG_TREND_RATIO.value, mapOf( "warmup" to "60", @@ -243,9 +424,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateLagTrendRatioSloWithoutWarmup() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.LAG_TREND_RATIO.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -261,9 +441,8 @@ internal class SloCheckerFactoryTest { @Test fun testCreateLagTrendRatioSloWithoutRatioThreshold() { - val factory = SloCheckerFactory() assertThrows<IllegalArgumentException> { - factory.create( + this.factory.create( SloTypes.LAG_TREND_RATIO.value, mapOf( "externalSloUrl" to "http://localhost:1234", @@ -278,8 +457,7 @@ internal class SloCheckerFactoryTest { @Test 
fun testCreateLagTrendRatioSloFloatThreshold() { - val factory = SloCheckerFactory() - val sloChecker = factory.create( + val sloChecker = this.factory.create( SloTypes.LAG_TREND_RATIO.value, mapOf( "externalSloUrl" to "http://localhost:1234", diff --git a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloFactoryTest.kt b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloFactoryTest.kt index de9d4c60dbad069ccb1229bebb4a4751cf96d98d..47956b3d952eeec0cfb3b0b8c497dc226a7d3adb 100644 --- a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloFactoryTest.kt +++ b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/slo/SloFactoryTest.kt @@ -9,50 +9,47 @@ import rocks.theodolite.kubernetes.model.KubernetesBenchmark @QuarkusTest internal class SloFactoryTest { + private val sloFactory = SloFactory() + + private val benchmark = KubernetesBenchmark().also { bench -> + bench.slos = mutableListOf( + KubernetesBenchmark.Slo().also { + it.name = "test" + it.sloType = "lag trend" + it.prometheusUrl = "test.de" + it.offset = 0 + it.properties = mutableMapOf( + "threshold" to "2000", + "externalSloUrl" to "http://localhost:80/evaluate-slope", + "warmup" to "60" + ) + } + ) + } + @Test fun overwriteSloTest() { - - val benchmark = KubernetesBenchmark() - val execution = BenchmarkExecution() - - // Define Benchmark SLOs - val slo = KubernetesBenchmark.Slo() - slo.name="test" - slo.sloType="lag trend" - slo.prometheusUrl="test.de" - slo.offset=0 - - val benchmarkSloProperties = mutableMapOf<String, String>() - benchmarkSloProperties["threshold"] = "2000" - benchmarkSloProperties["externalSloUrl"] = "http://localhost:80/evaluate-slope" - benchmarkSloProperties["warmup"] = "60" - - slo.properties=benchmarkSloProperties - - benchmark.slos = mutableListOf(slo) - - - // Define Execution SLOs, benchmark SLO values for these properties should be overwritten - val sloConfig = BenchmarkExecution.SloConfiguration() - sloConfig.name = "test" - - val 
executionSloProperties = mutableMapOf<String, String>() - // overwriting properties 'threshold' and 'warmup' and adding property 'extensionTest' - executionSloProperties["threshold"] = "3000" - executionSloProperties["warmup"] = "80" - executionSloProperties["extensionTest"] = "extended" - - sloConfig.properties = executionSloProperties - - // SLO has 'name' that isn't defined in the benchmark, therefore it will be ignored by the SloFactory - val sloConfig2 = BenchmarkExecution.SloConfiguration() - sloConfig2.name = "test2" - sloConfig2.properties = executionSloProperties - - execution.slos = listOf(sloConfig, sloConfig2) - - val sloFactory = SloFactory() - val combinedSlos = sloFactory.createSlos(execution,benchmark) + val execution = BenchmarkExecution().also { exec -> + exec.slos = listOf( + // SLOs, which should override benchmark SLO values for these properties + BenchmarkExecution.SloConfiguration(). also { + it.name = "test" + it.properties = mutableMapOf( + // overwriting properties 'threshold' and 'warmup' and adding property 'extensionTest' + "threshold" to "3000", + "warmup" to "80", + "extensionTest" to "extended" + ) + }, + // SLO with name that isn't defined in the benchmark, therefore it should be ignored by the SloFactory + BenchmarkExecution.SloConfiguration().also { + it.name = "test2" + it.properties = mutableMapOf() // No properties + } + ) + } + + val combinedSlos = this.sloFactory.createSlos(execution, this.benchmark) Assertions.assertEquals(1, combinedSlos.size) Assertions.assertEquals("test", combinedSlos[0].name) @@ -66,4 +63,5 @@ internal class SloFactoryTest { Assertions.assertEquals("80", combinedSlos[0].properties["warmup"]) Assertions.assertEquals("extended", combinedSlos[0].properties["extensionTest"]) } + } \ No newline at end of file