diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt new file mode 100644 index 0000000000000000000000000000000000000000..2910d84991c2c37051b4b053c0c024344c0b3ff0 --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt @@ -0,0 +1,48 @@ +package theodolite.evaluation + +import mu.KotlinLogging +import theodolite.benchmark.BenchmarkExecution +import theodolite.util.LoadDimension +import theodolite.util.Resource +import java.time.Duration +import java.time.Instant + +private val logger = KotlinLogging.logger {} + +class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) { + + private val fetcher = MetricFetcher( + prometheusURL = slo.prometheusUrl, + offset = Duration.ofHours(slo.offset.toLong()) + ) + + fun analyse(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean { + var result = false + + try { + val prometheusData = fetcher.fetchMetric( + start = Instant.now().minus(executionDuration), + end = Instant.now(), + query = "sum by(group)(kafka_consumergroup_group_lag >= 0)" + ) + + CsvExporter().toCsv(name = "${load.get()}_${res.get()}_${slo.sloType}", prom = prometheusData) + + val sloChecker = SloCheckerFactory().create( + slotype = slo.sloType, + externalSlopeURL = slo.externalSloUrl, + threshold = slo.threshold, + warmup = slo.warmup + ) + + result = sloChecker.evaluate( + start = Instant.now().minus(executionDuration), + end = Instant.now(), fetchedData = prometheusData + ) + + } catch (e: Exception) { + logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" } + } + return result + } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt new file mode 100644 index 0000000000000000000000000000000000000000..e2f536af6dba838b2b4027d6cfafb032ebd3d04d --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt @@ -0,0 +1,45 @@ +package theodolite.evaluation + +import mu.KotlinLogging +import theodolite.util.PrometheusResponse +import java.io.File +import java.io.PrintWriter +import java.util.* + +private val logger = KotlinLogging.logger {} + +class CsvExporter { + + /** + * Uses the PrintWriter to transform a PrometheusResponse to Csv + */ + fun toCsv(name: String, prom: PrometheusResponse) { + val responseArray = promResponseToList(prom) + val csvOutputFile = File("$name.csv") + + PrintWriter(csvOutputFile).use { pw -> + pw.println(listOf("name", "time", "value").joinToString()) + responseArray.forEach { + pw.println(it.joinToString()) + } + } + logger.info { "Wrote csv file: $name to ${csvOutputFile.absolutePath}" } + } + + /** + * Converts a PrometheusResponse into a List of List of Strings + */ + private fun promResponseToList(prom: PrometheusResponse): List<List<String>> { + val name = prom.data?.result?.get(0)?.metric?.group.toString() + val values = prom.data?.result?.get(0)?.values + val dataList = mutableListOf<List<String>>() + + if (values != null) { + for (x in values) { + val y = x as List<*> + dataList.add(listOf(name, "${y[0]}", "${y[1]}")) + } + } + return Collections.unmodifiableList(dataList) + } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt index 2de8e2dc9c03ec5449c9f04585622d6730644aa2..e65116c0a6b562c0e05714d09ab5a9b528249a05 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt @@ -2,16 +2,14 @@ package theodolite.evaluation import com.google.gson.Gson import khttp.post +import mu.KotlinLogging +import theodolite.util.PrometheusResponse import java.net.ConnectException -import java.time.Duration import java.time.Instant class ExternalSloChecker( - private val prometheusURL: String, - private val query: String, private val externalSlopeURL: String, private val threshold: Int, - private val offset: Duration, private val warmup: Int ) : SloChecker { @@ -19,10 +17,10 @@ class ExternalSloChecker( private val RETRIES = 2 private val TIMEOUT = 60.0 - override fun evaluate(start: Instant, end: Instant): Boolean { + private val logger = KotlinLogging.logger {} + + override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean { var counter = 0 - val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset) - val fetchedData = metricFetcher.fetchMetric(start, end, query) val data = Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup)) @@ -30,6 +28,7 @@ class ExternalSloChecker( val result = post(externalSlopeURL, data = data, timeout = TIMEOUT) if (result.statusCode != 200) { counter++ + logger.error { "Could not reach external slope analysis" } } else { return result.text.toBoolean() } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt index 7dbaf568c3452e7ae565002ae00e5314502f8930..19a8bbe9ba0bdd8a694eb37b9db42de6fdf3d620 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt @@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat } else { val values = parseValues(response) if (values.data?.result.isNullOrEmpty()) { - logger.error { "Empty query result: $values" } + logger.error { "Empty query result: $values between $start and $end for querry $query" } throw NoSuchFieldException() } return parseValues(response) diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt index 53ed1b7fa02681f97b121f93d690c0654f961a94..66ea1d201f7b48e09c3acb4365436caae637e6fa 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt @@ -1,7 +1,8 @@ package theodolite.evaluation +import theodolite.util.PrometheusResponse import java.time.Instant interface SloChecker { - fun evaluate(start: Instant, end: Instant): Boolean + fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloCheckerFactory.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloCheckerFactory.kt index 2170ef7b6abdb74499d05ac623c7892ac36b72d9..50b7b0aec3c5d48146d4f9423b06fe62f55e3c56 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloCheckerFactory.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloCheckerFactory.kt @@ -6,21 +6,15 @@ class SloCheckerFactory { fun create( slotype: String, - prometheusURL: String, - query: String, externalSlopeURL: String, threshold: Int, - offset: Duration, warmup: Int ): SloChecker { return when (slotype) { "lag trend" -> ExternalSloChecker( - prometheusURL = prometheusURL, - query = query, externalSlopeURL = externalSlopeURL, threshold = threshold, - offset = offset, warmup = warmup ) else -> throw IllegalArgumentException("Slotype $slotype not found.") diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index ddfaf8e9b4a97a8185cc3f086bbc773776d4e38b..25ec1bf5fcd49bd28269b29b1a70ab966b3ac65a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -3,13 +3,12 @@ package theodolite.execution import mu.KotlinLogging import theodolite.benchmark.Benchmark import theodolite.benchmark.BenchmarkExecution -import theodolite.evaluation.SloCheckerFactory +import theodolite.evaluation.AnalysisExecutor import theodolite.util.ConfigurationOverride import theodolite.util.LoadDimension import theodolite.util.Resource import theodolite.util.Results import java.time.Duration -import java.time.Instant private val logger = KotlinLogging.logger {} @@ -24,26 +23,10 @@ class BenchmarkExecutorImpl( val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) benchmarkDeployment.setup() this.waitAndLog() - benchmarkDeployment.teardown() - var result = false - try { - result = SloCheckerFactory().create( - slotype = slo.sloType, - prometheusURL = slo.prometheusUrl, - query = "sum by(group)(kafka_consumergroup_group_lag >= 0)", - externalSlopeURL = slo.externalSloUrl, - threshold = slo.threshold, - offset = Duration.ofHours(slo.offset.toLong()), - warmup = slo.warmup - ) - .evaluate( - Instant.now().minus(executionDuration), - Instant.now() - ) - } catch (e: Exception) { - logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" } - } + val result = AnalysisExecutor(slo = slo).analyse(load = load, res = res, executionDuration = executionDuration) + + benchmarkDeployment.teardown() this.results.setResult(Pair(load, res), result) return result