Skip to content
Snippets Groups Projects
Commit 0dfca34d authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' of git.se.informatik.uni-kiel.de:she/theodolite

parents 1bf07839 851f4bd7
No related branches found
No related tags found
No related merge requests found
Pipeline #6044 canceled
Showing
with 1786 additions and 17 deletions
...@@ -143,6 +143,7 @@ test-benchmarks: ...@@ -143,6 +143,7 @@ test-benchmarks:
- build-benchmarks - build-benchmarks
script: ./gradlew test --continue script: ./gradlew test --continue
artifacts: artifacts:
when: always
reports: reports:
junit: junit:
- "theodolite-benchmarks/**/build/test-results/test/TEST-*.xml" - "theodolite-benchmarks/**/build/test-results/test/TEST-*.xml"
...@@ -204,10 +205,8 @@ spotbugs-benchmarks: ...@@ -204,10 +205,8 @@ spotbugs-benchmarks:
- changes: - changes:
- theodolite-benchmarks/* - theodolite-benchmarks/*
- theodolite-benchmarks/$JAVA_PROJECT_NAME/**/* - theodolite-benchmarks/$JAVA_PROJECT_NAME/**/*
- theodolite-benchmarks/kstreams-commons/**/* - theodolite-benchmarks/{$JAVA_PROJECT_DEPS}/**/*
- theodolite-benchmarks/flink-commons/**/* if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME && $JAVA_PROJECT_DEPS"
- theodolite-benchmarks/load-generator-commons/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME" - if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW && $IMAGE_NAME && $JAVA_PROJECT_NAME"
when: manual when: manual
allow_failure: true allow_failure: true
...@@ -217,72 +216,140 @@ deploy-uc1-kstreams: ...@@ -217,72 +216,140 @@ deploy-uc1-kstreams:
variables: variables:
IMAGE_NAME: "theodolite-uc1-kstreams-app" IMAGE_NAME: "theodolite-uc1-kstreams-app"
JAVA_PROJECT_NAME: "uc1-kstreams" JAVA_PROJECT_NAME: "uc1-kstreams"
JAVA_PROJECT_DEPS: "kstreams-commons"
deploy-uc2-kstreams: deploy-uc2-kstreams:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc2-kstreams-app" IMAGE_NAME: "theodolite-uc2-kstreams-app"
JAVA_PROJECT_NAME: "uc2-kstreams" JAVA_PROJECT_NAME: "uc2-kstreams"
JAVA_PROJECT_DEPS: "kstreams-commons"
deploy-uc3-kstreams: deploy-uc3-kstreams:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc3-kstreams-app" IMAGE_NAME: "theodolite-uc3-kstreams-app"
JAVA_PROJECT_NAME: "uc3-kstreams" JAVA_PROJECT_NAME: "uc3-kstreams"
JAVA_PROJECT_DEPS: "kstreams-commons"
deploy-uc4-kstreams: deploy-uc4-kstreams:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc4-kstreams-app" IMAGE_NAME: "theodolite-uc4-kstreams-app"
JAVA_PROJECT_NAME: "uc4-kstreams" JAVA_PROJECT_NAME: "uc4-kstreams"
JAVA_PROJECT_DEPS: "kstreams-commons"
deploy-uc1-flink: deploy-uc1-flink:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc1-flink" IMAGE_NAME: "theodolite-uc1-flink"
JAVA_PROJECT_NAME: "uc1-flink" JAVA_PROJECT_NAME: "uc1-flink"
JAVA_PROJECT_DEPS: "flink-commons"
deploy-uc2-flink: deploy-uc2-flink:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc2-flink" IMAGE_NAME: "theodolite-uc2-flink"
JAVA_PROJECT_NAME: "uc2-flink" JAVA_PROJECT_NAME: "uc2-flink"
JAVA_PROJECT_DEPS: "flink-commons"
deploy-uc3-flink: deploy-uc3-flink:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc3-flink" IMAGE_NAME: "theodolite-uc3-flink"
JAVA_PROJECT_NAME: "uc3-flink" JAVA_PROJECT_NAME: "uc3-flink"
JAVA_PROJECT_DEPS: "flink-commons"
deploy-uc4-flink: deploy-uc4-flink:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc4-flink" IMAGE_NAME: "theodolite-uc4-flink"
JAVA_PROJECT_NAME: "uc4-flink" JAVA_PROJECT_NAME: "uc4-flink"
JAVA_PROJECT_DEPS: "flink-commons"
deploy-uc1-beam-flink:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc1-beam-flink"
JAVA_PROJECT_NAME: "uc1-beam-flink"
JAVA_PROJECT_DEPS: "beam-commons,uc1-beam"
deploy-uc2-beam-flink:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc2-beam-flink"
JAVA_PROJECT_NAME: "uc2-beam-flink"
JAVA_PROJECT_DEPS: "beam-commons,uc2-beam"
deploy-uc3-beam-flink:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc3-beam-flink"
JAVA_PROJECT_NAME: "uc3-beam-flink"
JAVA_PROJECT_DEPS: "beam-commons,uc3-beam"
deploy-uc4-beam-flink:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc4-beam-flink"
JAVA_PROJECT_NAME: "uc4-beam-flink"
JAVA_PROJECT_DEPS: "beam-commons,uc4-beam"
deploy-uc1-beam-samza:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc1-beam-samza"
JAVA_PROJECT_NAME: "uc1-beam-samza"
JAVA_PROJECT_DEPS: "beam-commons,uc1-beam"
deploy-uc2-beam-samza:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc2-beam-samza"
JAVA_PROJECT_NAME: "uc2-beam-samza"
JAVA_PROJECT_DEPS: "beam-commons,uc2-beam"
deploy-uc3-beam-samza:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc3-beam-samza"
JAVA_PROJECT_NAME: "uc3-beam-samza"
JAVA_PROJECT_DEPS: "beam-commons,uc3-beam"
deploy-uc4-beam-samza:
extends: .deploy-benchmarks
variables:
IMAGE_NAME: "theodolite-uc4-beam-samza"
JAVA_PROJECT_NAME: "uc4-beam-samza"
JAVA_PROJECT_DEPS: "beam-commons,uc4-beam"
deploy-uc1-load-generator: deploy-uc1-load-generator:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc1-workload-generator" IMAGE_NAME: "theodolite-uc1-workload-generator"
JAVA_PROJECT_NAME: "uc1-load-generator" JAVA_PROJECT_NAME: "uc1-load-generator"
JAVA_PROJECT_DEPS: "load-generator-commons"
deploy-uc2-load-generator: deploy-uc2-load-generator:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc2-workload-generator" IMAGE_NAME: "theodolite-uc2-workload-generator"
JAVA_PROJECT_NAME: "uc2-load-generator" JAVA_PROJECT_NAME: "uc2-load-generator"
JAVA_PROJECT_DEPS: "load-generator-commons"
deploy-uc3-load-generator: deploy-uc3-load-generator:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc3-workload-generator" IMAGE_NAME: "theodolite-uc3-workload-generator"
JAVA_PROJECT_NAME: "uc3-load-generator" JAVA_PROJECT_NAME: "uc3-load-generator"
JAVA_PROJECT_DEPS: "load-generator-commons"
deploy-uc4-load-generator: deploy-uc4-load-generator:
extends: .deploy-benchmarks extends: .deploy-benchmarks
variables: variables:
IMAGE_NAME: "theodolite-uc4-workload-generator" IMAGE_NAME: "theodolite-uc4-workload-generator"
JAVA_PROJECT_NAME: "uc4-load-generator" JAVA_PROJECT_NAME: "uc4-load-generator"
JAVA_PROJECT_DEPS: "load-generator-commons"
# Theodolite Framework # Theodolite Framework
...@@ -330,6 +397,7 @@ test-theodolite: ...@@ -330,6 +397,7 @@ test-theodolite:
#- build-theodolite-native #- build-theodolite-native
script: ./gradlew test --stacktrace script: ./gradlew test --stacktrace
artifacts: artifacts:
when: always
reports: reports:
junit: junit:
- "theodolite/**/build/test-results/test/TEST-*.xml" - "theodolite/**/build/test-results/test/TEST-*.xml"
...@@ -408,6 +476,22 @@ test-slo-checker-dropped-records-kstreams: ...@@ -408,6 +476,22 @@ test-slo-checker-dropped-records-kstreams:
- when: manual - when: manual
allow_failure: true allow_failure: true
test-slo-checker-generic:
stage: test
needs: []
image: python:3.7-slim
before_script:
- cd slo-checker/generic
script:
- pip install -r requirements.txt
- cd app
- python -m unittest
rules:
- changes:
- slo-checker/generic/**/*
- when: manual
allow_failure: true
deploy-slo-checker-lag-trend: deploy-slo-checker-lag-trend:
stage: deploy stage: deploy
extends: extends:
...@@ -444,6 +528,24 @@ deploy-slo-checker-dropped-records-kstreams: ...@@ -444,6 +528,24 @@ deploy-slo-checker-dropped-records-kstreams:
when: manual when: manual
allow_failure: true allow_failure: true
deploy-slo-checker-generic:
stage: deploy
extends:
- .kaniko-push
needs:
- test-slo-checker-generic
before_script:
- cd slo-checker/generic
variables:
IMAGE_NAME: theodolite-slo-checker-generic
rules:
- changes:
- slo-checker/generic/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: manual
allow_failure: true
# Theodolite Random Scheduler # Theodolite Random Scheduler
......
../analysis/requirements.txt
\ No newline at end of file
...@@ -4,7 +4,6 @@ description: >- ...@@ -4,7 +4,6 @@ description: >-
scalability of cloud-native applications. scalability of cloud-native applications.
remote_theme: pmarsceill/just-the-docs remote_theme: pmarsceill/just-the-docs
#color_scheme: "dark"
aux_links: aux_links:
"Theodolite on GitHub": "Theodolite on GitHub":
- "//github.com/cau-se/theodolite" - "//github.com/cau-se/theodolite"
...@@ -14,4 +13,5 @@ exclude: ...@@ -14,4 +13,5 @@ exclude:
- Gemfile - Gemfile
- Gemfile.lock - Gemfile.lock
- README.md - README.md
- vendor - vendor/
- drafts/
This diff is collapsed.
...@@ -58,7 +58,29 @@ As a Benchmark may define multiple supported load and resource types, an Executi ...@@ -58,7 +58,29 @@ As a Benchmark may define multiple supported load and resource types, an Executi
## Definition of SLOs ## Definition of SLOs
SLOs provide a way to quantify whether a certain load intensity can be handled by a certain amount of provisioned resources. SLOs provide a way to quantify whether a certain load intensity can be handled by a certain amount of provisioned resources.
An Execution must at least specify one SLO to be checked. In Theodolite, SLO are evaluated by requesting monitoring data from Prometheus and analyzing it in a benchmark-specific way.
An Execution must at least define one SLO to be checked.
A good choice to get started is defining an SLO of type `generic`:
```yaml
- sloType: "generic"
prometheusUrl: "http://prometheus-operated:9090"
offset: 0
properties:
externalSloUrl: "http://localhost:8082"
promQLQuery: "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
warmup: 60 # in seconds
queryAggregation: max
repetitionAggregation: median
operator: lte
threshold: 1000
```
All you have to do is to define a [PromQL query](https://prometheus.io/docs/prometheus/latest/querying/basics/) describing which metrics should be requested (`promQLQuery`) and how the resulting time series should be evaluated. With `queryAggregation` you specify how the resulting time series is aggregated to a single value and `repetitionAggregation` describes how the results of multiple repetitions are aggregated. Possible values are
`mean`, `median`, `mode`, `sum`, `count`, `max`, `min`, `std`, `var`, `skew`, `kurt` as well as percentiles such as `p99` or `p99.9`. The result of aggregating all repetitions is checked against `threshold`. This check is performed using an `operator`, which describes that the result must be "less than" (`lt`), "less than equal" (`lte`), "greater than" (`gt`) or "greater than equal" (`gte`) to the threshold.
In case you need to evaluate monitoring data in a more flexible fashion, you can also change the value of `externalSloUrl` to your custom SLO checker. Have a look at the source code of the [generic SLO checker](https://github.com/cau-se/theodolite/tree/master/slo-checker/generic) to get started.
## Experimental Setup ## Experimental Setup
...@@ -72,7 +94,7 @@ The experimental setup can be configured by: ...@@ -72,7 +94,7 @@ The experimental setup can be configured by:
## Configuration Overrides ## Configuration Overrides
In cases where only small modifications of a system under test should be benchmarked, it is not necessarily required to [create a new benchmark](creating-a-benchmark). In cases where only small modifications of a system under test should be benchmarked, it is not necessary to [create a new benchmark](creating-a-benchmark).
Instead, also Executions allow to do small reconfigurations, such as switching on or off a specific Pod scheduler. Instead, also Executions allow to do small reconfigurations, such as switching on or off a specific Pod scheduler.
This is done by defining `configOverrides` in the Execution. Each override consists of a patcher, defining which Kubernetes resource should be patched in which way, and a value the patcher is applied with. This is done by defining `configOverrides` in the Execution. Each override consists of a patcher, defining which Kubernetes resource should be patched in which way, and a value the patcher is applied with.
......
## Infrastructure
The necessary infrastructure for an execution can be defined in the benchmark manifests. The related resources are created *before* an execution is started, and removed *after* an execution is finished.
### Example
```yaml
infrastructure:
resources:
- configMap:
name: "example-configmap"
files:
- "uc1-kstreams-deployment.yaml"
```
## Action Commands
Theodolite allows to execute commands on running pods (similar to the `kubectl exec -it <pod-name> -- <command>` command). These commands can be run either before (via so-called `beforeActions`) or after (via so-called `afterActions`) an experiment is executed.
Theodolite checks if all required pods are available for the specified actions (i.e. the pods must either be defined as infrastructure or already deployed in the cluster). If not all pods/resources are available, the benchmark will not be set as `Ready`. Consequently, an action cannot be executed on a pod that is defined as an SUT or loadGen resource.
### Example
```yaml
# For the system under test
sut:
resources: ...
beforeActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: ["touch", "test-file-sut"]
timeoutSeconds: 90
afterActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: [ "touch", "test-file-sut-after" ]
timeoutSeconds: 90
# analog, for the load generator
loadGenerator:
resources: ...
beforeActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: ["touch", "test-file-loadGen"]
timeoutSeconds: 90
afterActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: [ "touch", "test-file-loadGen-after" ]
timeoutSeconds: 90
```
\ No newline at end of file
...@@ -11,6 +11,7 @@ Running scalability benchmarks with Theodolite involves the following steps: ...@@ -11,6 +11,7 @@ Running scalability benchmarks with Theodolite involves the following steps:
1. [Deploying a benchmark to Kubernetes](#deploying-a-benchmark) 1. [Deploying a benchmark to Kubernetes](#deploying-a-benchmark)
1. [Creating an execution](#creating-an-execution), which describes the experimental setup for running the benchmark 1. [Creating an execution](#creating-an-execution), which describes the experimental setup for running the benchmark
1. [Accessing benchmark results](#accessing-benchmark-results) 1. [Accessing benchmark results](#accessing-benchmark-results)
1. [Analyzing benchmark results](#analyzing-benchmark-results) with Theodolite's Jupyter notebooks
## Deploying a Benchmark ## Deploying a Benchmark
...@@ -131,3 +132,32 @@ For installations without persistence, but also as an alternative for installati ...@@ -131,3 +132,32 @@ For installations without persistence, but also as an alternative for installati
```sh ```sh
kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):/results . -c results-access kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):/results . -c results-access
``` ```
## Analyzing Benchmark Results
Theodolite comes with Jupyter notebooks for analyzing and visualizing benchmark execution results.
The easiest way to use them is at MyBinder:
[Launch Notebooks](https://mybinder.org/v2/gh/cau-se/theodolite/HEAD?labpath=analysis){: .btn .btn-primary }
{: .text-center }
Alternatively, you can also [run these notebooks locally](https://github.com/cau-se/theodolite/tree/master/analysis), for example, with Docker or Visual Studio Code.
The notebooks allow to compute a scalability function using its *demand* metric and to visualize multiple such functions in plots:
### Computing the *demand* metric with `demand-metric.ipynb` (optional)
After finishing a benchmark execution, Theodolite creates a `exp<id>_demand.csv` file. It maps the tested load intensities to the minimal required resources for that load. If the monitoring data collected during benchmark execution should be analyzed in more detail, the `demand-metric.ipynb` notebook can be used.
Theodolite stores monitoring data for each conducted SLO experiment in `exp<id>_<load>_<resources>_<slo-slug>_<rep>.csv` files, where `<id>` is the ID of an execution, `<load>` the corresponding load intensity value, `<resources>` the resources value, `<slo-slug>` the [name of the SLO](creating-an-execution.html#definition-of-slos) and `<rep>` the repetition counter.
The `demand-metric.ipynb` notebook reads these files and generates a new CSV file mapping load intensities to the minimal required resources. The format of this file corresponds to the original `exp<id>_demand.csv` file created when running the benchmark, but allows, for example, to evaluate different warm-up periods.
Currently, the `demand-metric.ipynb` notebook only supports benchmarks with the *lag trend SLO* out-of-the-box, but can easily be adjusted to perform any other type of analysis.
### Plotting benchmark results with the *demand* metric with `demand-metric-plot.ipynb`
The `demand-metric-plot.ipynb` takes one or multiple `exp<id>_demand.csv` files as input and visualize them together in a plot.
Input files can either be taken directly from Theodolite, or created from the `demand-metric.ipynb` notebooks.
All plotting code is only intended to serve as a template. Adjust it as needed to change colors, labels, formatting, etc.
Please refer to the official docs of [MatPlotLib](https://matplotlib.org/) and the [ggplot](https://matplotlib.org/stable/gallery/style_sheets/ggplot.html) style, which are used to generate the plots.
...@@ -8,5 +8,8 @@ cp-helm-charts: ...@@ -8,5 +8,8 @@ cp-helm-charts:
offsets.topic.replication.factor: "1" offsets.topic.replication.factor: "1"
operator: operator:
sloChecker:
droppedRecordsKStreams:
enabled: false
resultsVolume: resultsVolume:
enabled: false enabled: false
...@@ -31,6 +31,19 @@ spec: ...@@ -31,6 +31,19 @@ spec:
volumeMounts: volumeMounts:
- name: theodolite-results-volume - name: theodolite-results-volume
mountPath: "/deployments/results" mountPath: "/deployments/results"
{{- if .Values.operator.sloChecker.generic.enabled }}
{{- /* Sidecar container running the generic SLO checker.
       Note: the guard previously tested droppedRecordsKStreams.enabled,
       so this container could never be toggled via its own values flag. */}}
- name: slo-checker-generic
  image: "{{ .Values.operator.sloChecker.generic.image }}:{{ .Values.operator.sloChecker.generic.imageTag }}"
  imagePullPolicy: "{{ .Values.operator.sloChecker.generic.imagePullPolicy }}"
  ports:
  - containerPort: 8082
    name: analysis
  env:
  - name: PORT
    value: "8082"
  - name: LOG_LEVEL
    value: INFO
{{- end }}
{{- if .Values.operator.sloChecker.lagTrend.enabled }} {{- if .Values.operator.sloChecker.lagTrend.enabled }}
- name: lag-trend-slo-checker - name: lag-trend-slo-checker
image: "{{ .Values.operator.sloChecker.lagTrend.image }}:{{ .Values.operator.sloChecker.lagTrend.imageTag }}" image: "{{ .Values.operator.sloChecker.lagTrend.image }}:{{ .Values.operator.sloChecker.lagTrend.imageTag }}"
......
...@@ -256,6 +256,11 @@ operator: ...@@ -256,6 +256,11 @@ operator:
nodeSelector: {} nodeSelector: {}
sloChecker: sloChecker:
generic:
enabled: true
image: ghcr.io/cau-se/theodolite-slo-checker-generic
imageTag: latest
imagePullPolicy: Always
lagTrend: lagTrend:
enabled: true enabled: true
image: ghcr.io/cau-se/theodolite-slo-checker-lag-trend image: ghcr.io/cau-se/theodolite-slo-checker-lag-trend
......
# Base image bundling Uvicorn + Gunicorn, preconfigured to serve a FastAPI app.
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7

# Install dependencies first so this layer is cached independently of app code.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# The base image automatically serves the FastAPI application found in /app.
COPY ./app /app
\ No newline at end of file
# Generic SLO Evaluator
## Execution
For development:
```sh
uvicorn main:app --reload
```
## Build the docker image:
```sh
docker build . -t theodolite-evaluator
```
Run the Docker image:
```sh
docker run -p 80:80 theodolite-evaluator
```
## Configuration
You can set the `HOST` and the `PORT` (and a lot of more parameters) via environment variables. Default is `0.0.0.0:80`.
For more information see the [Gunicorn/FastAPI Docker docs](https://github.com/tiangolo/uvicorn-gunicorn-fastapi-docker#advanced-usage).
## API Documentation
The running webserver provides a REST API with the following route:
* /
* Method: POST
* Body:
* results
* metric-metadata
* values
* metadata
* warmup
* queryAggregation
* repetitionAggregation
* operator
* threshold
The body of the request must be a JSON string that satisfies the following conditions:
* **dropped records**: This property is based on the [Range Vector type](https://www.prometheus.io/docs/prometheus/latest/querying/api/#range-vectors) from Prometheus and must have the following JSON *structure*:
```json
{
"results": [
[
{
"metric": {
"<label-name>": "<label-value>"
},
"values": [
[
<unix_timestamp>, // 1.634624989695E9
"<sample_value>" // integer
]
]
}
]
],
"metadata": {
"warmup": 60,
"queryAggregation": "max",
"repetitionAggregation": "median",
"operator": "lt",
"threshold": 2000000
}
}
```
### description
* results:
* metric-metadata:
* Labels of this metric. The `generic` slo checker does not use labels in the calculation of the service level objective.
* results
* The `<unix_timestamp>` provided as the first element of each element in the "values" array must be the timestamp of the measurement value in seconds (with optional decimal precision)
* The `<sample_value>` must be the measurement value as string.
* metadata: Metadata required for the calculation of the service level objective.
* **warmup**: Specifies the warmup time in seconds that are ignored for evaluating the SLO.
* **queryAggregation**: Specifies the function used to aggregate a query.
* **repetitionAggregation**: Specifies the function used to aggregate the results of multiple query aggregations.
* **operator**: Specifies how the result should be checked against a threshold. Possible values are `lt`, `lte`, `gt` and `gte`.
* **threshold**: Must be an unsigned integer that specifies the threshold for the SLO evaluation.
from fastapi import FastAPI,Request
import logging
import os
import json
import sys
import re
import pandas as pd
app = FastAPI()

# Log to stdout so container runtimes can collect the output.
logging.basicConfig(stream=sys.stdout,
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("API")

# Map the LOG_LEVEL environment variable to the logger's level.
# Unrecognized or unset values keep the logging default (WARNING).
if os.getenv('LOG_LEVEL') == 'INFO':
    logger.setLevel(logging.INFO)
elif os.getenv('LOG_LEVEL') == 'WARNING':
    logger.setLevel(logging.WARNING)
elif os.getenv('LOG_LEVEL') == 'DEBUG':
    logger.setLevel(logging.DEBUG)
def get_aggr_func(func_string: str):
    """Resolve an aggregation name to something pandas can aggregate with.

    Accepts the common pandas aggregation names (e.g. 'mean', 'max') as well
    as percentile strings such as 'p99' or 'p99.9'.

    :param func_string: name of the aggregation function.
    :return: the validated aggregation name (usable by pandas directly), or a
        callable computing the requested percentile on a pandas object.
    :raises ValueError: if the string is neither a known aggregation nor a
        percentile string.
    """
    known_aggregations = ['mean', 'median', 'mode', 'sum', 'count', 'max',
                          'min', 'std', 'var', 'skew', 'kurt']
    if func_string in known_aggregations:
        return func_string
    # Matches strings like 'p99', 'p99.99', 'p1', 'p0.001'.
    if re.search(r'^p\d\d?(\.\d+)?$', func_string):
        def percentile(x):
            return x.quantile(float(func_string[1:]) / 100)
        percentile.__name__ = func_string
        return percentile
    # Include the offending value so misconfigurations are easy to diagnose.
    raise ValueError(f'Invalid function string: {func_string!r}.')
def aggr_query(values: list, warmup: int, aggr_func):
    """Aggregate a single Prometheus range-vector query result.

    :param values: list of ``[timestamp, value]`` pairs as returned by
        Prometheus, where timestamps are in seconds and values are strings.
    :param warmup: warm-up period in seconds; samples recorded within
        ``warmup`` seconds of the first sample are excluded.
    :param aggr_func: aggregation accepted by pandas ``aggregate`` (a name
        such as 'max' or a callable, see ``get_aggr_func``).
    :return: the aggregated value of the remaining samples.
    """
    df = pd.DataFrame.from_dict(values)
    df.columns = ['timestamp', 'value']
    # Use .iloc[0] (positional) rather than [0] (label-based) for the first
    # sample, and .copy() so the assignment below does not mutate a view of
    # the original frame (pandas SettingWithCopyWarning).
    filtered = df[df['timestamp'] >= (df['timestamp'].iloc[0] + warmup)].copy()
    filtered['value'] = filtered['value'].astype(int)
    return filtered['value'].aggregate(aggr_func)
def check_result(result, operator: str, threshold):
    """Compare an aggregated result against a threshold.

    :param result: the aggregated measurement value.
    :param operator: one of 'lt', 'lte', 'gt' or 'gte'.
    :param threshold: the threshold to compare against.
    :return: True if the comparison holds, False otherwise.
    :raises ValueError: if the operator string is not recognized.
    """
    comparisons = {
        'lt': lambda res, thr: res < thr,
        'lte': lambda res, thr: res <= thr,
        'gt': lambda res, thr: res > thr,
        'gte': lambda res, thr: res >= thr,
    }
    if operator not in comparisons:
        raise ValueError('Invalid operator string.')
    return comparisons[operator](result, threshold)
@app.post("/",response_model=bool)
async def check_slo(request: Request):
    """Evaluate an SLO check request.

    Expects a JSON body with a 'results' array (one entry per repetition,
    each a Prometheus range-vector result) and a 'metadata' object holding
    warmup, queryAggregation, repetitionAggregation, operator and threshold.
    Returns True if the SLO is met, False otherwise.
    """
    data = json.loads(await request.body())
    logger.info('Received request with metadata: %s', data['metadata'])
    warmup = int(data['metadata']['warmup'])
    query_aggregation = get_aggr_func(data['metadata']['queryAggregation'])
    rep_aggregation = get_aggr_func(data['metadata']['repetitionAggregation'])
    operator = data['metadata']['operator']
    threshold = int(data['metadata']['threshold'])
    # Aggregate each repetition to a single value, then aggregate those
    # values across repetitions. Note: only the first time series (r[0]) of
    # each repetition is considered — presumably each query returns exactly
    # one series; TODO confirm against the callers.
    query_results = [aggr_query(r[0]["values"], warmup, query_aggregation) for r in data["results"]]
    result = pd.DataFrame(query_results).aggregate(rep_aggregation).at[0]
    return check_result(result, operator, threshold)
logger.info("SLO evaluator is online")
\ No newline at end of file
import unittest
from main import app, get_aggr_func, check_result
import json
from fastapi.testclient import TestClient
class TestSloEvaluation(unittest.TestCase):
    """Tests for the generic SLO checker API and its helper functions.

    Note: three error-case tests previously shared the method name
    ``test_get_aggr_func_p99_``, so Python silently kept only the last
    definition and two of them never ran; they now have distinct names.
    """

    client = TestClient(app)

    def test_1_rep(self):
        # End-to-end check: a single-repetition request evaluates to True.
        with open('../resources/test-1-rep-success.json') as json_file:
            data = json.load(json_file)
        response = self.client.post("/", json=data)
        self.assertEqual(response.json(), True)

    def test_get_aggr_func_median(self):
        self.assertEqual(get_aggr_func('median'), 'median')

    def test_get_aggr_func_p99(self):
        self.assertTrue(callable(get_aggr_func('p99')))

    def test_get_aggr_func_p99_9(self):
        self.assertTrue(callable(get_aggr_func('p99.9')))

    def test_get_aggr_func_p99_99(self):
        self.assertTrue(callable(get_aggr_func('p99.99')))

    def test_get_aggr_func_p0_1(self):
        self.assertTrue(callable(get_aggr_func('p0.1')))

    def test_get_aggr_func_trailing_dot(self):
        self.assertRaises(ValueError, get_aggr_func, 'p99.')

    def test_get_aggr_func_wrong_prefix(self):
        self.assertRaises(ValueError, get_aggr_func, 'q99')

    def test_get_aggr_func_unknown_name(self):
        self.assertRaises(ValueError, get_aggr_func, 'mux')

    def test_check_result_lt(self):
        self.assertEqual(check_result(100, 'lt', 200), True)

    def test_check_result_lte(self):
        self.assertEqual(check_result(200, 'lte', 200), True)

    def test_check_result_gt(self):
        self.assertEqual(check_result(100, 'gt', 200), False)

    def test_check_result_gte(self):
        self.assertEqual(check_result(300, 'gte', 200), True)

    def test_check_result_invalid(self):
        self.assertRaises(ValueError, check_result, 100, 'xyz', 200)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
fastapi==0.65.2
pandas==1.0.3
uvicorn
requests
{
"results": [
[
{
"metric": {
"job": "titan-ccp-aggregation"
},
"values": [
[
1.634624674695E9,
"0"
],
[
1.634624679695E9,
"0"
],
[
1.634624684695E9,
"0"
],
[
1.634624689695E9,
"0"
],
[
1.634624694695E9,
"0"
],
[
1.634624699695E9,
"0"
],
[
1.634624704695E9,
"0"
],
[
1.634624709695E9,
"0"
],
[
1.634624714695E9,
"0"
],
[
1.634624719695E9,
"0"
],
[
1.634624724695E9,
"0"
],
[
1.634624729695E9,
"0"
],
[
1.634624734695E9,
"0"
],
[
1.634624739695E9,
"0"
],
[
1.634624744695E9,
"1"
],
[
1.634624749695E9,
"3"
],
[
1.634624754695E9,
"4"
],
[
1.634624759695E9,
"4"
],
[
1.634624764695E9,
"4"
],
[
1.634624769695E9,
"4"
],
[
1.634624774695E9,
"4"
],
[
1.634624779695E9,
"4"
],
[
1.634624784695E9,
"4"
],
[
1.634624789695E9,
"4"
],
[
1.634624794695E9,
"4"
],
[
1.634624799695E9,
"4"
],
[
1.634624804695E9,
"176"
],
[
1.634624809695E9,
"176"
],
[
1.634624814695E9,
"176"
],
[
1.634624819695E9,
"176"
],
[
1.634624824695E9,
"176"
],
[
1.634624829695E9,
"159524"
],
[
1.634624834695E9,
"209870"
],
[
1.634624839695E9,
"278597"
],
[
1.634624844695E9,
"460761"
],
[
1.634624849695E9,
"460761"
],
[
1.634624854695E9,
"460761"
],
[
1.634624859695E9,
"460761"
],
[
1.634624864695E9,
"460761"
],
[
1.634624869695E9,
"606893"
],
[
1.634624874695E9,
"653534"
],
[
1.634624879695E9,
"755796"
],
[
1.634624884695E9,
"919317"
],
[
1.634624889695E9,
"919317"
],
[
1.634624894695E9,
"955926"
],
[
1.634624899695E9,
"955926"
],
[
1.634624904695E9,
"955926"
],
[
1.634624909695E9,
"955926"
],
[
1.634624914695E9,
"955926"
],
[
1.634624919695E9,
"1036530"
],
[
1.634624924695E9,
"1078477"
],
[
1.634624929695E9,
"1194775"
],
[
1.634624934695E9,
"1347755"
],
[
1.634624939695E9,
"1352151"
],
[
1.634624944695E9,
"1360428"
],
[
1.634624949695E9,
"1360428"
],
[
1.634624954695E9,
"1360428"
],
[
1.634624959695E9,
"1360428"
],
[
1.634624964695E9,
"1360428"
],
[
1.634624969695E9,
"1525685"
],
[
1.634624974695E9,
"1689296"
],
[
1.634624979695E9,
"1771358"
],
[
1.634624984695E9,
"1854284"
],
[
1.634624989695E9,
"1854284"
]
]
}
]
],
"metadata": {
"warmup": 60,
"queryAggregation": "max",
"repetitionAggregation": "median",
"operator": "lt",
"threshold": 2000000
}
}
\ No newline at end of file
plugins {
  id 'theodolite.java-commons'
}

repositories {
  mavenCentral()
  maven {
    url "https://oss.sonatype.org/content/repositories/snapshots/"
  }
  maven {
    url 'https://packages.confluent.io/maven/'
  }
}

dependencies {
  // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
  implementation 'com.google.code.gson:gson:2.8.2'
  implementation 'com.google.guava:guava:24.1-jre'
  // Exclude kafka-clients to avoid clashing with the version pulled in by the
  // titan-ccp dependencies above.
  implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
    exclude group: 'org.apache.kafka', module: 'kafka-clients'
  }
  // NOTE(review): both the slf4j-simple and slf4j-jdk14 bindings end up on the
  // runtime classpath here, which triggers SLF4J's multiple-bindings warning
  // and makes the effective logger backend classpath-order dependent.
  // Consider keeping only one binding — verify which one consumers expect.
  implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'

  runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
  runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'

  // Use JUnit test framework
  testImplementation 'junit:junit:4.12'
}
package theodolite.commons.beam;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
* Abstraction of a Beam microservice.
* Encapsulates the corresponding {@link PipelineOptions} and the beam Runner.
*/
/**
 * Abstraction of a Beam microservice. Encapsulates the corresponding
 * {@link PipelineOptions} and the beam Runner.
 */
public class AbstractBeamService {

  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBeamService.class);

  // Beam pipeline options, built once from the command line arguments.
  // Made final: the field is assigned exactly once in the constructor.
  protected final PipelineOptions options;

  // Application configuration, loaded from the default configuration sources.
  private final Configuration config = ServiceConfigurations.createWithDefaults();

  private final String applicationName =
      this.config.getString(ConfigurationKeys.APPLICATION_NAME);

  /**
   * Creates an AbstractBeamService with {@link PipelineOptions} parsed from the given command line
   * arguments and the job name set to the configured application name.
   *
   * @param args command line arguments forwarded to the {@link PipelineOptionsFactory}.
   */
  public AbstractBeamService(final String[] args) { //NOPMD
    super();
    LOGGER.info("Pipeline options:");
    for (final String s : args) {
      LOGGER.info("{}", s);
    }
    this.options = PipelineOptionsFactory.fromArgs(args).create();
    this.options.setJobName(this.applicationName);
    LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString());
  }

  /**
   * Returns the application configuration loaded from the default sources.
   *
   * @return the {@link Configuration} of this service.
   */
  public Configuration getConfig() {
    return this.config;
  }

}
package theodolite.commons.beam;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
/**
* Abstraction of a Beam {@link Pipeline}.
*/
/**
 * Abstraction of a Beam {@link Pipeline}.
 */
public class AbstractPipeline extends Pipeline {

  protected final String inputTopic;
  protected final String bootstrapServer;

  // Application Configurations
  private final Configuration config;

  protected AbstractPipeline(final PipelineOptions options, final Configuration config) {
    super(options);
    this.config = config;
    this.inputTopic = config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
    this.bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
  }

  /**
   * Builds a simple configuration for a Kafka consumer transformation.
   *
   * @return the build configuration.
   */
  public Map<String, Object> buildConsumerConfig() {
    final Map<String, Object> kafkaSettings = new HashMap<>();
    kafkaSettings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
    kafkaSettings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
    kafkaSettings.put("schema.registry.url",
        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
    kafkaSettings.put("specific.avro.reader",
        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
    kafkaSettings.put(ConsumerConfig.GROUP_ID_CONFIG,
        this.config.getString(ConfigurationKeys.APPLICATION_NAME));
    return kafkaSettings;
  }

}
package theodolite.commons.beam;
/**
* Keys to access configuration parameters.
*/
/**
 * Keys to access configuration parameters.
 */
public final class ConfigurationKeys {

  // Common keys
  public static final String APPLICATION_NAME = "application.name";

  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";

  public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";

  // Additional topics
  public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";

  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";

  public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";

  // UC2
  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";

  // UC3
  public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";

  public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";

  // UC4
  public static final String GRACE_PERIOD_MS = "grace.period.ms";

  // BEAM — keys forwarded to the Kafka consumer configuration (see AbstractPipeline)
  public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config";

  public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config";

  public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";

  public static final String TRIGGER_INTERVAL = "trigger.interval";

  // Utility class — not instantiable.
  private ConfigurationKeys() {
  }

}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment