Skip to content
Snippets Groups Projects
Commit b73c1936 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge remote-tracking branch 'upstream/theodolite-kotlin' into feature/158-handle-shutdown

parents ab0849fa 22913c8a
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!96Handle shutdown,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
package theodolite.evaluation
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.util.LoadDimension
import theodolite.util.Resource
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) {
private val fetcher = MetricFetcher(
prometheusURL = slo.prometheusUrl,
offset = Duration.ofHours(slo.offset.toLong())
)
fun analyse(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean {
var result = false
try {
val prometheusData = fetcher.fetchMetric(
start = Instant.now().minus(executionDuration),
end = Instant.now(),
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
)
CsvExporter().toCsv(name = "${load.get()}_${res.get()}_${slo.sloType}", prom = prometheusData)
val sloChecker = SloCheckerFactory().create(
slotype = slo.sloType,
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
warmup = slo.warmup
)
result = sloChecker.evaluate(
start = Instant.now().minus(executionDuration),
end = Instant.now(), fetchedData = prometheusData
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
return result
}
}
package theodolite.evaluation
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.io.File
import java.io.PrintWriter
import java.util.*
private val logger = KotlinLogging.logger {}
class CsvExporter {
/**
* Uses the PrintWriter to transform a PrometheusResponse to Csv
*/
fun toCsv(name: String, prom: PrometheusResponse) {
val responseArray = promResponseToList(prom)
val csvOutputFile = File("$name.csv")
PrintWriter(csvOutputFile).use { pw ->
pw.println(listOf("name", "time", "value").joinToString())
responseArray.forEach {
pw.println(it.joinToString())
}
}
logger.info { "Wrote csv file: $name to ${csvOutputFile.absolutePath}" }
}
/**
* Converts a PrometheusResponse into a List of List of Strings
*/
private fun promResponseToList(prom: PrometheusResponse): List<List<String>> {
val name = prom.data?.result?.get(0)?.metric?.group.toString()
val values = prom.data?.result?.get(0)?.values
val dataList = mutableListOf<List<String>>()
if (values != null) {
for (x in values) {
val y = x as List<*>
dataList.add(listOf(name, "${y[0]}", "${y[1]}"))
}
}
return Collections.unmodifiableList(dataList)
}
}
......@@ -2,16 +2,14 @@ package theodolite.evaluation
import com.google.gson.Gson
import khttp.post
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.net.ConnectException
import java.time.Duration
import java.time.Instant
class ExternalSloChecker(
private val prometheusURL: String,
private val query: String,
private val externalSlopeURL: String,
private val threshold: Int,
private val offset: Duration,
private val warmup: Int
) :
SloChecker {
......@@ -19,10 +17,10 @@ class ExternalSloChecker(
private val RETRIES = 2
private val TIMEOUT = 60.0
override fun evaluate(start: Instant, end: Instant): Boolean {
private val logger = KotlinLogging.logger {}
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
var counter = 0
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset)
val fetchedData = metricFetcher.fetchMetric(start, end, query)
val data =
Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup))
......@@ -30,6 +28,7 @@ class ExternalSloChecker(
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
if (result.statusCode != 200) {
counter++
logger.error { "Could not reach external slope analysis" }
} else {
return result.text.toBoolean()
}
......
......@@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
} else {
val values = parseValues(response)
if (values.data?.result.isNullOrEmpty()) {
logger.error { "Empty query result: $values" }
logger.error { "Empty query result: $values between $start and $end for querry $query" }
throw NoSuchFieldException()
}
return parseValues(response)
......
package theodolite.evaluation
import theodolite.util.PrometheusResponse
import java.time.Instant
interface SloChecker {
fun evaluate(start: Instant, end: Instant): Boolean
fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean
}
......@@ -6,21 +6,15 @@ class SloCheckerFactory {
fun create(
slotype: String,
prometheusURL: String,
query: String,
externalSlopeURL: String,
threshold: Int,
offset: Duration,
warmup: Int
): SloChecker {
return when (slotype) {
"lag trend" -> ExternalSloChecker(
prometheusURL = prometheusURL,
query = query,
externalSlopeURL = externalSlopeURL,
threshold = threshold,
offset = offset,
warmup = warmup
)
else -> throw IllegalArgumentException("Slotype $slotype not found.")
......
......@@ -3,13 +3,12 @@ package theodolite.execution
import mu.KotlinLogging
import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.SloCheckerFactory
import theodolite.evaluation.AnalysisExecutor
import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
......@@ -24,26 +23,10 @@ class BenchmarkExecutorImpl(
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
benchmarkDeployment.setup()
this.waitAndLog()
benchmarkDeployment.teardown()
var result = false
try {
result = SloCheckerFactory().create(
slotype = slo.sloType,
prometheusURL = slo.prometheusUrl,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
offset = Duration.ofHours(slo.offset.toLong()),
warmup = slo.warmup
)
.evaluate(
Instant.now().minus(executionDuration),
Instant.now()
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
val result = AnalysisExecutor(slo = slo).analyse(load = load, res = res, executionDuration = executionDuration)
benchmarkDeployment.teardown()
this.results.setResult(Pair(load, res), result)
return result
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment