Skip to content
Snippets Groups Projects
Commit 4a2d4763 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Reworked Analysis: Added AnalysisExecutor

parent 2bea40b2
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!105Introduce CSV exporter,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
package theodolite.evaluation
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.util.LoadDimension
import theodolite.util.PrometheusResponse
import theodolite.util.Resource
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
/**
* Executes the Analysis.
*/
class AnalysisExecutor {
fun analyse(load:LoadDimension,executionDuration: Duration, res: Resource,slo: BenchmarkExecution.Slo): Boolean {
var result = false
try {
val prometheusData = fetch(Instant.now().minus(executionDuration),
Instant.now(),
slo,
"sum by(group)(kafka_consumergroup_group_lag >= 0)")
CsvExporter().toCsv("${load.get()}_${res.get()}_${slo.sloType}",prometheusData)
val sloChecker = SloCheckerFactory().create(
slotype = slo.sloType,
prometheusURL = slo.prometheusUrl,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
offset = Duration.ofHours(slo.offset.toLong()),
warmup = slo.warmup
)
result = sloChecker.evaluate(start = Instant.now().minus(executionDuration),
end = Instant.now(),fetchedData = prometheusData)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
return result
}
fun fetch(start: Instant, end: Instant,slo: BenchmarkExecution.Slo,query: String): PrometheusResponse {
val metricFetcher = MetricFetcher(prometheusURL = slo.prometheusUrl,
offset = Duration.ofHours(slo.offset.toLong()))
val fetchedData = metricFetcher.fetchMetric(start, end, query)
return fetchedData
}
}
package theodolite.evaluation package theodolite.evaluation
import mu.KotlinLogging
import theodolite.util.PrometheusResponse import theodolite.util.PrometheusResponse
import java.io.File import java.io.File
import java.io.PrintWriter import java.io.PrintWriter
private val logger = KotlinLogging.logger {}
class CsvExporter { class CsvExporter {
/** /**
* Uses the PrintWriter to transform a PrometheusResponse to Csv * Uses the PrintWriter to transform a PrometheusResponse to Csv
*/ */
fun toCsv(name : String,prom: PrometheusResponse){ fun toCsv(name : String,prom: PrometheusResponse){
val x = toArray(prom) val responseArray = toArray(prom)
val csvOutputFile: File = File(name+".csv") val csvOutputFile: File = File("$name.csv")
PrintWriter(csvOutputFile).use { pw -> PrintWriter(csvOutputFile).use { pw ->
pw.println(listOf("name","time","value").joinToString()) pw.println(listOf("name","time","value").joinToString())
x.forEach{ responseArray.forEach{
pw.println(it.joinToString()) pw.println(it.joinToString())
} }
} }
logger.debug{csvOutputFile.absolutePath}
logger.info { "Wrote csv to $name" }
} }
/** /**
* Converts a PrometheusResponse into a List of List of Strings * Converts a PrometheusResponse into a List of List of Strings
*/ */
private fun toArray(prom : PrometheusResponse): MutableList<List<String>> { private fun toArray(prom : PrometheusResponse): MutableList<List<String>> {
val name = prom.data?.result?.get(0)?.metric?.group.toString() val name = prom.data?.result?.get(0)?.metric?.group.toString()
val values = prom.data?.result?.get(0)?.values val values = prom.data?.result?.get(0)?.values
val dataList = mutableListOf<List<String>>() val dataList = mutableListOf<List<String>>()
...@@ -33,11 +37,9 @@ class CsvExporter { ...@@ -33,11 +37,9 @@ class CsvExporter {
if (values != null) { if (values != null) {
for (x in values){ for (x in values){
val y = x as List<*> val y = x as List<*>
dataList.add(listOf(name,"${y[0]}","${y[1]}")) dataList.add(listOf(name,"${y[0]}","${y[1]}"))
} }
} }
return dataList return dataList
} }
} }
...@@ -2,6 +2,8 @@ package theodolite.evaluation ...@@ -2,6 +2,8 @@ package theodolite.evaluation
import com.google.gson.Gson import com.google.gson.Gson
import khttp.post import khttp.post
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.net.ConnectException import java.net.ConnectException
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
...@@ -19,10 +21,10 @@ class ExternalSloChecker( ...@@ -19,10 +21,10 @@ class ExternalSloChecker(
private val RETRIES = 2 private val RETRIES = 2
private val TIMEOUT = 60.0 private val TIMEOUT = 60.0
override fun evaluate(start: Instant, end: Instant): Boolean { private val logger = KotlinLogging.logger {}
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
var counter = 0 var counter = 0
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset)
val fetchedData = metricFetcher.fetchMetric(start, end, query)
val data = val data =
Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup)) Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup))
...@@ -30,6 +32,7 @@ class ExternalSloChecker( ...@@ -30,6 +32,7 @@ class ExternalSloChecker(
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT) val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
if (result.statusCode != 200) { if (result.statusCode != 200) {
counter++ counter++
logger.error{"Could not reach external slope analysis"}
} else { } else {
return result.text.toBoolean() return result.text.toBoolean()
} }
......
...@@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat ...@@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
} else { } else {
val values = parseValues(response) val values = parseValues(response)
if (values.data?.result.isNullOrEmpty()) { if (values.data?.result.isNullOrEmpty()) {
logger.error { "Empty query result: $values" } logger.error { "Empty query result: $values between $start and $end for querry $query" }
throw NoSuchFieldException() throw NoSuchFieldException()
} }
return parseValues(response) return parseValues(response)
......
package theodolite.evaluation package theodolite.evaluation
import theodolite.util.PrometheusResponse
import java.time.Instant import java.time.Instant
interface SloChecker { interface SloChecker {
fun evaluate(start: Instant, end: Instant): Boolean fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean
} }
...@@ -3,6 +3,7 @@ package theodolite.execution ...@@ -3,6 +3,7 @@ package theodolite.execution
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.Benchmark import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.AnalysisExecutor
import theodolite.evaluation.SloCheckerFactory import theodolite.evaluation.SloCheckerFactory
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
...@@ -20,30 +21,12 @@ class BenchmarkExecutorImpl( ...@@ -20,30 +21,12 @@ class BenchmarkExecutorImpl(
private val configurationOverrides: List<ConfigurationOverride>, private val configurationOverrides: List<ConfigurationOverride>,
slo: BenchmarkExecution.Slo slo: BenchmarkExecution.Slo
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) { ) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) {
//TODO ADD SHUTDOWN HOOK HERE
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
benchmarkDeployment.setup() benchmarkDeployment.setup()
this.waitAndLog() this.waitAndLog()
var result = false var result = AnalysisExecutor().analyse(load,executionDuration,res,slo)
try {
result = SloCheckerFactory().create(
slotype = slo.sloType,
prometheusURL = slo.prometheusUrl,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
offset = Duration.ofHours(slo.offset.toLong()),
warmup = slo.warmup
)
.evaluate(
Instant.now().minus(executionDuration),
Instant.now()
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
benchmarkDeployment.teardown() benchmarkDeployment.teardown()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment