Skip to content
Snippets Groups Projects
Commit 951c4ef6 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

merge upstream theodolite-kotlin

parents 888c85a4 feffa5cf
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!138Load execution ID from file,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 86 additions and 65 deletions
...@@ -35,6 +35,8 @@ public final class ConfigurationKeys { ...@@ -35,6 +35,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
...@@ -22,6 +22,7 @@ interface Benchmark { ...@@ -22,6 +22,7 @@ interface Benchmark {
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?>, configurationOverrides: List<ConfigurationOverride?>,
delay: Long loadGenerationDelay: Long,
afterTeardownDelay: Long
): BenchmarkDeployment ): BenchmarkDeployment
} }
...@@ -48,6 +48,7 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -48,6 +48,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var repetitions by Delegates.notNull<Int>() var repetitions by Delegates.notNull<Int>()
lateinit var restrictions: List<String> lateinit var restrictions: List<String>
var loadGenerationDelay = 0L var loadGenerationDelay = 0L
var afterTeardownDelay = 5L
} }
/** /**
... ...
......
...@@ -71,7 +71,8 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -71,7 +71,8 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?>, configurationOverrides: List<ConfigurationOverride?>,
loadGenerationDelay: Long loadGenerationDelay: Long,
afterTeardownDelay: Long
): BenchmarkDeployment { ): BenchmarkDeployment {
logger.info { "Using $namespace as namespace." } logger.info { "Using $namespace as namespace." }
logger.info { "Using $path as resource path." } logger.info { "Using $path as resource path." }
...@@ -100,6 +101,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -100,6 +101,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
appResources = appResources.map { it.second }, appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second }, loadGenResources = loadGenResources.map { it.second },
loadGenerationDelay = loadGenerationDelay, loadGenerationDelay = loadGenerationDelay,
afterTeardownDelay = afterTeardownDelay,
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.topics, topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace) client = DefaultKubernetesClient().inNamespace(namespace)
... ...
......
...@@ -26,6 +26,7 @@ class KubernetesBenchmarkDeployment( ...@@ -26,6 +26,7 @@ class KubernetesBenchmarkDeployment(
val appResources: List<KubernetesResource>, val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>, val loadGenResources: List<KubernetesResource>,
private val loadGenerationDelay: Long, private val loadGenerationDelay: Long,
private val afterTeardownDelay: Long,
private val kafkaConfig: HashMap<String, Any>, private val kafkaConfig: HashMap<String, Any>,
private val topics: List<KafkaConfig.TopicWrapper>, private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient private val client: NamespacedKubernetesClient
...@@ -33,7 +34,6 @@ class KubernetesBenchmarkDeployment( ...@@ -33,7 +34,6 @@ class KubernetesBenchmarkDeployment(
private val kafkaController = TopicManager(this.kafkaConfig) private val kafkaController = TopicManager(this.kafkaConfig)
private val kubernetesManager = K8sManager(client) private val kubernetesManager = K8sManager(client)
private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter" private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
private val SLEEP_AFTER_TEARDOWN = 5000L
/** /**
* Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]: * Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
...@@ -61,7 +61,7 @@ class KubernetesBenchmarkDeployment( ...@@ -61,7 +61,7 @@ class KubernetesBenchmarkDeployment(
appResources.forEach { kubernetesManager.remove(it) } appResources.forEach { kubernetesManager.remove(it) }
kafkaController.removeTopics(this.topics.map { topic -> topic.name }) kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL) KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." } logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." }
Thread.sleep(SLEEP_AFTER_TEARDOWN) Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis())
} }
} }
...@@ -35,23 +35,27 @@ class AnalysisExecutor( ...@@ -35,23 +35,27 @@ class AnalysisExecutor(
* @param executionDuration of the experiment. * @param executionDuration of the experiment.
* @return true if the experiment succeeded. * @return true if the experiment succeeded.
*/ */
fun analyze(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean { fun analyze(load: LoadDimension, res: Resource, executionIntervals: List<Pair<Instant, Instant>>): Boolean {
var result = false var result = false
var repetitionCounter = 1
try { try {
val prometheusData = fetcher.fetchMetric(
start = Instant.now().minus(executionDuration),
end = Instant.now(),
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
)
val ioHandler = IOHandler() val ioHandler = IOHandler()
val resultsFolder: String = ioHandler.getResultFolderURL() val resultsFolder: String = ioHandler.getResultFolderURL()
val fileURL = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}" val fileURL = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}"
val prometheusData = executionIntervals
.map { interval -> fetcher.fetchMetric(
start = interval.first,
end = interval.second,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)") }
prometheusData.forEach{ data ->
ioHandler.writeToCSVFile( ioHandler.writeToCSVFile(
fileURL = fileURL, fileURL = "${fileURL}_${repetitionCounter++}",
data = prometheusData.getResultAsList(), data = data.getResultAsList(),
columns = listOf("group", "timestamp", "value")) columns = listOf("group", "timestamp", "value"))
}
val sloChecker = SloCheckerFactory().create( val sloChecker = SloCheckerFactory().create(
sloType = slo.sloType, sloType = slo.sloType,
...@@ -60,10 +64,7 @@ class AnalysisExecutor( ...@@ -60,10 +64,7 @@ class AnalysisExecutor(
warmup = slo.warmup warmup = slo.warmup
) )
result = sloChecker.evaluate( result = sloChecker.evaluate(prometheusData)
start = Instant.now().minus(executionDuration),
end = Instant.now(), fetchedData = prometheusData
)
} catch (e: Exception) { } catch (e: Exception) {
logger.error { "Evaluation failed for resource '${res.get()}' and load '${load.get()}'. Error: $e" } logger.error { "Evaluation failed for resource '${res.get()}' and load '${load.get()}'. Error: $e" }
... ...
......
...@@ -35,10 +35,10 @@ class ExternalSloChecker( ...@@ -35,10 +35,10 @@ class ExternalSloChecker(
* @return true if the experiment was successful(the threshold was not exceeded. * @return true if the experiment was successful(the threshold was not exceeded.
* @throws ConnectException if the external service could not be reached. * @throws ConnectException if the external service could not be reached.
*/ */
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean { override fun evaluate(fetchedData: List<PrometheusResponse>): Boolean {
var counter = 0 var counter = 0
val data = Gson().toJson(mapOf( val data = Gson().toJson(mapOf(
"total_lag" to fetchedData.data?.result, "total_lags" to fetchedData.map { it.data?.result},
"threshold" to threshold, "threshold" to threshold,
"warmup" to warmup)) "warmup" to warmup))
... ...
......
package theodolite.evaluation package theodolite.evaluation
import theodolite.util.PrometheusResponse import theodolite.util.PrometheusResponse
import java.time.Instant
/** /**
* A SloChecker can be used to evaluate data from Prometheus. * A SloChecker can be used to evaluate data from Prometheus.
* @constructor Creates an empty SloChecker * @constructor Creates an empty SloChecker
*/ */
interface SloChecker { interface SloChecker {
/** /**
* Evaluates [fetchedData] and returns if the experiment was successful. * Evaluates [fetchedData] and returns if the experiment was successful.
* Returns if the evaluated experiment was successful. * Returns if the evaluated experiment was successful.
...@@ -18,5 +16,5 @@ interface SloChecker { ...@@ -18,5 +16,5 @@ interface SloChecker {
* @param fetchedData from Prometheus that will be evaluated. * @param fetchedData from Prometheus that will be evaluated.
* @return true if experiment was successful. Otherwise false. * @return true if experiment was successful. Otherwise false.
*/ */
fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean fun evaluate(fetchedData: List<PrometheusResponse>): Boolean
} }
...@@ -26,8 +26,10 @@ abstract class BenchmarkExecutor( ...@@ -26,8 +26,10 @@ abstract class BenchmarkExecutor(
val executionDuration: Duration, val executionDuration: Duration,
val configurationOverrides: List<ConfigurationOverride?>, val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo, val slo: BenchmarkExecution.Slo,
val repetitions: Int,
val executionId: Int, val executionId: Int,
val loadGenerationDelay: Long val loadGenerationDelay: Long,
val afterTeardownDelay: Long
) { ) {
var run: AtomicBoolean = AtomicBoolean(true) var run: AtomicBoolean = AtomicBoolean(true)
... ...
......
...@@ -5,11 +5,9 @@ import mu.KotlinLogging ...@@ -5,11 +5,9 @@ import mu.KotlinLogging
import theodolite.benchmark.Benchmark import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.AnalysisExecutor import theodolite.evaluation.AnalysisExecutor
import theodolite.util.ConfigurationOverride import theodolite.util.*
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration import java.time.Duration
import java.time.Instant
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -20,41 +18,56 @@ class BenchmarkExecutorImpl( ...@@ -20,41 +18,56 @@ class BenchmarkExecutorImpl(
executionDuration: Duration, executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>, configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo, slo: BenchmarkExecution.Slo,
repetitions: Int,
executionId: Int, executionId: Int,
loadGenerationDelay: Long loadGenerationDelay: Long,
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId, loadGenerationDelay) { afterTeardownDelay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, repetitions, executionId, loadGenerationDelay, afterTeardownDelay) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, configurationOverrides, loadGenerationDelay) val executionIntervals: MutableList<Pair<Instant, Instant>> = ArrayList()
try { for (i in 1.rangeTo(repetitions)) {
benchmarkDeployment.setup() logger.info { "Run repetition $i/$repetitions" }
this.waitAndLog() if (this.run.get()) {
} catch (e: Exception) { executionIntervals.add(runSingleExperiment(load,res))
logger.error { "Error while setting up experiment with id ${this.executionId}." } } else {
logger.error { "Error is: $e" } break
this.run.set(false) }
} }
/** /**
* Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user. * Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
*/ */
if (this.run.get()) { if (this.run.get()) {
result = AnalysisExecutor(slo = slo, executionId = executionId).analyze( result =AnalysisExecutor(slo = slo, executionId = executionId)
.analyze(
load = load, load = load,
res = res, res = res,
executionDuration = executionDuration executionIntervals = executionIntervals)
)
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
} }
return result
}
private fun runSingleExperiment(load: LoadDimension, res: Resource): Pair<Instant, Instant> {
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides, this.loadGenerationDelay, this.afterTeardownDelay)
val from = Instant.now()
try {
benchmarkDeployment.setup()
this.waitAndLog()
} catch (e: Exception) {
logger.error { "Error while setup experiment." }
logger.error { "Error is: $e" }
this.run.set(false)
}
val to = Instant.now()
try { try {
benchmarkDeployment.teardown() benchmarkDeployment.teardown()
} catch (e: Exception) { } catch (e: Exception) {
logger.warn { "Error while tearing down the benchmark deployment." } logger.warn { "Error while tearing down the benchmark deployment." }
logger.debug { "Teardown failed, caused by: $e" } logger.debug { "Teardown failed, caused by: $e" }
} }
return Pair(from,to)
return result
} }
} }
...@@ -31,7 +31,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b ...@@ -31,7 +31,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
load = LoadDimension(0, emptyList()), load = LoadDimension(0, emptyList()),
res = Resource(0, emptyList()), res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides, configurationOverrides = benchmarkExecution.configOverrides,
loadGenerationDelay = 0L loadGenerationDelay = 0L,
afterTeardownDelay = 5L
) )
deployment.teardown() deployment.teardown()
} catch (e: Exception) { } catch (e: Exception) {
... ...
......
package theodolite.execution package theodolite.execution
import com.google.gson.GsonBuilder
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
...@@ -9,11 +8,6 @@ import theodolite.strategies.StrategyFactory ...@@ -9,11 +8,6 @@ import theodolite.strategies.StrategyFactory
import theodolite.strategies.searchstrategy.CompositeStrategy import theodolite.strategies.searchstrategy.CompositeStrategy
import theodolite.util.* import theodolite.util.*
import java.io.File import java.io.File
import java.io.PrintWriter
import java.lang.IllegalArgumentException
import java.lang.Thread.sleep
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration import java.time.Duration
...@@ -68,8 +62,10 @@ class TheodoliteExecutor( ...@@ -68,8 +62,10 @@ class TheodoliteExecutor(
executionDuration = executionDuration, executionDuration = executionDuration,
configurationOverrides = config.configOverrides, configurationOverrides = config.configOverrides,
slo = config.slos[0], slo = config.slos[0],
repetitions = config.execution.repetitions,
executionId = config.executionId, executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay loadGenerationDelay = config.execution.loadGenerationDelay,
afterTeardownDelay = config.execution.afterTeardownDelay
) )
return Config( return Config(
... ...
......
...@@ -31,8 +31,8 @@ class TheodoliteYamlExecutor { ...@@ -31,8 +31,8 @@ class TheodoliteYamlExecutor {
fun start() { fun start() {
logger.info { "Theodolite started" } logger.info { "Theodolite started" }
val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/BenchmarkExecution.yaml" val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/example-execution-yaml-resource.yaml"
val benchmarkPath = System.getenv("THEODOLITE_BENCHMARK") ?: "./config/BenchmarkType.yaml" val benchmarkPath = System.getenv("THEODOLITE_BENCHMARK") ?: "./config/example-benchmark-yaml-resource.yaml"
logger.info { "Using $executionPath for BenchmarkExecution" } logger.info { "Using $executionPath for BenchmarkExecution" }
logger.info { "Using $benchmarkPath for BenchmarkType" } logger.info { "Using $benchmarkPath for BenchmarkType" }
... ...
......
...@@ -31,7 +31,7 @@ class CompositeStrategyTest { ...@@ -31,7 +31,7 @@ class CompositeStrategyTest {
val results = Results() val results = Results()
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo() val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0) val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 5)
val linearSearch = LinearSearch(benchmarkExecutor) val linearSearch = LinearSearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
...@@ -65,7 +65,7 @@ class CompositeStrategyTest { ...@@ -65,7 +65,7 @@ class CompositeStrategyTest {
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo() val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutorImpl = val benchmarkExecutorImpl =
TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0) TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0, 0)
val binarySearch = BinarySearch(benchmarkExecutorImpl) val binarySearch = BinarySearch(benchmarkExecutorImpl)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
...@@ -98,7 +98,7 @@ class CompositeStrategyTest { ...@@ -98,7 +98,7 @@ class CompositeStrategyTest {
val results = Results() val results = Results()
val benchmark = TestBenchmark() val benchmark = TestBenchmark()
val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo() val sloChecker: BenchmarkExecution.Slo = BenchmarkExecution.Slo()
val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0) val benchmarkExecutor = TestBenchmarkExecutorImpl(mockResults, benchmark, results, sloChecker, 0, 0,0)
val binarySearch = BinarySearch(benchmarkExecutor) val binarySearch = BinarySearch(benchmarkExecutor)
val lowerBoundRestriction = LowerBoundRestriction(results) val lowerBoundRestriction = LowerBoundRestriction(results)
val strategy = val strategy =
... ...
......
...@@ -12,7 +12,8 @@ class TestBenchmark : Benchmark { ...@@ -12,7 +12,8 @@ class TestBenchmark : Benchmark {
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?>, configurationOverrides: List<ConfigurationOverride?>,
loadGenerationDelay: Long loadGenerationDelay: Long,
afterTeardownDelay: Long
): BenchmarkDeployment { ): BenchmarkDeployment {
return TestBenchmarkDeployment() return TestBenchmarkDeployment()
} }
... ...
......
...@@ -14,7 +14,8 @@ class TestBenchmarkExecutorImpl( ...@@ -14,7 +14,8 @@ class TestBenchmarkExecutorImpl(
results: Results, results: Results,
slo: BenchmarkExecution.Slo, slo: BenchmarkExecution.Slo,
executionId: Int, executionId: Int,
loadGenerationDelay: Long loadGenerationDelay: Long,
afterTeardownDelay: Long
) : ) :
BenchmarkExecutor( BenchmarkExecutor(
benchmark, benchmark,
...@@ -22,8 +23,10 @@ class TestBenchmarkExecutorImpl( ...@@ -22,8 +23,10 @@ class TestBenchmarkExecutorImpl(
executionDuration = Duration.ofSeconds(1), executionDuration = Duration.ofSeconds(1),
configurationOverrides = emptyList(), configurationOverrides = emptyList(),
slo = slo, slo = slo,
repetitions = 1,
executionId = executionId, executionId = executionId,
loadGenerationDelay = loadGenerationDelay loadGenerationDelay = loadGenerationDelay,
afterTeardownDelay = afterTeardownDelay
) { ) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment