diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt new file mode 100644 index 0000000000000000000000000000000000000000..9d061d7ffb1d023d20587b91da8e9c83f6ffaebc --- /dev/null +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/AnalysisExecutor.kt @@ -0,0 +1,58 @@ +package theodolite.evaluation + +import mu.KotlinLogging +import theodolite.benchmark.BenchmarkExecution +import theodolite.util.LoadDimension +import theodolite.util.PrometheusResponse +import theodolite.util.Resource +import java.time.Duration +import java.time.Instant + +private val logger = KotlinLogging.logger {} + +/** + * Executes the Analysis. + */ +class AnalysisExecutor { + + fun analyse(load:LoadDimension,executionDuration: Duration, res: Resource,slo: BenchmarkExecution.Slo): Boolean { + var result = false + + try { + + val prometheusData = fetch(Instant.now().minus(executionDuration), + Instant.now(), + slo, + "sum by(group)(kafka_consumergroup_group_lag >= 0)") + + CsvExporter().toCsv("${load.get()}_${res.get()}_${slo.sloType}",prometheusData) + + val sloChecker = SloCheckerFactory().create( + slotype = slo.sloType, + prometheusURL = slo.prometheusUrl, + query = "sum by(group)(kafka_consumergroup_group_lag >= 0)", + externalSlopeURL = slo.externalSloUrl, + threshold = slo.threshold, + offset = Duration.ofHours(slo.offset.toLong()), + warmup = slo.warmup + ) + + result = sloChecker.evaluate(start = Instant.now().minus(executionDuration), + end = Instant.now(),fetchedData = prometheusData) + + } catch (e: Exception) { + logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" } + } + + return result + } + + fun fetch(start: Instant, end: Instant,slo: BenchmarkExecution.Slo,query: String): PrometheusResponse { + + val metricFetcher = MetricFetcher(prometheusURL = slo.prometheusUrl, + offset = Duration.ofHours(slo.offset.toLong())) + val fetchedData = metricFetcher.fetchMetric(start, end, query) + + return fetchedData + } +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt index 929a7914f5cf8501212f020bdc54cc19c43bcd7c..116bfdfd8dc357d9c93581eeef407ed8db8abf98 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/CsvExporter.kt @@ -1,31 +1,35 @@ package theodolite.evaluation +import mu.KotlinLogging import theodolite.util.PrometheusResponse import java.io.File import java.io.PrintWriter +private val logger = KotlinLogging.logger {} + class CsvExporter { /** * Uses the PrintWriter to transform a PrometheusResponse to Csv */ fun toCsv(name : String,prom: PrometheusResponse){ - val x = toArray(prom) - val csvOutputFile: File = File(name+".csv") + val responseArray = toArray(prom) + val csvOutputFile: File = File("$name.csv") PrintWriter(csvOutputFile).use { pw -> pw.println(listOf("name","time","value").joinToString()) - x.forEach{ + responseArray.forEach{ pw.println(it.joinToString()) } } + logger.debug{csvOutputFile.absolutePath} + logger.info { "Wrote csv to $name" } } /** * Converts a PrometheusResponse into a List of List of Strings */ private fun toArray(prom : PrometheusResponse): MutableList<List<String>> { - val name = prom.data?.result?.get(0)?.metric?.group.toString() val values = prom.data?.result?.get(0)?.values val dataList = mutableListOf<List<String>>() @@ -33,11 +37,9 @@ class CsvExporter { if (values != null) { for (x in values){ val y = x as List<*> - dataList.add(listOf(name,"${y[0]}","${y[1]}")) } } - return dataList } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt index 2de8e2dc9c03ec5449c9f04585622d6730644aa2..4cf417bc73f28d9b901cf528d0ad17e5c3135ffd 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt @@ -2,6 +2,8 @@ package theodolite.evaluation import com.google.gson.Gson import khttp.post +import mu.KotlinLogging +import theodolite.util.PrometheusResponse import java.net.ConnectException import java.time.Duration import java.time.Instant @@ -19,10 +21,10 @@ class ExternalSloChecker( private val RETRIES = 2 private val TIMEOUT = 60.0 - override fun evaluate(start: Instant, end: Instant): Boolean { + private val logger = KotlinLogging.logger {} + + override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean { var counter = 0 - val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset) - val fetchedData = metricFetcher.fetchMetric(start, end, query) val data = Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup)) @@ -30,6 +32,7 @@ class ExternalSloChecker( val result = post(externalSlopeURL, data = data, timeout = TIMEOUT) if (result.statusCode != 200) { counter++ + logger.error{"Could not reach external slope analysis"} } else { return result.text.toBoolean() } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt index 7dbaf568c3452e7ae565002ae00e5314502f8930..19a8bbe9ba0bdd8a694eb37b9db42de6fdf3d620 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/MetricFetcher.kt @@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat } else { val values = parseValues(response) if (values.data?.result.isNullOrEmpty()) { - logger.error { "Empty query result: $values" } + logger.error { "Empty query result: $values between $start and $end for querry $query" } throw NoSuchFieldException() } return parseValues(response) diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt index 53ed1b7fa02681f97b121f93d690c0654f961a94..66ea1d201f7b48e09c3acb4365436caae637e6fa 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/evaluation/SloChecker.kt @@ -1,7 +1,8 @@ package theodolite.evaluation +import theodolite.util.PrometheusResponse import java.time.Instant interface SloChecker { - fun evaluate(start: Instant, end: Instant): Boolean + fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index 458ff108a1dbbece876146bdb019697cc8561121..0795e9893ebecaa8510cb35ab3de8f2bbb86ae7e 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -3,6 +3,7 @@ package theodolite.execution import mu.KotlinLogging import theodolite.benchmark.Benchmark import theodolite.benchmark.BenchmarkExecution +import theodolite.evaluation.AnalysisExecutor import theodolite.evaluation.SloCheckerFactory import theodolite.util.ConfigurationOverride import theodolite.util.LoadDimension @@ -20,30 +21,12 @@ class BenchmarkExecutorImpl( private val configurationOverrides: List<ConfigurationOverride>, slo: BenchmarkExecution.Slo ) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) { - //TODO ADD SHUTDOWN HOOK HERE override fun runExperiment(load: LoadDimension, res: Resource): Boolean { val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) benchmarkDeployment.setup() this.waitAndLog() - var result = false - try { - result = SloCheckerFactory().create( - slotype = slo.sloType, - prometheusURL = slo.prometheusUrl, - query = "sum by(group)(kafka_consumergroup_group_lag >= 0)", - externalSlopeURL = slo.externalSloUrl, - threshold = slo.threshold, - offset = Duration.ofHours(slo.offset.toLong()), - warmup = slo.warmup - ) - .evaluate( - Instant.now().minus(executionDuration), - Instant.now() - ) - } catch (e: Exception) { - logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" } - } + var result = AnalysisExecutor().analyse(load,executionDuration,res,slo) benchmarkDeployment.teardown()