Skip to content
Snippets Groups Projects
Commit cd3f1b54 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Merge upstream theodolite-kotlin

parents 2a4546a2 0d53d57d
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!106Introduce a Theodolite operator,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 136 additions and 45 deletions
......@@ -8,6 +8,6 @@ interface Benchmark {
fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride>
configurationOverrides: List<ConfigurationOverride?>
): BenchmarkDeployment
}
......@@ -14,7 +14,7 @@ class BenchmarkExecution : CustomResource(){
lateinit var resources: ResourceDefinition
lateinit var slos: List<Slo>
lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride>
lateinit var configOverrides: List<ConfigurationOverride?>
@JsonDeserialize
class Execution : KubernetesResource {
......
......@@ -42,7 +42,7 @@ class KubernetesBenchmark : Benchmark , CustomResource() {
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride>
configurationOverrides: List<ConfigurationOverride?>
): BenchmarkDeployment {
val resources = loadKubernetesResources(this.appResource + this.loadGenResource)
val patcherFactory = PatcherFactory()
......@@ -52,7 +52,7 @@ class KubernetesBenchmark : Benchmark , CustomResource() {
res.getType().forEach{ patcherDefinition -> patcherFactory.createPatcher(patcherDefinition, resources).patch(res.get().toString()) }
// Patch the given overrides
configurationOverrides.forEach { override -> patcherFactory.createPatcher(override.patcher, resources).patch(override.value) }
configurationOverrides.forEach { override -> override?.let { patcherFactory.createPatcher(it.patcher, resources).patch(override.value) } }
return KubernetesBenchmarkDeployment(
......
package theodolite.evaluation
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.util.LoadDimension
import theodolite.util.Resource
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) {
private val fetcher = MetricFetcher(
prometheusURL = slo.prometheusUrl,
offset = Duration.ofHours(slo.offset.toLong())
)
fun analyse(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean {
var result = false
try {
val prometheusData = fetcher.fetchMetric(
start = Instant.now().minus(executionDuration),
end = Instant.now(),
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
)
CsvExporter().toCsv(name = "${load.get()}_${res.get()}_${slo.sloType}", prom = prometheusData)
val sloChecker = SloCheckerFactory().create(
slotype = slo.sloType,
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
warmup = slo.warmup
)
result = sloChecker.evaluate(
start = Instant.now().minus(executionDuration),
end = Instant.now(), fetchedData = prometheusData
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
return result
}
}
package theodolite.evaluation
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.io.File
import java.io.PrintWriter
import java.util.*
private val logger = KotlinLogging.logger {}
class CsvExporter {
/**
* Uses the PrintWriter to transform a PrometheusResponse to Csv
*/
fun toCsv(name: String, prom: PrometheusResponse) {
val responseArray = promResponseToList(prom)
val csvOutputFile = File("$name.csv")
PrintWriter(csvOutputFile).use { pw ->
pw.println(listOf("name", "time", "value").joinToString())
responseArray.forEach {
pw.println(it.joinToString())
}
}
logger.info { "Wrote csv file: $name to ${csvOutputFile.absolutePath}" }
}
/**
* Converts a PrometheusResponse into a List of List of Strings
*/
private fun promResponseToList(prom: PrometheusResponse): List<List<String>> {
val name = prom.data?.result?.get(0)?.metric?.group.toString()
val values = prom.data?.result?.get(0)?.values
val dataList = mutableListOf<List<String>>()
if (values != null) {
for (x in values) {
val y = x as List<*>
dataList.add(listOf(name, "${y[0]}", "${y[1]}"))
}
}
return Collections.unmodifiableList(dataList)
}
}
......@@ -2,16 +2,14 @@ package theodolite.evaluation
import com.google.gson.Gson
import khttp.post
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.net.ConnectException
import java.time.Duration
import java.time.Instant
class ExternalSloChecker(
private val prometheusURL: String,
private val query: String,
private val externalSlopeURL: String,
private val threshold: Int,
private val offset: Duration,
private val warmup: Int
) :
SloChecker {
......@@ -19,10 +17,10 @@ class ExternalSloChecker(
private val RETRIES = 2
private val TIMEOUT = 60.0
override fun evaluate(start: Instant, end: Instant): Boolean {
private val logger = KotlinLogging.logger {}
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
var counter = 0
val metricFetcher = MetricFetcher(prometheusURL = prometheusURL, offset = offset)
val fetchedData = metricFetcher.fetchMetric(start, end, query)
val data =
Gson().toJson(mapOf("total_lag" to fetchedData.data?.result, "threshold" to threshold, "warmup" to warmup))
......@@ -30,6 +28,7 @@ class ExternalSloChecker(
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
if (result.statusCode != 200) {
counter++
logger.error { "Could not reach external slope analysis" }
} else {
return result.text.toBoolean()
}
......
......@@ -37,7 +37,7 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
} else {
val values = parseValues(response)
if (values.data?.result.isNullOrEmpty()) {
logger.error { "Empty query result: $values" }
logger.error { "Empty query result: $values between $start and $end for querry $query" }
throw NoSuchFieldException()
}
return parseValues(response)
......
package theodolite.evaluation
import theodolite.util.PrometheusResponse
import java.time.Instant
interface SloChecker {
fun evaluate(start: Instant, end: Instant): Boolean
fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean
}
......@@ -6,21 +6,15 @@ class SloCheckerFactory {
fun create(
slotype: String,
prometheusURL: String,
query: String,
externalSlopeURL: String,
threshold: Int,
offset: Duration,
warmup: Int
): SloChecker {
return when (slotype) {
"lag trend" -> ExternalSloChecker(
prometheusURL = prometheusURL,
query = query,
externalSlopeURL = externalSlopeURL,
threshold = threshold,
offset = offset,
warmup = warmup
)
else -> throw IllegalArgumentException("Slotype $slotype not found.")
......
......@@ -23,7 +23,7 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark,
val results: Results,
val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride>,
configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo
) {
......
......@@ -3,13 +3,12 @@ package theodolite.execution
import mu.KotlinLogging
import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.SloCheckerFactory
import theodolite.evaluation.AnalysisExecutor
import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {}
......@@ -17,33 +16,17 @@ class BenchmarkExecutorImpl(
benchmark: Benchmark,
results: Results,
executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride>,
private val configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
benchmarkDeployment.setup()
this.waitAndLog()
benchmarkDeployment.teardown()
var result = false
try {
result = SloCheckerFactory().create(
slotype = slo.sloType,
prometheusURL = slo.prometheusUrl,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)",
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
offset = Duration.ofHours(slo.offset.toLong()),
warmup = slo.warmup
)
.evaluate(
Instant.now().minus(executionDuration),
Instant.now()
)
} catch (e: Exception) {
logger.error { "Evaluation failed for resource: ${res.get()} and load: ${load.get()} error: $e" }
}
val result = AnalysisExecutor(slo = slo).analyse(load = load, res = res, executionDuration = executionDuration)
benchmarkDeployment.teardown()
this.results.setResult(Pair(load, res), result)
return result
......
......@@ -22,11 +22,13 @@ object TheodoliteYamlExecutor {
val benchmark =
parser.parse("./../../../resources/main/yaml/BenchmarkType.yaml", KubernetesBenchmark::class.java)!!
Runtime.getRuntime().addShutdownHook(Shutdown(benchmarkExecution, benchmark))
val shutdown = Shutdown(benchmarkExecution, benchmark)
Runtime.getRuntime().addShutdownHook(shutdown)
val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run()
logger.info { "Theodolite finished" }
Runtime.getRuntime().removeShutdownHook(shutdown)
exitProcess(0)
}
}
......@@ -4,11 +4,26 @@ import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
/**
* Fabric8 handles custom resources as plain HashMaps. These need to be handled differently than normal
* Kubernetes resources. The K8sCustomResourceWrapper class provides a wrapper to deploy and delete
* custom resources in a uniform way.
*
* @property map custom resource as plain hashmap
* @constructor Create empty K8s custom resource wrapper
*/
class K8sCustomResourceWrapper(private val map : Map<String,String>) : CustomResource() {
/**
* Deploy a custom resource
*
* @param client a namespaced Kubernetes client which are used to deploy the CR object.
*/
fun deploy(client : NamespacedKubernetesClient){
val kind = this.map["kind"]
// Search the CustomResourceDefinition to which the CR Object belongs.
// This should be exactly one if the CRD is registered for Kubernetes, zero otherwise.
val crds = client.apiextensions().v1beta1().customResourceDefinitions().list()
crds.items
.filter { crd -> crd.toString().contains("kind=$kind") }
......@@ -18,6 +33,11 @@ class K8sCustomResourceWrapper(private val map : Map<String,String>) : CustomRes
}
}
/**
* Delete a custom resource
*
* @param client a namespaced Kubernetes client which are used to delete the CR object.
*/
fun delete(client : NamespacedKubernetesClient){
val kind = this.map["kind"]
val metadata = this.map["metadata"] as HashMap<String,String>
......
......@@ -15,7 +15,6 @@ resources:
slos:
- sloType: "lag trend"
threshold: 1000
#prometheusUrl: "http://localhost:32656"
prometheusUrl: "http://prometheus-operated:9090"
externalSloUrl: "http://localhost:80/evaluate-slope"
offset: 0
......
......@@ -11,7 +11,7 @@ class TestBenchmark : Benchmark {
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride>
configurationOverrides: List<ConfigurationOverride?>
): BenchmarkDeployment {
return TestBenchmarkDeployment()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment