Skip to content
Snippets Groups Projects
Commit 66e266af authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

merge again theodolite kotlin

parents 5321cfd2 0d53d57d
Branches
Tags
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!108Feature/185 Make Paths Configurable,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 126 additions and 58 deletions
......@@ -10,6 +10,6 @@ interface Benchmark {
fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride>
configurationOverrides: List<ConfigurationOverride?>
): BenchmarkDeployment
}
......@@ -12,7 +12,7 @@ class BenchmarkExecution {
lateinit var resources: ResourceDefinition
lateinit var slos: List<Slo>
lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride>
lateinit var configOverrides: List<ConfigurationOverride?>
@RegisterForReflection
class Execution {
......
......
......@@ -43,7 +43,7 @@ class KubernetesBenchmark : Benchmark {
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride>
configurationOverrides: List<ConfigurationOverride?>
): BenchmarkDeployment {
val resources = loadKubernetesResources(this.appResource + this.loadGenResource)
val patcherFactory = PatcherFactory()
......@@ -53,7 +53,7 @@ class KubernetesBenchmark : Benchmark {
res.getType().forEach{ patcherDefinition -> patcherFactory.createPatcher(patcherDefinition, resources).patch(res.get().toString()) }
// Patch the given overrides
configurationOverrides.forEach { override -> patcherFactory.createPatcher(override.patcher, resources).patch(override.value) }
configurationOverrides.forEach { override -> override?.let { patcherFactory.createPatcher(it.patcher, resources).patch(override.value) } }
return KubernetesBenchmarkDeployment(
......
......
package theodolite.evaluation
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.util.LoadDimension
import theodolite.util.Resource
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) {
private val fetcher = MetricFetcher(
prometheusURL = slo.prometheusUrl,
offset = Duration.ofHours(slo.offset.toLong())
)
fun analyse(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean {
var result = false
try {
val prometheusData = fetcher.fetchMetric(
start = Instant.now().minus(executionDuration),
end = Instant.now(),
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
)
CsvExporter().toCsv(name = "${load.get()}_${res.get()}_${slo.sloType}", prom = prometheusData)
val sloChecker = SloCheckerFactory().create(
slotype = slo.sloType,
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
warmup = slo.warmup
)
result = sloChecker.evaluate(
start = Instant.now().minus(executionDuration),
end = Instant.now(), fetchedData = prometheusData
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
return result
}
}
package theodolite.evaluation
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.io.File
import java.io.PrintWriter
import java.util.*
private val logger = KotlinLogging.logger {}
class CsvExporter {
......@@ -10,22 +14,22 @@ class CsvExporter {
* Uses the PrintWriter to transform a PrometheusResponse to Csv
*/
fun toCsv(name: String, prom: PrometheusResponse) {
val x = toArray(prom)
val csvOutputFile: File = File(name+".csv")
val responseArray = promResponseToList(prom)
val csvOutputFile = File("$name.csv")
PrintWriter(csvOutputFile).use { pw ->
pw.println(listOf("name", "time", "value").joinToString())
x.forEach{
responseArray.forEach {
pw.println(it.joinToString())
}
}
logger.info { "Wrote csv file: $name to ${csvOutputFile.absolutePath}" }
}
/**
* Converts a PrometheusResponse into a List of List of Strings
*/
private fun toArray(prom : PrometheusResponse): MutableList<List<String>> {
private fun promResponseToList(prom: PrometheusResponse): List<List<String>> {
val name = prom.data?.result?.get(0)?.metric?.group.toString()
val values = prom.data?.result?.get(0)?.values
val dataList = mutableListOf<List<String>>()
......@@ -33,11 +37,9 @@ class CsvExporter {
if (values != null) {
for (x in values) {
val y = x as List<*>
dataList.add(listOf(name, "${y[0]}", "${y[1]}"))
}
}
return dataList
return Collections.unmodifiableList(dataList)
}
}
......@@ -2,16 +2,14 @@ package theodolite.evaluation
import com.google.gson.Gson
import khttp.post
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.net.ConnectException
import java.time.Duration
import java.time.Instant
class ExternalSloChecker(
private val prometheusURL: String,
private val query: String,
private val externalSlopeURL: String,
private val threshold: Int,
private val offset: Duration,
private val warmup: Int
) :
SloChecker {
......@@ -19,10 +17,10 @@ class ExternalSloChecker(
private val RETRIES = 2
private val TIMEOUT = 60.0
override fun evaluate(start: Instant, end: Instant): Boolean {
private val logger = KotlinLogging.logger {}
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
var counter = 0
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset)
val fetchedData = metricFetcher.fetchMetric(start, end, query)
val data =
Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup))
......@@ -30,6 +28,7 @@ class ExternalSloChecker(
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
if (result.statusCode != 200) {
counter++
logger.error { "Could not reach external slope analysis" }
} else {
return result.text.toBoolean()
}
......
......
......@@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
} else {
val values = parseValues(response)
if (values.data?.result.isNullOrEmpty()) {
logger.error { "Empty query result: $values" }
logger.error { "Empty query result: $values between $start and $end for querry $query" }
throw NoSuchFieldException()
}
return parseValues(response)
......
......
package theodolite.evaluation
import theodolite.util.PrometheusResponse
import java.time.Instant
interface SloChecker {
fun evaluate(start: Instant, end: Instant): Boolean
fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean
}
......@@ -6,21 +6,15 @@ class SloCheckerFactory {
fun create(
slotype: String,
prometheusURL: String,
query: String,
externalSlopeURL: String,
threshold: Int,
offset: Duration,
warmup: Int
): SloChecker {
return when (slotype) {
"lag trend" -> ExternalSloChecker(
prometheusURL = prometheusURL,
query = query,
externalSlopeURL = externalSlopeURL,
threshold = threshold,
offset = offset,
warmup = warmup
)
else -> throw IllegalArgumentException("Slotype $slotype not found.")
......
......
......@@ -23,7 +23,7 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark,
val results: Results,
val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride>,
configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo
) {
......
......
......@@ -4,13 +4,12 @@ import io.quarkus.runtime.annotations.RegisterForReflection
import mu.KotlinLogging
import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.SloCheckerFactory
import theodolite.evaluation.AnalysisExecutor
import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
......@@ -19,33 +18,17 @@ class BenchmarkExecutorImpl(
benchmark: Benchmark,
results: Results,
executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride>,
private val configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) {
//TODO ADD SHUTDOWN HOOK HERE
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
benchmarkDeployment.setup()
this.waitAndLog()
var result = false
try {
result = SloCheckerFactory().create(
slotype = slo.sloType,
prometheusURL = slo.prometheusUrl,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
offset = Duration.ofHours(slo.offset.toLong()),
warmup = slo.warmup
)
.evaluate(
Instant.now().minus(executionDuration),
Instant.now()
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
val result = AnalysisExecutor(slo = slo).analyse(load = load, res = res, executionDuration = executionDuration)
benchmarkDeployment.teardown()
benchmarkDeployment.teardown()
......
......
package theodolite.execution
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.LoadDimension
import theodolite.util.Resource
private val logger = KotlinLogging.logger {}
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) :
Thread() {
override fun run() {
// Build Configuration to teardown
logger.info { "Received shutdown signal -> Shutting down" }
val deployment =
benchmark.buildDeployment(
load = LoadDimension(0, emptyList()),
res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides
)
logger.info { "Teardown the everything deployed" }
deployment.teardown()
logger.info { "Teardown completed" }
}
}
......@@ -21,8 +21,13 @@ class TheodoliteExecutor(
val strategyFactory = StrategyFactory()
val executionDuration = Duration.ofSeconds(config.execution.duration)
val resourcePatcherDefinition = PatcherDefinitionFactory().createPatcherDefinition(config.resources.resourceType, this.kubernetesBenchmark.resourceTypes)
val loadDimensionPatcherDefinition = PatcherDefinitionFactory().createPatcherDefinition(config.load.loadType, this.kubernetesBenchmark.loadTypes)
val resourcePatcherDefinition = PatcherDefinitionFactory().createPatcherDefinition(
config.resources.resourceType,
this.kubernetesBenchmark.resourceTypes
)
val loadDimensionPatcherDefinition =
PatcherDefinitionFactory().createPatcherDefinition(config.load.loadType, this.kubernetesBenchmark.loadTypes)
val executor =
BenchmarkExecutorImpl(
......@@ -35,7 +40,12 @@ class TheodoliteExecutor(
return Config(
loads = config.load.loadValues.map { load -> LoadDimension(load, loadDimensionPatcherDefinition) },
resources = config.resources.resourceValues.map { resource -> Resource(resource, resourcePatcherDefinition) },
resources = config.resources.resourceValues.map { resource ->
Resource(
resource,
resourcePatcherDefinition
)
},
compositeStrategy = CompositeStrategy(
benchmarkExecutor = executor,
searchStrategy = strategyFactory.createSearchStrategy(executor, config.execution.strategy),
......
......
......@@ -32,9 +32,13 @@ object TheodoliteYamlExecutor {
parser.parse(path = benchmarkPath, E = KubernetesBenchmark::class.java)!!
benchmark.path = appResource
val shutdown = Shutdown(benchmarkExecution, benchmark)
Runtime.getRuntime().addShutdownHook(shutdown)
val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run()
logger.info { "Theodolite finished" }
Runtime.getRuntime().removeShutdownHook(shutdown)
exitProcess(0)
}
}
......@@ -11,7 +11,7 @@ class TestBenchmark : Benchmark {
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride>
configurationOverrides: List<ConfigurationOverride?>
): BenchmarkDeployment {
return TestBenchmarkDeployment()
}
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment