Skip to content
Snippets Groups Projects
Commit 147e080d authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch '249-make-analysis-flexible' into 'master'

Make the analysis of experiments more flexible

Closes #249

See merge request !180
parents 3b365db9 67e93b03
No related branches found
No related tags found
1 merge request!180Make the analysis of experiments more flexible
Pipeline #5180 passed
Showing
with 129 additions and 42 deletions
......@@ -13,7 +13,6 @@ import java.util.*
import java.util.regex.Pattern
private val logger = KotlinLogging.logger {}
private val RECORD_LAG_QUERY = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
/**
* Contains the analysis. Fetches a metric from Prometheus, documents it, and evaluates it.
......@@ -51,7 +50,7 @@ class AnalysisExecutor(
fetcher.fetchMetric(
start = interval.first,
end = interval.second,
query = RECORD_LAG_QUERY
query = SloConfigHandler.getQueryString(sloType = slo.sloType)
)
}
......@@ -59,7 +58,7 @@ class AnalysisExecutor(
ioHandler.writeToCSVFile(
fileURL = "${fileURL}_${repetitionCounter++}",
data = data.getResultAsList(),
columns = listOf("group", "timestamp", "value")
columns = listOf("labels", "timestamp", "value")
)
}
......
......@@ -36,13 +36,12 @@ class ExternalSloChecker(
*/
override fun evaluate(fetchedData: List<PrometheusResponse>): Boolean {
var counter = 0
val data = Gson().toJson(
mapOf(
"total_lags" to fetchedData.map { it.data?.result },
"threshold" to threshold,
"warmup" to warmup
)
)
val data = SloJson.Builder()
.results(fetchedData.map { it.data?.result })
.addMetadata("threshold", threshold)
.addMetadata( "warmup", warmup)
.build()
.toJson()
while (counter < RETRIES) {
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
......
......@@ -43,22 +43,23 @@ class SloCheckerFactory {
properties: MutableMap<String, String>,
load: LoadDimension
): SloChecker {
return when (sloType) {
"lag trend" -> ExternalSloChecker(
return when (sloType.toLowerCase()) {
SloTypes.LAG_TREND.value, SloTypes.DROPPED_RECORDS.value -> ExternalSloChecker(
externalSlopeURL = properties["externalSloUrl"]
?: throw IllegalArgumentException("externalSloUrl expected"),
threshold = properties["threshold"]?.toInt() ?: throw IllegalArgumentException("threshold expected"),
warmup = properties["warmup"]?.toInt() ?: throw IllegalArgumentException("warmup expected")
)
"lag trend ratio" -> {
var thresholdRatio =
SloTypes.LAG_TREND_RATIO.value, SloTypes.DROPPED_RECORDS_RATIO.value -> {
val thresholdRatio =
properties["ratio"]?.toDouble()
?: throw IllegalArgumentException("ratio for threshold expected")
if (thresholdRatio < 0.0) {
throw IllegalArgumentException("Threshold ratio needs to be an Double greater or equal 0.0")
}
// cast to int, as rounding is not really necessary
var threshold = (load.get() * thresholdRatio).toInt()
val threshold = (load.get() * thresholdRatio).toInt()
ExternalSloChecker(
externalSlopeURL = properties["externalSloUrl"]
......
package theodolite.evaluation
import theodolite.util.InvalidPatcherConfigurationException
import javax.enterprise.context.ApplicationScoped
private const val CONSUMER_LAG_QUERY = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
@ApplicationScoped
class SloConfigHandler() {
companion object {
fun getQueryString(sloType: String): String {
return when (sloType.toLowerCase()) {
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> CONSUMER_LAG_QUERY
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> DROPPED_RECORDS_QUERY
else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $sloType")
}
}
}
}
\ No newline at end of file
package theodolite.evaluation
import com.google.gson.Gson
import theodolite.util.PromResult
class SloJson private constructor(
val results: List<List<PromResult>?>? = null,
var metadata: MutableMap<String, Any>? = null
) {
data class Builder(
var results:List<List<PromResult>?>? = null,
var metadata: MutableMap<String, Any>? = null
) {
/**
* Set the results
*
* @param results list of prometheus results
*/
fun results(results: List<List<PromResult>?>) = apply { this.results = results }
/**
* Add metadata as key value pairs
*
* @param key key of the metadata to be added
* @param value value of the metadata to be added
*/
fun addMetadata(key: String, value: String) = apply {
if (this.metadata.isNullOrEmpty()) {
this.metadata = mutableMapOf(key to value)
} else {
this.metadata!![key] = value
}
}
/**
* Add metadata as key value pairs
*
* @param key key of the metadata to be added
* @param value value of the metadata to be added
*/
fun addMetadata(key: String, value: Int) = apply {
if (this.metadata.isNullOrEmpty()) {
this.metadata = mutableMapOf(key to value)
} else {
this.metadata!![key] = value
}
}
fun build() = SloJson(
results = results,
metadata = metadata
)
}
fun toJson(): String {
return Gson().toJson(mapOf(
"results" to this.results,
"metadata" to this.metadata
))
}
}
\ No newline at end of file
package theodolite.evaluation
enum class SloTypes(val value: String) {
LAG_TREND("lag trend"),
LAG_TREND_RATIO("lag trend ratio"),
DROPPED_RECORDS("dropped records"),
DROPPED_RECORDS_RATIO("dropped records ratio")
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ abstract class BenchmarkExecutor(
val results: Results,
val executionDuration: Duration,
val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo,
val slos: List<BenchmarkExecution.Slo>,
val repetitions: Int,
val executionId: Int,
val loadGenerationDelay: Long,
......
......@@ -18,7 +18,7 @@ class BenchmarkExecutorImpl(
results: Results,
executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo,
slos: List<BenchmarkExecution.Slo>,
repetitions: Int,
executionId: Int,
loadGenerationDelay: Long,
......@@ -29,7 +29,7 @@ class BenchmarkExecutorImpl(
results,
executionDuration,
configurationOverrides,
slo,
slos,
repetitions,
executionId,
loadGenerationDelay,
......@@ -56,12 +56,16 @@ class BenchmarkExecutorImpl(
* Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
*/
if (this.run.get()) {
result = AnalysisExecutor(slo = slo, executionId = executionId)
.analyze(
load = load,
res = res,
executionIntervals = executionIntervals
)
val experimentResults = slos.map {
AnalysisExecutor(slo = it, executionId = executionId)
.analyze(
load = load,
res = res,
executionIntervals = executionIntervals
)
}
result = (false !in experimentResults)
this.results.setResult(Pair(load, res), result)
}
......
......@@ -61,7 +61,7 @@ class TheodoliteExecutor(
results = results,
executionDuration = executionDuration,
configurationOverrides = config.configOverrides,
slo = config.slos[0],
slos = config.slos,
repetitions = config.execution.repetitions,
executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay,
......
......@@ -23,7 +23,7 @@ data class PrometheusResponse(
* The format of the returned list is: `[[ group, timestamp, value ], [ group, timestamp, value ], ... ]`
*/
fun getResultAsList(): List<List<String>> {
val group = data?.result?.get(0)?.metric?.group.toString()
val group = data?.result?.get(0)?.metric?.toString()!!
val values = data?.result?.get(0)?.values
val result = mutableListOf<List<String>>()
......@@ -64,18 +64,9 @@ data class PromResult(
/**
* Label of the metric
*/
var metric: PromMetric? = null,
var metric: Map<String, String>? = null,
/**
* Values of the metric (e.g. [ [ <unix_time>, "<sample_value>" ], ... ])
*/
var values: List<Any>? = null
)
/**
* Corresponds to the metric field in the range-vector result format of a Prometheus range-query response.
*/
@RegisterForReflection
data class PromMetric(
var group: String? = null
)
)
\ No newline at end of file
......@@ -31,7 +31,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 5)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, listOf(sloChecker), 0, 0, 5)
val linearSearch = LinearSearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -65,7 +65,7 @@ class CompositeStrategyTest {
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutorImpl =
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 0)
TestBenchmarkExecutorImpl(mockResults, benchmark, results, listOf(sloChecker), 0, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutorImpl)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......@@ -98,7 +98,7 @@ class CompositeStrategyTest {
val results = Results()
val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 0)
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, listOf(sloChecker), 0, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy =
......
......@@ -12,7 +12,7 @@ class TestBenchmarkExecutorImpl(
private val mockResults: Array<Array<Boolean>>,
benchmark: Benchmark,
results: Results,
slo: BenchmarkExecution.Slo,
slo: List<BenchmarkExecution.Slo>,
executionId: Int,
loadGenerationDelay: Long,
afterTeardownDelay: Long
......@@ -22,7 +22,7 @@ class TestBenchmarkExecutorImpl(
results,
executionDuration = Duration.ofSeconds(1),
configurationOverrides = emptyList(),
slo = slo,
slos = slo,
repetitions = 1,
executionId = executionId,
loadGenerationDelay = loadGenerationDelay,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment