Skip to content
Snippets Groups Projects
Commit 3da8dc8e authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Allow arbitrary prometheus range query for the evaluation of benchmarks

- Allow arbitrary prometheus range queries (update prometheusResponse)
- Allow usage of more than one slo checker
- Update the JSON which is sent to the Python SLO checker
- refactoring
- Update CRDs in order to set the query string as a dedicated parameter for a slo checker
- Update existing python slo checker in order to use the new JSON format
parent d7805178
Branches
Tags
1 merge request!180Make the analysis of experiments more flexible
Showing
with 133 additions and 46 deletions
apiVersion: v1
data:
jmx-kafka-prometheus.yml: |
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
kind: ConfigMap
metadata:
name: theodolite-cp-kafka-jmx-configmap
namespace: default
......@@ -60,6 +60,9 @@ spec:
prometheusUrl:
description: Connection string for Prometheus.
type: string
query:
description: The prometheus query string
type: string
offset:
description: Hours by which the start and end timestamp will be shifted (for different timezones).
type: integer
......
......
......@@ -38,7 +38,7 @@ def calculate_slope_trend(results, warmup):
err_msg = 'Computing trend slope failed.'
logger.exception(err_msg)
logger.error('Mark this subexperiment as not successful and continue benchmark.')
return False
return float('inf')
logger.info("Computed lag trend slope is '%s'", trend_slope)
return trend_slope
......@@ -49,7 +49,7 @@ def check_service_level_objective(results, threshold):
@app.post("/evaluate-slope",response_model=bool)
async def evaluate_slope(request: Request):
data = json.loads(await request.body())
results = [calculate_slope_trend(total_lag, data['warmup']) for total_lag in data['total_lags']]
return check_service_level_objective(results=results, threshold=data["threshold"])
results = [calculate_slope_trend(total_lag, data['metadata']['warmup']) for total_lag in data['results']]
return check_service_level_objective(results=results, threshold=data['metadata']["threshold"])
logger.info("SLO evaluator is online")
\ No newline at end of file
{
"total_lags": [
"results": [
[
{
"metric": {
......@@ -134,6 +134,8 @@
}
]
],
"metadata": {
"threshold": 2000,
"warmup": 0
}
}
\ No newline at end of file
{
"total_lags": [
"results": [
[
{
"metric": {
......@@ -284,6 +284,8 @@
}
]
],
"metadata": {
"threshold": 2000,
"warmup": 0
}
}
\ No newline at end of file
......@@ -51,7 +51,7 @@ spec:
description: The type of the resource. It must match one of the resource types specified in the referenced benchmark.
type: string
resourceValues:
descriptoin: List of resource values for the specified resource type.
description: List of resource values for the specified resource type.
type: array
items:
type: integer
......@@ -68,6 +68,9 @@ spec:
prometheusUrl:
description: Connection string for Prometheus.
type: string
query:
description: The prometheus query string
type: string
offset:
description: Hours by which the start and end timestamp will be shifted (for different timezones).
type: integer
......
......
......@@ -14,6 +14,7 @@ spec:
- sloType: "lag trend"
prometheusUrl: "http://prometheus-operated:9090"
offset: 0
query: "sum by(group)(kafka_consumergroup_group_lag >= 0)"
properties:
threshold: 2000
externalSloUrl: "http://localhost:80/evaluate-slope"
......
......
......@@ -10,6 +10,7 @@ slos:
- sloType: "lag trend"
prometheusUrl: "http://prometheus-operated:9090"
offset: 0
query: "sum by(group)(kafka_consumergroup_group_lag >= 0)"
properties:
threshold: 2000
externalSloUrl: "http://localhost:80/evaluate-slope"
......
......
......@@ -63,6 +63,7 @@ class BenchmarkExecution : KubernetesResource {
/**
 * A service level objective (SLO) configuration of a benchmark execution,
 * deserialized from the execution's Kubernetes custom resource.
 */
class Slo : KubernetesResource {
    // Identifier of the SLO checker type, e.g. "lag trend" (see the execution YAML examples).
    lateinit var sloType: String
    // Connection string of the Prometheus instance the metric is fetched from.
    lateinit var prometheusUrl: String
    // Prometheus range query evaluated for this SLO (e.g. a consumer-lag query).
    lateinit var query: String
    // Hours by which the start and end timestamps are shifted (for different timezones).
    var offset by Delegates.notNull<Int>()
    // Checker-specific key/value properties, e.g. "threshold" and "externalSloUrl".
    lateinit var properties: MutableMap<String, String>
}
......
......
......@@ -37,6 +37,7 @@ class AnalysisExecutor(
* @return true if the experiment succeeded.
*/
fun analyze(load: LoadDimension, res: Resource, executionIntervals: List<Pair<Instant, Instant>>): Boolean {
var result = false
var repetitionCounter = 1
......@@ -50,7 +51,7 @@ class AnalysisExecutor(
fetcher.fetchMetric(
start = interval.first,
end = interval.second,
query = RECORD_LAG_QUERY
query = slo.query
)
}
......@@ -58,7 +59,7 @@ class AnalysisExecutor(
ioHandler.writeToCSVFile(
fileURL = "${fileURL}_${repetitionCounter++}",
data = data.getResultAsList(),
columns = listOf("group", "timestamp", "value")
columns = listOf("labels", "timestamp", "value")
)
}
......
......
......@@ -36,13 +36,12 @@ class ExternalSloChecker(
*/
override fun evaluate(fetchedData: List<PrometheusResponse>): Boolean {
var counter = 0
val data = Gson().toJson(
mapOf(
"total_lags" to fetchedData.map { it.data?.result },
"threshold" to threshold,
"warmup" to warmup
)
)
val data = SloJson.Builder()
.results(fetchedData.map { it.data?.result })
.addMetadata("threshold", threshold)
.addMetadata( "warmup", warmup)
.build()
.toJson()
while (counter < RETRIES) {
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
......
......
package theodolite.evaluation
import com.google.gson.Gson
import theodolite.util.PromResult
/**
 * Wrapper for the JSON payload sent to an external SLO checker.
 *
 * The serialized payload has two top-level fields:
 *  - `results`: raw Prometheus range-query results of all experiment repetitions
 *  - `metadata`: arbitrary key/value pairs the checker needs (e.g. `threshold`, `warmup`)
 */
class SloJson private constructor(
    val results: List<List<PromResult>?>? = null,
    var metadata: MutableMap<String, Any>? = null
) {

    data class Builder(
        var results: List<List<PromResult>?>? = null,
        var metadata: MutableMap<String, Any>? = null
    ) {

        /**
         * Set the results.
         *
         * @param results list of Prometheus results
         */
        fun results(results: List<List<PromResult>?>) = apply { this.results = results }

        /**
         * Add metadata as a key/value pair.
         *
         * Accepts any value type, generalizing the former duplicated
         * String-only and Int-only overloads while remaining
         * source-compatible with all existing call sites.
         *
         * @param key key of the metadata to be added
         * @param value value of the metadata to be added
         */
        fun addMetadata(key: String, value: Any) = apply {
            // Lazily create the map on first use instead of branching on null/empty.
            val entries = this.metadata ?: mutableMapOf()
            entries[key] = value
            this.metadata = entries
        }

        /** Create the [SloJson] instance from the collected builder state. */
        fun build() = SloJson(
            results = results,
            metadata = metadata
        )
    }

    /**
     * Serialize this object to JSON.
     *
     * @return a JSON string with the top-level fields `results` and `metadata`
     */
    fun toJson(): String {
        return Gson().toJson(
            mapOf(
                "results" to this.results,
                "metadata" to this.metadata
            )
        )
    }
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ abstract class BenchmarkExecutor(
val results: Results,
val executionDuration: Duration,
val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo,
val slos: List<BenchmarkExecution.Slo>,
val repetitions: Int,
val executionId: Int,
val loadGenerationDelay: Long,
......
......
......@@ -20,7 +20,7 @@ class BenchmarkExecutorImpl(
results: Results,
executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo,
slos: List<BenchmarkExecution.Slo>,
repetitions: Int,
executionId: Int,
loadGenerationDelay: Long,
......@@ -30,7 +30,7 @@ class BenchmarkExecutorImpl(
results,
executionDuration,
configurationOverrides,
slo,
slos,
repetitions,
executionId,
loadGenerationDelay,
......@@ -53,13 +53,19 @@ class BenchmarkExecutorImpl(
* Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
*/
if (this.run.get()) {
result = AnalysisExecutor(slo = slo, executionId = executionId)
val experimentResults = slos.map {
AnalysisExecutor(slo = it, executionId = executionId)
.analyze(
load = load,
res = res,
executionIntervals = executionIntervals
)
}
result = (false !in experimentResults)
this.results.setResult(Pair(load, res), result)
}
return result
}
......
......
......@@ -64,7 +64,7 @@ class TheodoliteExecutor(
results = results,
executionDuration = executionDuration,
configurationOverrides = config.configOverrides,
slo = config.slos[0],
slos = config.slos,
repetitions = config.execution.repetitions,
executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay,
......
......
......@@ -23,7 +23,7 @@ data class PrometheusResponse(
* The format of the returned list is: `[[ group, timestamp, value ], [ group, timestamp, value ], ... ]`
*/
fun getResultAsList(): List<List<String>> {
val group = data?.result?.get(0)?.metric?.group.toString()
val group = data?.result?.get(0)?.metric?.toString()!!
val values = data?.result?.get(0)?.values
val result = mutableListOf<List<String>>()
......@@ -64,18 +64,9 @@ data class PromResult(
/**
* Label of the metric
*/
var metric: PromMetric? = null,
var metric: Map<String,String>? = null,
/**
* Values of the metric (e.g. [ [ <unix_time>, "<sample_value>" ], ... ])
*/
var values: List<Any>? = null
)
\ No newline at end of file
/**
 * Corresponds to the metric field in the range-vector result format of a Prometheus range-query response.
 * Only the `group` label is deserialized; all other labels of the metric are dropped.
 */
@RegisterForReflection
data class PromMetric(
    // Value of the metric's "group" label — presumably the Kafka consumer group; confirm against the query used.
    var group: String? = null
)
......@@ -31,7 +31,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 5)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, listOf(sloChecker), 0, 0, 5)
val linearSearch = LinearSearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -65,7 +65,7 @@ class CompositeStrategyTest {
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutorImpl =
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 0)
TestBenchmarkExecutorImpl(mockResults, benchmark, results, listOf(sloChecker), 0, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutorImpl)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -98,7 +98,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 0)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, listOf(sloChecker), 0, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -114,4 +114,5 @@ class CompositeStrategyTest {
assertEquals(actual, expected)
}
}
......@@ -12,7 +12,7 @@ class TestBenchmarkExecutorImpl(
private val mockResults: Array<Array<Boolean>>,
benchmark: Benchmark,
results: Results,
slo: BenchmarkExecution.Slo,
slo: List<BenchmarkExecution.Slo>,
executionId: Int,
loadGenerationDelay: Long,
afterTeardownDelay: Long
......@@ -22,7 +22,7 @@ class TestBenchmarkExecutorImpl(
results,
executionDuration = Duration.ofSeconds(1),
configurationOverrides = emptyList(),
slo = slo,
slos = slo,
repetitions = 1,
executionId = executionId,
loadGenerationDelay = loadGenerationDelay,
......
......
......@@ -15,6 +15,7 @@ spec:
- sloType: "lag trend"
prometheusUrl: "http://prometheus-operated:9090"
offset: 0
query: "sum by(group)(kafka_consumergroup_group_lag >= 0)"
properties:
threshold: 2000
externalSloUrl: "http://localhost:80/evaluate-slope"
......
......
......@@ -15,6 +15,7 @@ spec:
- sloType: "lag trend"
prometheusUrl: "http://prometheus-operated:9090"
offset: 0
query: "sum by(group)(kafka_consumergroup_group_lag >= 0)"
properties:
threshold: 2000
externalSloUrl: "http://localhost:80/evaluate-slope"
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment