Skip to content
Snippets Groups Projects
Commit 37da0555 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'generic-slo' into 'master'

Add generic SLO checker

Closes #315

See merge request !224
parents 435832da 22fcf15a
No related branches found
No related tags found
1 merge request!224Add generic SLO checker
Pipeline #6003 passed
...@@ -474,6 +474,22 @@ test-slo-checker-dropped-records-kstreams: ...@@ -474,6 +474,22 @@ test-slo-checker-dropped-records-kstreams:
- when: manual - when: manual
allow_failure: true allow_failure: true
# Runs the unit tests of the generic SLO checker with Python's unittest runner
# inside a python:3.7-slim container.
test-slo-checker-generic:
stage: test
# Empty needs list: the job does not depend on artifacts of any other job.
needs: []
image: python:3.7-slim
before_script:
- cd slo-checker/generic
script:
# Dependencies are installed from slo-checker/generic/requirements.txt,
# then the tests are discovered and run from the app directory.
- pip install -r requirements.txt
- cd app
- python -m unittest
rules:
# First rule: triggered by changes below slo-checker/generic.
- changes:
- slo-checker/generic/**/*
# Fallback rule: otherwise the job is only offered for manual execution.
# NOTE(review): allow_failure presumably belongs to this manual rule, as in the
# neighbouring jobs -- the indentation is not visible here, confirm.
- when: manual
allow_failure: true
deploy-slo-checker-lag-trend: deploy-slo-checker-lag-trend:
stage: deploy stage: deploy
extends: extends:
...@@ -510,6 +526,24 @@ deploy-slo-checker-dropped-records-kstreams: ...@@ -510,6 +526,24 @@ deploy-slo-checker-dropped-records-kstreams:
when: manual when: manual
allow_failure: true allow_failure: true
# Builds and pushes the container image of the generic SLO checker.
# The build/push logic itself comes from the .kaniko-push template, which is
# defined outside of this excerpt.
deploy-slo-checker-generic:
stage: deploy
extends:
- .kaniko-push
# Only deploy after the unit tests of the generic SLO checker ran.
needs:
- test-slo-checker-generic
before_script:
- cd slo-checker/generic
variables:
IMAGE_NAME: theodolite-slo-checker-generic
rules:
# First rule: changes below slo-checker/generic AND all container registry
# credentials ($CR_HOST, $CR_ORG, $CR_USER, $CR_PW) are set.
- changes:
- slo-checker/generic/**/*
if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
# Fallback rule: with credentials but without matching changes the job is
# only offered for manual execution.
# NOTE(review): allow_failure presumably belongs to this manual rule -- the
# indentation is not visible here, confirm.
- if: "$CR_HOST && $CR_ORG && $CR_USER && $CR_PW"
when: manual
allow_failure: true
# Theodolite Random Scheduler # Theodolite Random Scheduler
......
# Image of the generic SLO checker, based on the Gunicorn/Uvicorn FastAPI image.
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7

# Install the Python dependencies before copying the application so that this
# layer can be reused when only the application code changes.
COPY requirements.txt requirements.txt
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt

# The application code is copied to /app.
COPY ./app /app
\ No newline at end of file
# Generic SLO Evaluator
## Execution
For development:
```sh
uvicorn main:app --reload
```
## Build the docker image:
```sh
docker build . -t theodolite-evaluator
```
Run the Docker image:
```sh
docker run -p 80:80 theodolite-evaluator
```
## Configuration
You can set the `HOST` and the `PORT` (and many more parameters) via environment variables. The default is `0.0.0.0:80`.
For more information see the [Gunicorn/FastAPI Docker docs](https://github.com/tiangolo/uvicorn-gunicorn-fastapi-docker#advanced-usage).
## API Documentation
The running webserver provides a REST API with the following route:
* /
* Method: POST
* Body:
* results
* metric-metadata
* values
* metadata
* warmup
* queryAggregation
* repetitionAggregation
* operator
* threshold
The body of the request must be a JSON string that satisfies the following conditions:
* **results**: This property is based on the [Range Vector type](https://www.prometheus.io/docs/prometheus/latest/querying/api/#range-vectors) from Prometheus and must have the following JSON *structure*:
```json
{
"results": [
[
{
"metric": {
"<label-name>": "<label-value>"
},
"values": [
[
<unix_timestamp>, // 1.634624989695E9
"<sample_value>" // integer
]
]
}
]
],
"metadata": {
"warmup": 60,
"queryAggregation": "max",
"repetitionAggregation": "median",
"operator": "lt",
"threshold": 2000000
}
}
```
### description
* results:
* metric-metadata:
* Labels of this metric. The `generic` slo checker does not use labels in the calculation of the service level objective.
* values
* The `<unix_timestamp>` provided as the first element of each element in the "values" array must be the timestamp of the measurement value in seconds (with optional decimal precision)
* The `<sample_value>` must be the measurement value as string.
* metadata: The following metadata is required for the calculation of the service level objective.
* **warmup**: Specifies the warmup time in seconds that are ignored for evaluating the SLO.
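* **queryAggregation**: Specifies the function used to aggregate a query. Supported values are `mean`, `median`, `mode`, `sum`, `count`, `max`, `min`, `std`, `var`, `skew`, `kurt` and percentiles such as `p99` or `p99.9`.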
* **repetitionAggregation**: Specifies the function used to aggregate the results of multiple query aggregations.
* **operator**: Specifies how the result should be checked against a threshold. Possible values are `lt`, `lte`, `gt` and `gte`.
* **threshold**: Must be an unsigned integer that specifies the threshold for the SLO evaluation.
from fastapi import FastAPI,Request
import logging
import os
import json
import sys
import re
import pandas as pd
# The FastAPI application object; routes are registered on it further below.
app = FastAPI()

# Log to stdout with timestamp, level and logger name in front of each message.
logging.basicConfig(stream=sys.stdout,
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("API")

# Only the three level names below are honoured; any other (or missing)
# LOG_LEVEL value leaves the logger level untouched.
if os.getenv('LOG_LEVEL') in ('INFO', 'WARNING', 'DEBUG'):
    logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL')))
def get_aggr_func(func_string: str):
    """Translate an aggregation name into something pandas' ``aggregate`` accepts.

    :param func_string: Either one of the plain aggregation names listed below
        or a percentile of the form ``p<N>`` with one or two integer digits and
        an optional fraction, e.g. ``'p99'``, ``'p99.99'``, ``'p1'``, ``'p0.001'``.
    :return: The name itself for plain aggregations, or a callable computing
        the requested percentile via ``x.quantile(...)``. The callable's
        ``__name__`` is set to ``func_string``.
    :raises ValueError: If ``func_string`` is neither a known name nor a
        well-formed percentile.
    """
    if func_string in ['mean', 'median', 'mode', 'sum', 'count', 'max', 'min', 'std', 'var', 'skew', 'kurt']:
        return func_string
    # fullmatch (instead of search with '$') also rejects a trailing newline;
    # the isinstance guard turns non-string input into the documented ValueError.
    if isinstance(func_string, str) and re.fullmatch(r'p\d\d?(\.\d+)?', func_string):
        # Convert the percentage once instead of on every call of the closure.
        quantile = float(func_string[1:]) / 100

        def percentile(x):
            return x.quantile(quantile)

        percentile.__name__ = func_string
        return percentile
    raise ValueError('Invalid function string.')
def aggr_query(values: list, warmup: int, aggr_func):
    """Aggregate the sample values of one query result after the warmup phase.

    :param values: Sequence of ``[timestamp, value]`` pairs; the value must be
        convertible to ``int`` (the test fixture passes integer strings).
    :param warmup: Samples with a timestamp smaller than the first timestamp
        plus ``warmup`` are ignored.
    :param aggr_func: Aggregation passed to pandas' ``Series.aggregate``
        (a name such as ``'max'`` or a callable).
    :return: The aggregated value of the remaining samples.
    :raises ValueError: If ``values`` is empty or a value is not an integer.
    """
    df = pd.DataFrame(values, columns=['timestamp', 'value'])
    if df.empty:
        raise ValueError('No values to aggregate.')
    # The first row marks the start of the measurement (positional access).
    start = df['timestamp'].iloc[0]
    # Select the value column directly instead of assigning into a filtered
    # copy of the frame, which triggers pandas' SettingWithCopyWarning.
    after_warmup = df.loc[df['timestamp'] >= (start + warmup), 'value']
    return after_warmup.astype(int).aggregate(aggr_func)
def check_result(result, operator: str, threshold):
    """Compare ``result`` with ``threshold`` using the named operator.

    :param result: Left-hand side of the comparison.
    :param operator: One of ``'lt'``, ``'lte'``, ``'gt'`` or ``'gte'``.
    :param threshold: Right-hand side of the comparison.
    :return: Outcome of the comparison.
    :raises ValueError: If ``operator`` is none of the four supported names.
    """
    comparisons = (
        ('lt', lambda left, right: left < right),
        ('lte', lambda left, right: left <= right),
        ('gt', lambda left, right: left > right),
        ('gte', lambda left, right: left >= right),
    )
    for name, compare in comparisons:
        if operator == name:
            return compare(result, threshold)
    raise ValueError('Invalid operator string.')
@app.post("/",response_model=bool)
async def check_slo(request: Request):
    """Evaluate the SLO described by the JSON request body.

    The body must contain ``results`` (one entry per repetition) and
    ``metadata`` with the keys ``warmup``, ``queryAggregation``,
    ``repetitionAggregation``, ``operator`` and ``threshold``.

    Only the first series (``r[0]``) of each repetition is evaluated: its
    values are aggregated with ``queryAggregation`` after the warmup phase,
    the per-repetition results are combined with ``repetitionAggregation``
    and the outcome is compared against ``threshold`` using ``operator``.

    :return: Whether the aggregated result satisfies the SLO.
    """
    data = json.loads(await request.body())
    metadata = data['metadata']
    warmup = int(metadata['warmup'])
    query_aggregation = get_aggr_func(metadata['queryAggregation'])
    rep_aggregation = get_aggr_func(metadata['repetitionAggregation'])
    operator = metadata['operator']
    threshold = int(metadata['threshold'])

    # Each repetition is aggregated exactly once; the previous extra loop
    # computed the same aggregations and threw the results away.
    query_results = [aggr_query(r[0]["values"], warmup, query_aggregation) for r in data["results"]]
    # One column holding one row per repetition; .at[0] picks the aggregate of
    # that single column.
    result = pd.DataFrame(query_results).aggregate(rep_aggregation).at[0]
    return check_result(result, operator, threshold)
# Logged once when this module has been loaded completely.
logger.info("SLO evaluator is online")
\ No newline at end of file
import unittest
from main import app, get_aggr_func, check_result
import json
from fastapi.testclient import TestClient
class TestSloEvaluation(unittest.TestCase):
    """Tests for the generic SLO evaluator: the REST route and its helpers."""

    # Shared client that talks to the FastAPI app in-process.
    client = TestClient(app)

    def test_1_rep(self):
        # Path is relative to the working directory the tests are run from.
        with open('../resources/test-1-rep-success.json') as json_file:
            data = json.load(json_file)
            response = self.client.post("/", json=data)
            self.assertEqual(response.json(), True)

    def test_get_aggr_func_median(self):
        self.assertEqual(get_aggr_func('median'), 'median')

    def test_get_aggr_func_p99(self):
        self.assertTrue(callable(get_aggr_func('p99')))

    def test_get_aggr_func_p99_9(self):
        self.assertTrue(callable(get_aggr_func('p99.9')))

    def test_get_aggr_func_p99_99(self):
        self.assertTrue(callable(get_aggr_func('p99.99')))

    def test_get_aggr_func_p0_1(self):
        self.assertTrue(callable(get_aggr_func('p0.1')))

    # The three invalid-input tests need distinct names: with identical names
    # only the last definition is kept by the class and executed.
    def test_get_aggr_func_p99_trailing_dot(self):
        self.assertRaises(ValueError, get_aggr_func, 'p99.')

    def test_get_aggr_func_q99(self):
        self.assertRaises(ValueError, get_aggr_func, 'q99')

    def test_get_aggr_func_mux(self):
        self.assertRaises(ValueError, get_aggr_func, 'mux')

    def test_check_result_lt(self):
        self.assertEqual(check_result(100, 'lt', 200), True)

    def test_check_result_lte(self):
        self.assertEqual(check_result(200, 'lte', 200), True)

    def test_check_result_gt(self):
        self.assertEqual(check_result(100, 'gt', 200), False)

    def test_check_result_gte(self):
        self.assertEqual(check_result(300, 'gte', 200), True)

    def test_check_result_invalid(self):
        self.assertRaises(ValueError, check_result, 100, 'xyz', 200)


# Allows running this module directly in addition to `python -m unittest`.
if __name__ == '__main__':
    unittest.main()
\ No newline at end of file
fastapi==0.65.2
pandas==1.0.3
uvicorn
requests
{
"results": [
[
{
"metric": {
"job": "titan-ccp-aggregation"
},
"values": [
[
1.634624674695E9,
"0"
],
[
1.634624679695E9,
"0"
],
[
1.634624684695E9,
"0"
],
[
1.634624689695E9,
"0"
],
[
1.634624694695E9,
"0"
],
[
1.634624699695E9,
"0"
],
[
1.634624704695E9,
"0"
],
[
1.634624709695E9,
"0"
],
[
1.634624714695E9,
"0"
],
[
1.634624719695E9,
"0"
],
[
1.634624724695E9,
"0"
],
[
1.634624729695E9,
"0"
],
[
1.634624734695E9,
"0"
],
[
1.634624739695E9,
"0"
],
[
1.634624744695E9,
"1"
],
[
1.634624749695E9,
"3"
],
[
1.634624754695E9,
"4"
],
[
1.634624759695E9,
"4"
],
[
1.634624764695E9,
"4"
],
[
1.634624769695E9,
"4"
],
[
1.634624774695E9,
"4"
],
[
1.634624779695E9,
"4"
],
[
1.634624784695E9,
"4"
],
[
1.634624789695E9,
"4"
],
[
1.634624794695E9,
"4"
],
[
1.634624799695E9,
"4"
],
[
1.634624804695E9,
"176"
],
[
1.634624809695E9,
"176"
],
[
1.634624814695E9,
"176"
],
[
1.634624819695E9,
"176"
],
[
1.634624824695E9,
"176"
],
[
1.634624829695E9,
"159524"
],
[
1.634624834695E9,
"209870"
],
[
1.634624839695E9,
"278597"
],
[
1.634624844695E9,
"460761"
],
[
1.634624849695E9,
"460761"
],
[
1.634624854695E9,
"460761"
],
[
1.634624859695E9,
"460761"
],
[
1.634624864695E9,
"460761"
],
[
1.634624869695E9,
"606893"
],
[
1.634624874695E9,
"653534"
],
[
1.634624879695E9,
"755796"
],
[
1.634624884695E9,
"919317"
],
[
1.634624889695E9,
"919317"
],
[
1.634624894695E9,
"955926"
],
[
1.634624899695E9,
"955926"
],
[
1.634624904695E9,
"955926"
],
[
1.634624909695E9,
"955926"
],
[
1.634624914695E9,
"955926"
],
[
1.634624919695E9,
"1036530"
],
[
1.634624924695E9,
"1078477"
],
[
1.634624929695E9,
"1194775"
],
[
1.634624934695E9,
"1347755"
],
[
1.634624939695E9,
"1352151"
],
[
1.634624944695E9,
"1360428"
],
[
1.634624949695E9,
"1360428"
],
[
1.634624954695E9,
"1360428"
],
[
1.634624959695E9,
"1360428"
],
[
1.634624964695E9,
"1360428"
],
[
1.634624969695E9,
"1525685"
],
[
1.634624974695E9,
"1689296"
],
[
1.634624979695E9,
"1771358"
],
[
1.634624984695E9,
"1854284"
],
[
1.634624989695E9,
"1854284"
]
]
}
]
],
"metadata": {
"warmup": 60,
"queryAggregation": "max",
"repetitionAggregation": "median",
"operator": "lt",
"threshold": 2000000
}
}
\ No newline at end of file
...@@ -13,8 +13,7 @@ import java.net.ConnectException ...@@ -13,8 +13,7 @@ import java.net.ConnectException
*/ */
class ExternalSloChecker( class ExternalSloChecker(
private val externalSlopeURL: String, private val externalSlopeURL: String,
private val threshold: Int, private val metadata: Map<String, Any>
private val warmup: Int
) : SloChecker { ) : SloChecker {
private val RETRIES = 2 private val RETRIES = 2
...@@ -37,8 +36,7 @@ class ExternalSloChecker( ...@@ -37,8 +36,7 @@ class ExternalSloChecker(
var counter = 0 var counter = 0
val data = SloJson.Builder() val data = SloJson.Builder()
.results(fetchedData.map { it.data?.result }) .results(fetchedData.map { it.data?.result })
.addMetadata("threshold", threshold) .addMetadata(metadata)
.addMetadata("warmup", warmup)
.build() .build()
.toJson() .toJson()
......
...@@ -44,11 +44,30 @@ class SloCheckerFactory { ...@@ -44,11 +44,30 @@ class SloCheckerFactory {
load: LoadDimension load: LoadDimension
): SloChecker { ): SloChecker {
return when (SloTypes.from(sloType)) { return when (SloTypes.from(sloType)) {
SloTypes.GENERIC, SloTypes.LAG_TREND, SloTypes.DROPPED_RECORDS -> ExternalSloChecker( SloTypes.GENERIC -> ExternalSloChecker(
externalSlopeURL = properties["externalSloUrl"] externalSlopeURL = properties["externalSloUrl"]
?: throw IllegalArgumentException("externalSloUrl expected"), ?: throw IllegalArgumentException("externalSloUrl expected"),
threshold = properties["threshold"]?.toInt() ?: throw IllegalArgumentException("threshold expected"), // TODO validate property contents
warmup = properties["warmup"]?.toInt() ?: throw IllegalArgumentException("warmup expected") metadata = mapOf(
// "warmup" and "threshold" are sent as numbers. The two aggregations and the
// operator are names (e.g. "max", "median", "lt") and must be read from their
// own properties as strings instead of re-using the numeric "warmup" value.
"warmup" to (properties["warmup"]?.toInt() ?: throw IllegalArgumentException("warmup expected")),
"queryAggregation" to (properties["queryAggregation"]
    ?: throw IllegalArgumentException("queryAggregation expected")),
"repetitionAggregation" to (properties["repetitionAggregation"]
    ?: throw IllegalArgumentException("repetitionAggregation expected")),
"operator" to (properties["operator"]
    ?: throw IllegalArgumentException("operator expected")),
"threshold" to (properties["threshold"]?.toInt()
    ?: throw IllegalArgumentException("threshold expected"))
)
)
SloTypes.LAG_TREND, SloTypes.DROPPED_RECORDS -> ExternalSloChecker(
externalSlopeURL = properties["externalSloUrl"]
?: throw IllegalArgumentException("externalSloUrl expected"),
metadata = mapOf(
"warmup" to (properties["warmup"]?.toInt() ?: throw IllegalArgumentException("warmup expected")),
"threshold" to (properties["threshold"]?.toInt()
?: throw IllegalArgumentException("threshold expected"))
)
) )
SloTypes.LAG_TREND_RATIO, SloTypes.DROPPED_RECORDS_RATIO -> { SloTypes.LAG_TREND_RATIO, SloTypes.DROPPED_RECORDS_RATIO -> {
val thresholdRatio = val thresholdRatio =
...@@ -63,8 +82,11 @@ class SloCheckerFactory { ...@@ -63,8 +82,11 @@ class SloCheckerFactory {
ExternalSloChecker( ExternalSloChecker(
externalSlopeURL = properties["externalSloUrl"] externalSlopeURL = properties["externalSloUrl"]
?: throw IllegalArgumentException("externalSloUrl expected"), ?: throw IllegalArgumentException("externalSloUrl expected"),
threshold = threshold, metadata = mapOf(
warmup = properties["warmup"]?.toInt() ?: throw IllegalArgumentException("warmup expected") "warmup" to (properties["warmup"]?.toInt()
?: throw IllegalArgumentException("warmup expected")),
"threshold" to threshold
)
) )
} }
} }
......
...@@ -26,7 +26,7 @@ class SloJson private constructor( ...@@ -26,7 +26,7 @@ class SloJson private constructor(
* @param key key of the metadata to be added * @param key key of the metadata to be added
* @param value value of the metadata to be added * @param value value of the metadata to be added
*/ */
fun addMetadata(key: String, value: String) = apply { fun addMetadata(key: String, value: Any) = apply {
if (this.metadata.isNullOrEmpty()) { if (this.metadata.isNullOrEmpty()) {
this.metadata = mutableMapOf(key to value) this.metadata = mutableMapOf(key to value)
} else { } else {
...@@ -35,16 +35,13 @@ class SloJson private constructor( ...@@ -35,16 +35,13 @@ class SloJson private constructor(
} }
/** /**
* Add metadata as key value pairs * Add metadata as map of key value pairs.
* *
* @param key key of the metadata to be added * @param metadata map of key-value pairs to be added to be added
* @param value value of the metadata to be added
*/ */
fun addMetadata(key: String, value: Int) = apply { fun addMetadata(metadata: Map<String, Any>) = apply {
if (this.metadata.isNullOrEmpty()) { for (entry in metadata) {
this.metadata = mutableMapOf(key to value) this.addMetadata(entry.key, entry.value)
} else {
this.metadata!![key] = value
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment