Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Commits on Source (25)
Showing
with 610 additions and 24 deletions
......@@ -307,11 +307,23 @@ deploy-theodolite:
# Theodolite SLO Checker: Lag Trend
test-slo-checker-lag-trend:
stage: test
image: python:3.7-slim
tags:
- exec-docker
script:
- cd slope-evaluator
- pip install -r requirements.txt
- cd app
- python -m unittest
deploy-slo-checker-lag-trend:
stage: deploy
extends:
- .dind
needs: []
needs:
- test-slo-checker-lag-trend
script:
- DOCKER_TAG_NAME=$(echo $CI_COMMIT_REF_SLUG- | sed 's/^master-$//')
- docker build --pull -t theodolite-slo-checker-lag-trend slope-evaluator
......
charts
\ No newline at end of file
......@@ -253,7 +253,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "sum by(group, topic) (kafka_consumergroup_group_lag > 0)",
"expr": "sum by(group, topic) (kafka_consumergroup_group_lag >= 0)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{topic}}",
......@@ -436,7 +436,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "sum by(group,topic) (kafka_consumergroup_group_offset > 0)",
"expr": "sum by(group,topic) (kafka_consumergroup_group_offset >= 0)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{topic}}",
......@@ -527,7 +527,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "count by(group,topic) (kafka_consumergroup_group_offset > 0)",
"expr": "count by(group,topic) (kafka_consumergroup_group_offset >= 0)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{topic}}",
......@@ -892,7 +892,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "sum by(group) (kafka_consumergroup_group_lag > 0)",
"expr": "sum by(group) (kafka_consumergroup_group_lag >= 0)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "total lag",
......
{{- if .Values.randomScheduler.rbac.create -}}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ include "theodolite.fullname" . }}-random-scheduler
subjects:
- kind: ServiceAccount
name: {{ include "theodolite.fullname" . }}-random-scheduler
namespace: kube-system
roleRef:
kind: ClusterRole
apiGroup: rbac.authorization.k8s.io
name: system:kube-scheduler
{{- end }}
{{- if .Values.randomScheduler.enabled -}}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "theodolite.fullname" . }}-random-scheduler
labels:
app: {{ include "theodolite.fullname" . }}
component: random-scheduler
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: {{ include "theodolite.fullname" . }}
component: random-scheduler
template:
metadata:
labels:
app: {{ include "theodolite.fullname" . }}
component: random-scheduler
spec:
serviceAccount: {{ include "theodolite.fullname" . }}-random-scheduler
containers:
- name: random-scheduler
image: ghcr.io/cau-se/theodolite-random-scheduler:theodolite-kotlin-latest
#imagePullPolicy: Always
env:
- name: TARGET_NAMESPACE
value: {{ .Release.Namespace }}
{{- end }}
{{- if .Values.randomScheduler.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
namespace: kube-system
name: {{ include "theodolite.fullname" . }}-random-scheduler
labels:
app: {{ include "theodolite.fullname" . }}
component: random-scheduler
{{- end }}
......@@ -25,8 +25,16 @@ grafana:
adminUser: admin
adminPassword: admin
grafana.ini:
#org_name: Theodolite
auth.anonymous:
# enable anonymous access
enabled: true
org_role: Admin # Role for unauthenticated users, other valid values are `Viewer`, `Editor` and `Admin`
users:
default_theme: light
#dashboards: # the following doesn't work but is planed
# Path to the default home dashboard. If this value is empty, then Grafana uses StaticRootPath + "dashboards/home.json"
#default_home_dashboard_path: "/tmp/dashboards/k8s-dashboard.json"
## Sidecars that collect the configmaps with specified label and stores the included files them into the respective folders
## Requires at least Grafana 5 to work and can't be used together with parameters dashboardProviders, datasources and dashboards
sidecar:
......@@ -92,8 +100,9 @@ cp-helm-charts:
"replica.fetch.max.bytes": "134217728" # 128 MB
#default.replication.factor: 1
# "min.insync.replicas": 2
# "auto.create.topics.enable": false
"log.retention.ms": "10000" # 10s
"auto.create.topics.enable": false
#"log.retention.ms": "10000" # 10s
"log.retention.ms": "7200000" # 2h
"metrics.sample.window.ms": "5000" #5s
"advertised.listeners": |-
EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID}))
......@@ -248,3 +257,10 @@ serviceAccount:
rbac:
create: true
randomScheduler:
enabled: true
rbac:
create: true
serviceAccount:
create: true
......@@ -5,7 +5,7 @@
For development:
```sh
uvicorn main:app --reload
uvicorn main:app --reload # run this command inside the app/ folder
```
## Build the docker image:
......@@ -32,7 +32,7 @@ The running webserver provides a REST API with the following route:
* /evaluate-slope
* Method: POST
* Body:
* total_lag
* total_lags
* threshold
* warmup
......@@ -40,14 +40,16 @@ The body of the request must be a JSON string that satisfies the following condi
* **total_lag**: This property is based on the [Range Vector type](https://www.prometheus.io/docs/prometheus/latest/querying/api/#range-vectors) from Prometheus and must have the following JSON structure:
```
{
"metric": {
"group": "<label_value>"
},
"values": [
[
<unix_timestamp>,
"<sample_value>"
{
[
"metric": {
"group": "<label_value>"
},
"values": [
[
<unix_timestamp>,
"<sample_value>"
]
]
]
}
......
......@@ -5,6 +5,7 @@ import os
import pandas as pd
import json
import sys
from statistics import median
app = FastAPI()
......@@ -20,7 +21,7 @@ elif os.getenv('LOG_LEVEL') == 'WARNING':
elif os.getenv('LOG_LEVEL') == 'DEBUG':
logger.setLevel(logging.DEBUG)
def execute(results, threshold, warmup):
def calculate_slope_trend(results, warmup):
d = []
for result in results:
group = result['metric']['group']
......@@ -39,13 +40,16 @@ def execute(results, threshold, warmup):
logger.error('Mark this subexperiment as not successful and continue benchmark.')
return False
result = trend_slope < threshold
logger.info("Computed lag trend slope is '%s'. Result is: %s", trend_slope, result)
return result
logger.info("Computed lag trend slope is '%s'", trend_slope)
return trend_slope
def check_service_level_objective(results, threshold):
return median(results) < threshold
@app.post("/evaluate-slope",response_model=bool)
async def evaluate_slope(request: Request):
data = json.loads(await request.body())
return execute(data['total_lag'], data['threshold'], data['warmup'])
results = [calculate_slope_trend(total_lag, data['warmup']) for total_lag in data['total_lags']]
return check_service_level_objective(results=results, threshold=data["threshold"])
logger.info("Slope evaluator is online")
\ No newline at end of file
logger.info("SLO evaluator is online")
\ No newline at end of file
import unittest
from main import app, check_service_level_objective
import json
from fastapi.testclient import TestClient
class TestSloEvaluation(unittest.TestCase):
client = TestClient(app)
def test_1_rep(self):
with open('../resources/test-1-rep-success.json') as json_file:
data = json.load(json_file)
response = self.client.post("/evaluate-slope", json=data)
self.assertEquals(response.json(), True)
def test_3_rep(self):
with open('../resources/test-3-rep-success.json') as json_file:
data = json.load(json_file)
response = self.client.post("/evaluate-slope", json=data)
self.assertEquals(response.json(), True)
def test_check_service_level_objective(self):
list = [1,2,3,4]
self.assertEquals(check_service_level_objective(list, 2), False)
self.assertEquals(check_service_level_objective(list, 3), True)
list = [1,2,3,4,5]
self.assertEquals(check_service_level_objective(list, 2), False)
self.assertEquals(check_service_level_objective(list, 4), True)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
fastapi==0.55.1
scikit-learn==0.20.3
pandas==1.0.3
uvicorn
requests
{
"total_lags": [
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621008960827E9,
"234"
],
[
1.621008965827E9,
"234"
],
[
1.621008970827E9,
"234"
],
[
1.621008975827E9,
"719"
],
[
1.621008980827E9,
"719"
],
[
1.621008985827E9,
"719"
],
[
1.621008990827E9,
"1026"
],
[
1.621008995827E9,
"1026"
],
[
1.621009000827E9,
"1026"
],
[
1.621009005827E9,
"534"
],
[
1.621009010827E9,
"534"
],
[
1.621009015827E9,
"534"
],
[
1.621009020827E9,
"943"
],
[
1.621009025827E9,
"943"
],
[
1.621009030827E9,
"943"
],
[
1.621009035827E9,
"66"
],
[
1.621009040827E9,
"66"
],
[
1.621009045827E9,
"66"
],
[
1.621009050827E9,
"841"
],
[
1.621009055827E9,
"841"
],
[
1.621009060827E9,
"841"
],
[
1.621009065827E9,
"405"
],
[
1.621009070827E9,
"405"
],
[
1.621009075827E9,
"405"
],
[
1.621009080827E9,
"201"
],
[
1.621009085827E9,
"201"
],
[
1.621009090827E9,
"201"
],
[
1.621009095827E9,
"227"
],
[
1.621009100827E9,
"227"
],
[
1.621009105827E9,
"227"
],
[
1.621009110827E9,
"943"
]
]
}
]
],
"threshold": 2000,
"warmup": 0
}
\ No newline at end of file
{
"total_lags": [
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621012384232E9,
"6073"
],
[
1.621012389232E9,
"6073"
],
[
1.621012394232E9,
"6073"
],
[
1.621012399232E9,
"227"
],
[
1.621012404232E9,
"227"
],
[
1.621012409232E9,
"227"
],
[
1.621012414232E9,
"987"
],
[
1.621012419232E9,
"987"
],
[
1.621012424232E9,
"987"
],
[
1.621012429232E9,
"100"
],
[
1.621012434232E9,
"100"
],
[
1.621012439232E9,
"100"
],
[
1.621012444232E9,
"959"
],
[
1.621012449232E9,
"959"
],
[
1.621012454232E9,
"959"
],
[
1.621012459232E9,
"625"
],
[
1.621012464232E9,
"625"
],
[
1.621012469232E9,
"625"
],
[
1.621012474232E9,
"683"
],
[
1.621012479232E9,
"683"
],
[
1.621012484232E9,
"683"
],
[
1.621012489232E9,
"156"
]
]
}
],
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621012545211E9,
"446"
],
[
1.621012550211E9,
"446"
],
[
1.621012555211E9,
"446"
],
[
1.621012560211E9,
"801"
],
[
1.621012565211E9,
"801"
],
[
1.621012570211E9,
"801"
],
[
1.621012575211E9,
"773"
],
[
1.621012580211E9,
"773"
],
[
1.621012585211E9,
"773"
],
[
1.621012590211E9,
"509"
],
[
1.621012595211E9,
"509"
],
[
1.621012600211E9,
"509"
],
[
1.621012605211E9,
"736"
],
[
1.621012610211E9,
"736"
],
[
1.621012615211E9,
"736"
],
[
1.621012620211E9,
"903"
],
[
1.621012625211E9,
"903"
],
[
1.621012630211E9,
"903"
],
[
1.621012635211E9,
"512"
],
[
1.621012640211E9,
"512"
],
[
1.621012645211E9,
"512"
]
]
}
],
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621012700748E9,
"6484"
],
[
1.621012705748E9,
"6484"
],
[
1.621012710748E9,
"6484"
],
[
1.621012715748E9,
"505"
],
[
1.621012720748E9,
"505"
],
[
1.621012725748E9,
"505"
],
[
1.621012730748E9,
"103"
],
[
1.621012735748E9,
"103"
],
[
1.621012740748E9,
"103"
],
[
1.621012745748E9,
"201"
],
[
1.621012750748E9,
"201"
],
[
1.621012755748E9,
"201"
],
[
1.621012760748E9,
"965"
],
[
1.621012765748E9,
"965"
],
[
1.621012770748E9,
"965"
],
[
1.621012775748E9,
"876"
],
[
1.621012780748E9,
"876"
],
[
1.621012785748E9,
"876"
],
[
1.621012790748E9,
"380"
],
[
1.621012795748E9,
"380"
],
[
1.621012800748E9,
"380"
]
]
}
]
],
"threshold": 2000,
"warmup": 0
}
\ No newline at end of file
......@@ -19,6 +19,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {}
}
package theodolite.uc1.application;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
......@@ -42,6 +43,14 @@ public final class HistoryServiceFlinkJob {
if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
}
private void buildPipeline() {
......@@ -61,7 +70,8 @@ public final class HistoryServiceFlinkJob {
stream
.rebalance()
.map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record));
.flatMap((record, c) -> LOGGER.info("Record: {}", record))
.returns(Types.GENERIC(Object.class)); // Will never be used
}
/**
......
......@@ -30,6 +30,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {}
}
......@@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend);
......
......@@ -34,6 +34,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {}
}
......@@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend);
......
......@@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs);
}
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend);
......