Skip to content
Snippets Groups Projects
Commit e3cb3ba7 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch '100-allow-repititions' into 'theodolite-kotlin'

Allow multiple repititions of an experiment

See merge request !137
parents b5803592 d42f584a
Branches
Tags
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!137Allow multiple repititions of an experiment,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #3331 passed
Showing
with 536 additions and 59 deletions
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
For development: For development:
```sh ```sh
uvicorn main:app --reload uvicorn main:app --reload # run this command inside the app/ folder
``` ```
## Build the docker image: ## Build the docker image:
...@@ -32,7 +32,7 @@ The running webserver provides a REST API with the following route: ...@@ -32,7 +32,7 @@ The running webserver provides a REST API with the following route:
* /evaluate-slope * /evaluate-slope
* Method: POST * Method: POST
* Body: * Body:
* total_lag * total_lags
* threshold * threshold
* warmup * warmup
...@@ -41,6 +41,7 @@ The body of the request must be a JSON string that satisfies the following condi ...@@ -41,6 +41,7 @@ The body of the request must be a JSON string that satisfies the following condi
* **total_lag**: This property is based on the [Range Vector type](https://www.prometheus.io/docs/prometheus/latest/querying/api/#range-vectors) from Prometheus and must have the following JSON structure: * **total_lag**: This property is based on the [Range Vector type](https://www.prometheus.io/docs/prometheus/latest/querying/api/#range-vectors) from Prometheus and must have the following JSON structure:
``` ```
{ {
[
"metric": { "metric": {
"group": "<label_value>" "group": "<label_value>"
}, },
...@@ -50,6 +51,7 @@ The body of the request must be a JSON string that satisfies the following condi ...@@ -50,6 +51,7 @@ The body of the request must be a JSON string that satisfies the following condi
"<sample_value>" "<sample_value>"
] ]
] ]
]
} }
``` ```
* The `<label_value>` provided in "metric.group" must be equal to the id of the Kafka consumer group. * The `<label_value>` provided in "metric.group" must be equal to the id of the Kafka consumer group.
......
...@@ -5,6 +5,7 @@ import os ...@@ -5,6 +5,7 @@ import os
import pandas as pd import pandas as pd
import json import json
import sys import sys
from statistics import median
app = FastAPI() app = FastAPI()
...@@ -20,7 +21,7 @@ elif os.getenv('LOG_LEVEL') == 'WARNING': ...@@ -20,7 +21,7 @@ elif os.getenv('LOG_LEVEL') == 'WARNING':
elif os.getenv('LOG_LEVEL') == 'DEBUG': elif os.getenv('LOG_LEVEL') == 'DEBUG':
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
def execute(results, threshold, warmup): def calculate_slope_trend(results, warmup):
d = [] d = []
for result in results: for result in results:
group = result['metric']['group'] group = result['metric']['group']
...@@ -39,13 +40,16 @@ def execute(results, threshold, warmup): ...@@ -39,13 +40,16 @@ def execute(results, threshold, warmup):
logger.error('Mark this subexperiment as not successful and continue benchmark.') logger.error('Mark this subexperiment as not successful and continue benchmark.')
return False return False
result = trend_slope < threshold logger.info("Computed lag trend slope is '%s'", trend_slope)
logger.info("Computed lag trend slope is '%s'. Result is: %s", trend_slope, result) return trend_slope
return result
def check_service_level_objective(results, threshold):
return median(results) < threshold
@app.post("/evaluate-slope",response_model=bool) @app.post("/evaluate-slope",response_model=bool)
async def evaluate_slope(request: Request): async def evaluate_slope(request: Request):
data = json.loads(await request.body()) data = json.loads(await request.body())
return execute(data['total_lag'], data['threshold'], data['warmup']) results = [calculate_slope_trend(total_lag, data['warmup']) for total_lag in data['total_lags']]
return check_service_level_objective(results=results, threshold=data["threshold"])
logger.info("Slope evaluator is online") logger.info("SLO evaluator is online")
\ No newline at end of file \ No newline at end of file
import unittest
from main import app, check_service_level_objective
import json
from fastapi.testclient import TestClient
class TestSloEvaluation(unittest.TestCase):
client = TestClient(app)
def test_1_rep(self):
with open('../resources/test-1-rep-success.json') as json_file:
data = json.load(json_file)
response = self.client.post("/evaluate-slope", json=data)
self.assertEquals(response.json(), True)
def test_3_rep(self):
with open('../resources/test-3-rep-success.json') as json_file:
data = json.load(json_file)
response = self.client.post("/evaluate-slope", json=data)
self.assertEquals(response.json(), True)
def test_check_service_level_objective(self):
list = [1,2,3,4]
self.assertEquals(check_service_level_objective(list, 2), False)
self.assertEquals(check_service_level_objective(list, 3), True)
list = [1,2,3,4,5]
self.assertEquals(check_service_level_objective(list, 2), False)
self.assertEquals(check_service_level_objective(list, 4), True)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
fastapi==0.55.1 fastapi==0.55.1
scikit-learn==0.20.3 scikit-learn==0.20.3
pandas==1.0.3 pandas==1.0.3
uvicorn
{
"total_lags": [
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621008960827E9,
"234"
],
[
1.621008965827E9,
"234"
],
[
1.621008970827E9,
"234"
],
[
1.621008975827E9,
"719"
],
[
1.621008980827E9,
"719"
],
[
1.621008985827E9,
"719"
],
[
1.621008990827E9,
"1026"
],
[
1.621008995827E9,
"1026"
],
[
1.621009000827E9,
"1026"
],
[
1.621009005827E9,
"534"
],
[
1.621009010827E9,
"534"
],
[
1.621009015827E9,
"534"
],
[
1.621009020827E9,
"943"
],
[
1.621009025827E9,
"943"
],
[
1.621009030827E9,
"943"
],
[
1.621009035827E9,
"66"
],
[
1.621009040827E9,
"66"
],
[
1.621009045827E9,
"66"
],
[
1.621009050827E9,
"841"
],
[
1.621009055827E9,
"841"
],
[
1.621009060827E9,
"841"
],
[
1.621009065827E9,
"405"
],
[
1.621009070827E9,
"405"
],
[
1.621009075827E9,
"405"
],
[
1.621009080827E9,
"201"
],
[
1.621009085827E9,
"201"
],
[
1.621009090827E9,
"201"
],
[
1.621009095827E9,
"227"
],
[
1.621009100827E9,
"227"
],
[
1.621009105827E9,
"227"
],
[
1.621009110827E9,
"943"
]
]
}
]
],
"threshold": 2000,
"warmup": 0
}
\ No newline at end of file
{
"total_lags": [
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621012384232E9,
"6073"
],
[
1.621012389232E9,
"6073"
],
[
1.621012394232E9,
"6073"
],
[
1.621012399232E9,
"227"
],
[
1.621012404232E9,
"227"
],
[
1.621012409232E9,
"227"
],
[
1.621012414232E9,
"987"
],
[
1.621012419232E9,
"987"
],
[
1.621012424232E9,
"987"
],
[
1.621012429232E9,
"100"
],
[
1.621012434232E9,
"100"
],
[
1.621012439232E9,
"100"
],
[
1.621012444232E9,
"959"
],
[
1.621012449232E9,
"959"
],
[
1.621012454232E9,
"959"
],
[
1.621012459232E9,
"625"
],
[
1.621012464232E9,
"625"
],
[
1.621012469232E9,
"625"
],
[
1.621012474232E9,
"683"
],
[
1.621012479232E9,
"683"
],
[
1.621012484232E9,
"683"
],
[
1.621012489232E9,
"156"
]
]
}
],
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621012545211E9,
"446"
],
[
1.621012550211E9,
"446"
],
[
1.621012555211E9,
"446"
],
[
1.621012560211E9,
"801"
],
[
1.621012565211E9,
"801"
],
[
1.621012570211E9,
"801"
],
[
1.621012575211E9,
"773"
],
[
1.621012580211E9,
"773"
],
[
1.621012585211E9,
"773"
],
[
1.621012590211E9,
"509"
],
[
1.621012595211E9,
"509"
],
[
1.621012600211E9,
"509"
],
[
1.621012605211E9,
"736"
],
[
1.621012610211E9,
"736"
],
[
1.621012615211E9,
"736"
],
[
1.621012620211E9,
"903"
],
[
1.621012625211E9,
"903"
],
[
1.621012630211E9,
"903"
],
[
1.621012635211E9,
"512"
],
[
1.621012640211E9,
"512"
],
[
1.621012645211E9,
"512"
]
]
}
],
[
{
"metric": {
"group": "theodolite-uc1-application-0.0.1"
},
"values": [
[
1.621012700748E9,
"6484"
],
[
1.621012705748E9,
"6484"
],
[
1.621012710748E9,
"6484"
],
[
1.621012715748E9,
"505"
],
[
1.621012720748E9,
"505"
],
[
1.621012725748E9,
"505"
],
[
1.621012730748E9,
"103"
],
[
1.621012735748E9,
"103"
],
[
1.621012740748E9,
"103"
],
[
1.621012745748E9,
"201"
],
[
1.621012750748E9,
"201"
],
[
1.621012755748E9,
"201"
],
[
1.621012760748E9,
"965"
],
[
1.621012765748E9,
"965"
],
[
1.621012770748E9,
"965"
],
[
1.621012775748E9,
"876"
],
[
1.621012780748E9,
"876"
],
[
1.621012785748E9,
"876"
],
[
1.621012790748E9,
"380"
],
[
1.621012795748E9,
"380"
],
[
1.621012800748E9,
"380"
]
]
}
]
],
"threshold": 2000,
"warmup": 0
}
\ No newline at end of file
...@@ -34,25 +34,27 @@ class AnalysisExecutor( ...@@ -34,25 +34,27 @@ class AnalysisExecutor(
* @param executionDuration of the experiment. * @param executionDuration of the experiment.
* @return true if the experiment succeeded. * @return true if the experiment succeeded.
*/ */
fun analyze(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean { fun analyze(load: LoadDimension, res: Resource, executionIntervals: List<Pair<Instant, Instant>>): Boolean {
var result = false var result = false
val exporter = CsvExporter()
var repetitionCounter = 1
try { try {
val prometheusData = fetcher.fetchMetric( var resultsFolder: String = System.getenv("RESULTS_FOLDER") ?: ""
start = Instant.now().minus(executionDuration),
end = Instant.now(),
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
)
var resultsFolder: String = System.getenv("RESULTS_FOLDER")
if (resultsFolder.isNotEmpty()){ if (resultsFolder.isNotEmpty()){
resultsFolder += "/" resultsFolder += "/"
} }
val prometheusData = executionIntervals
.map { interval -> fetcher.fetchMetric(
start = interval.first,
end = interval.second,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)") }
val fileName= "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}"
prometheusData.forEach{ data ->
exporter.toCsv(name = "${fileName}_${repetitionCounter++}", prom = data) }
CsvExporter().toCsv(
name = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}",
prom = prometheusData
)
val sloChecker = SloCheckerFactory().create( val sloChecker = SloCheckerFactory().create(
sloType = slo.sloType, sloType = slo.sloType,
externalSlopeURL = slo.externalSloUrl, externalSlopeURL = slo.externalSloUrl,
...@@ -60,10 +62,7 @@ class AnalysisExecutor( ...@@ -60,10 +62,7 @@ class AnalysisExecutor(
warmup = slo.warmup warmup = slo.warmup
) )
result = sloChecker.evaluate( result = sloChecker.evaluate(prometheusData)
start = Instant.now().minus(executionDuration),
end = Instant.now(), fetchedData = prometheusData
)
} catch (e: Exception) { } catch (e: Exception) {
logger.error { "Evaluation failed for resource '${res.get()}' and load '${load.get()}'. Error: $e" } logger.error { "Evaluation failed for resource '${res.get()}' and load '${load.get()}'. Error: $e" }
......
...@@ -35,10 +35,10 @@ class ExternalSloChecker( ...@@ -35,10 +35,10 @@ class ExternalSloChecker(
* @return true if the experiment was successful(the threshold was not exceeded. * @return true if the experiment was successful(the threshold was not exceeded.
* @throws ConnectException if the external service could not be reached. * @throws ConnectException if the external service could not be reached.
*/ */
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean { override fun evaluate(fetchedData: List<PrometheusResponse>): Boolean {
var counter = 0 var counter = 0
val data = Gson().toJson(mapOf( val data = Gson().toJson(mapOf(
"total_lag" to fetchedData.data?.result, "total_lags" to fetchedData.map { it.data?.result},
"threshold" to threshold, "threshold" to threshold,
"warmup" to warmup)) "warmup" to warmup))
......
package theodolite.evaluation package theodolite.evaluation
import theodolite.util.PrometheusResponse import theodolite.util.PrometheusResponse
import java.time.Instant
/** /**
* A SloChecker can be used to evaluate data from Prometheus. * A SloChecker can be used to evaluate data from Prometheus.
* @constructor Creates an empty SloChecker * @constructor Creates an empty SloChecker
*/ */
interface SloChecker { interface SloChecker {
/** /**
* Evaluates [fetchedData] and returns if the experiment was successful. * Evaluates [fetchedData] and returns if the experiment was successful.
* Returns if the evaluated experiment was successful. * Returns if the evaluated experiment was successful.
...@@ -18,5 +16,5 @@ interface SloChecker { ...@@ -18,5 +16,5 @@ interface SloChecker {
* @param fetchedData from Prometheus that will be evaluated. * @param fetchedData from Prometheus that will be evaluated.
* @return true if experiment was successful. Otherwise false. * @return true if experiment was successful. Otherwise false.
*/ */
fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean fun evaluate(fetchedData: List<PrometheusResponse>): Boolean
} }
...@@ -26,6 +26,7 @@ abstract class BenchmarkExecutor( ...@@ -26,6 +26,7 @@ abstract class BenchmarkExecutor(
val executionDuration: Duration, val executionDuration: Duration,
val configurationOverrides: List<ConfigurationOverride?>, val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo, val slo: BenchmarkExecution.Slo,
val repetitions: Int,
val executionId: Int, val executionId: Int,
val loadGenerationDelay: Long, val loadGenerationDelay: Long,
val afterTeardownDelay: Long val afterTeardownDelay: Long
......
...@@ -5,11 +5,9 @@ import mu.KotlinLogging ...@@ -5,11 +5,9 @@ import mu.KotlinLogging
import theodolite.benchmark.Benchmark import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.AnalysisExecutor import theodolite.evaluation.AnalysisExecutor
import theodolite.util.ConfigurationOverride import theodolite.util.*
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -20,42 +18,56 @@ class BenchmarkExecutorImpl( ...@@ -20,42 +18,56 @@ class BenchmarkExecutorImpl(
executionDuration: Duration, executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>, configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo, slo: BenchmarkExecution.Slo,
repetitions: Int,
executionId: Int, executionId: Int,
loadGenerationDelay: Long, loadGenerationDelay: Long,
afterTeardownDelay: Long afterTeardownDelay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId, loadGenerationDelay, afterTeardownDelay) { ) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, repetitions, executionId, loadGenerationDelay, afterTeardownDelay) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, configurationOverrides, loadGenerationDelay, this.afterTeardownDelay) val executionIntervals: MutableList<Pair<Instant, Instant>> = ArrayList()
try { for (i in 1.rangeTo(repetitions)) {
benchmarkDeployment.setup() logger.info { "Run repetition $i/$repetitions" }
this.waitAndLog() if (this.run.get()) {
} catch (e: Exception) { executionIntervals.add(runSingleExperiment(load,res))
logger.error { "Error while setting up experiment with id ${this.executionId}." } } else {
logger.error { "Error is: $e" } break
this.run.set(false) }
} }
/** /**
* Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user. * Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
*/ */
if (this.run.get()) { if (this.run.get()) {
result = AnalysisExecutor(slo = slo, executionId = executionId).analyze( result =AnalysisExecutor(slo = slo, executionId = executionId)
.analyze(
load = load, load = load,
res = res, res = res,
executionDuration = executionDuration executionIntervals = executionIntervals)
)
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
} }
return result
}
private fun runSingleExperiment(load: LoadDimension, res: Resource): Pair<Instant, Instant> {
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides, this.loadGenerationDelay, this.afterTeardownDelay)
val from = Instant.now()
try {
benchmarkDeployment.setup()
this.waitAndLog()
} catch (e: Exception) {
logger.error { "Error while setup experiment." }
logger.error { "Error is: $e" }
this.run.set(false)
}
val to = Instant.now()
try { try {
benchmarkDeployment.teardown() benchmarkDeployment.teardown()
} catch (e: Exception) { } catch (e: Exception) {
logger.warn { "Error while tearing down the benchmark deployment." } logger.warn { "Error while tearing down the benchmark deployment." }
logger.debug { "Teardown failed, caused by: $e" } logger.debug { "Teardown failed, caused by: $e" }
} }
return Pair(from,to)
return result
} }
} }
...@@ -71,6 +71,7 @@ class TheodoliteExecutor( ...@@ -71,6 +71,7 @@ class TheodoliteExecutor(
executionDuration = executionDuration, executionDuration = executionDuration,
configurationOverrides = config.configOverrides, configurationOverrides = config.configOverrides,
slo = config.slos[0], slo = config.slos[0],
repetitions = config.execution.repetitions,
executionId = config.executionId, executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay, loadGenerationDelay = config.execution.loadGenerationDelay,
afterTeardownDelay = config.execution.afterTeardownDelay afterTeardownDelay = config.execution.afterTeardownDelay
......
...@@ -31,8 +31,8 @@ class TheodoliteYamlExecutor { ...@@ -31,8 +31,8 @@ class TheodoliteYamlExecutor {
fun start() { fun start() {
logger.info { "Theodolite started" } logger.info { "Theodolite started" }
val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/BenchmarkExecution.yaml" val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/example-execution-yaml-resource.yaml"
val benchmarkPath = System.getenv("THEODOLITE_BENCHMARK") ?: "./config/BenchmarkType.yaml" val benchmarkPath = System.getenv("THEODOLITE_BENCHMARK") ?: "./config/example-benchmark-yaml-resource.yaml"
logger.info { "Using $executionPath for BenchmarkExecution" } logger.info { "Using $executionPath for BenchmarkExecution" }
logger.info { "Using $benchmarkPath for BenchmarkType" } logger.info { "Using $benchmarkPath for BenchmarkType" }
......
...@@ -23,6 +23,7 @@ class TestBenchmarkExecutorImpl( ...@@ -23,6 +23,7 @@ class TestBenchmarkExecutorImpl(
executionDuration = Duration.ofSeconds(1), executionDuration = Duration.ofSeconds(1),
configurationOverrides = emptyList(), configurationOverrides = emptyList(),
slo = slo, slo = slo,
repetitions = 1,
executionId = executionId, executionId = executionId,
loadGenerationDelay = loadGenerationDelay, loadGenerationDelay = loadGenerationDelay,
afterTeardownDelay = afterTeardownDelay afterTeardownDelay = afterTeardownDelay
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment