Skip to content
Snippets Groups Projects
Commit 3d932eca authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge theodolite-kotlin

parents 37e7538d 55b77fdd
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!116Add image build documentation,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 327 additions and 15 deletions
......@@ -86,7 +86,7 @@ docker run -i --rm theodolite-quarkus-native
**Production:** (Docker-Container)
| Variables name | If not set: |Usage |
| Variables name | Default value |Usage |
| -----------------------------|:----------------------------------:| ------------:|
| `NAMESPACE` | `default` |Determines the namespace of the Theodolite will be executed in. Used in the KubernetesBenchmark|
| `THEODOLITE_EXECUTION` | `./config/BenchmarkExecution.yaml`|The complete path to the benchmarkExecution file. Used in the TheodoliteYamlExecutor. |
......
......@@ -5,8 +5,19 @@ import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension
import theodolite.util.Resource
/**
* A Benchmark contains:
* - The [Resource]s that can be scaled for the benchmark.
* - The [LoadDimension]s that can be scaled the benchmark.
* - additional [ConfigurationOverride]s.
*/
@RegisterForReflection
interface Benchmark {
/**
* Builds a Deployment that can be deployed.
* @return a BenchmarkDeployment.
*/
fun buildDeployment(
load: LoadDimension,
res: Resource,
......
package theodolite.benchmark
/**
* A BenchmarkDeployment contains the necessary infrastructure to execute a benchmark.
* Therefore it has the capabilities to set up the deployment of a benchmark and to tear it down.
*/
interface BenchmarkDeployment {
/**
* Setup a benchmark. This method is responsible for deploying the resources
* and organize the needed infrastructure.
*/
fun setup()
/**
* Tears down a benchmark. This method is responsible for deleting the deployed
* resources and to reset the used infrastructure.
*/
fun teardown()
}
......@@ -8,6 +8,22 @@ import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ConfigurationOverride
import kotlin.properties.Delegates
/**
* This class represents the configuration for an execution of a benchmark.
* An example for this is the BenchmarkExecution.yaml
* A BenchmarkExecution consists of:
* - A [name].
* - The [benchmark] that should be executed.
* - The [load] that should be checked in the benchmark.
* - The [resources] that should be checked in the benchmark.
* - A list of [slos] that are used for the evaluation of the experiments.
* - An [execution] that encapsulates: the strategy, the duration, and the restrictions
* for the execution of the benchmark.
* - [configOverrides] additional configurations.
* This class is used for parsing(in [theodolite.execution.TheodoliteYamlExecutor]) and
* for the deserializing in the [theodolite.execution.operator.TheodoliteOperator].
* @constructor construct an empty BenchmarkExecution.
*/
@JsonDeserialize
@RegisterForReflection
class BenchmarkExecution : CustomResource(), Namespaced {
......@@ -20,6 +36,10 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride?>
/**
* This execution encapsulates the [strategy], the [duration], the [repetitions], and the [restrictions]
* which are used for the concrete benchmark experiments.
*/
@JsonDeserialize
@RegisterForReflection
class Execution : KubernetesResource {
......@@ -29,6 +49,15 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var restrictions: List<String>
}
/**
* Measurable metric.
* [sloType] determines the type of the metric.
* It is evaluated using the [theodolite.evaluation.ExternalSloChecker] by data measured by Prometheus.
* The evaluation checks if a [threshold] is reached or not.
* [offset] determines the shift in hours by which the start and end timestamps should be shifted.
* The [warmup] determines after which time the metric should be evaluated to avoid starting interferences.
* The [warmup] time unit depends on the Slo: for the lag trend it is in seconds.
*/
@JsonDeserialize
@RegisterForReflection
class Slo : KubernetesResource {
......@@ -40,7 +69,10 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var warmup by Delegates.notNull<Int>()
}
/**
* Represents a Load that should be created and checked.
* It can be set to [loadValues].
*/
@JsonDeserialize
@RegisterForReflection
class LoadDefinition : KubernetesResource {
......@@ -48,6 +80,9 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var loadValues: List<Int>
}
/**
* Represents a resource that can be scaled to [resourceValues].
*/
@JsonDeserialize
@RegisterForReflection
class ResourceDefinition : KubernetesResource {
......
......@@ -5,8 +5,16 @@ import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
/**
* Used to reset the KafkaLagExporter by deleting the pod.
* @param client NamespacedKubernetesClient used for the deletion.
*/
class KafkaLagExporterRemover(private val client: NamespacedKubernetesClient) {
/**
* Deletes all pods with the selected label.
* @param [label] of the pod that should be deleted.
*/
fun remove(label: String) {
this.client.pods().withLabel(label).delete()
logger.info { "Pod with label: $label deleted" }
......
......@@ -11,8 +11,25 @@ import theodolite.patcher.PatcherFactory
import theodolite.util.*
private val logger = KotlinLogging.logger {}
private var DEFAULT_NAMESPACE = "default"
/**
* Represents a benchmark in Kubernetes. An example for this is the BenchmarkType.yaml
* Contains a of:
* - [name] of the benchmark,
* - [appResource] list of the resources that have to be deployed for the benchmark,
* - [loadGenResource] resource that generates the load,
* - [resourceTypes] types of scaling resources,
* - [loadTypes] types of loads that can be scaled for the benchmark,
* - [kafkaConfig] for the [TopicManager],
* - [namespace] for the client,
* - [path] under which the resource yamls can be found.
*
* This class is used for the parsing(in the [theodolite.execution.TheodoliteYamlExecutor]) and
* for the deserializing in the [theodolite.execution.operator.TheodoliteOperator].
* @constructor construct an empty Benchmark.
*/
@RegisterForReflection
class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
lateinit var name: String
......@@ -24,6 +41,11 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
/**
* Loads [KubernetesResource]s.
* It first loads them via the [YamlParser] to check for their concrete type and afterwards initializes them using
* the [K8sResourceLoader]
*/
private fun loadKubernetesResources(resources: List<String>): List<Pair<String, KubernetesResource>> {
val parser = YamlParser()
val loader = K8sResourceLoader(DefaultKubernetesClient().inNamespace(namespace))
......@@ -36,6 +58,15 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
}
}
/**
* Builds a deployment.
* First loads all required resources and then patches them to the concrete load and resources for the experiment.
* Afterwards patches additional configurations(cluster depending) into the resources.
* @param load concrete load that will be benchmarked in this experiment.
* @param res concrete resoruce that will be scaled for this experiment.
* @param configurationOverrides
* @return a [BenchmarkDeployment]
*/
override fun buildDeployment(
load: LoadDimension,
res: Resource,
......
......@@ -7,6 +7,14 @@ import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
/**
* Organizes the deployment of benchmarks in Kubernetes.
*
* @param namespace to operate in.
* @param resources List of [KubernetesResource] that are managed.
* @param kafkaConfig for the organization of Kafka topics.
* @param topics List of topics that are created or deleted.
*/
@RegisterForReflection
class KubernetesBenchmarkDeployment(
val namespace: String,
......@@ -19,6 +27,11 @@ class KubernetesBenchmarkDeployment(
private val kubernetesManager = K8sManager(client)
private val LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
/**
* Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
* - Create the needed topics.
* - Deploy the needed resources.
*/
override fun setup() {
kafkaController.createTopics(this.topics)
resources.forEach {
......@@ -26,6 +39,12 @@ class KubernetesBenchmarkDeployment(
}
}
/**
* Tears down a [KubernetesBenchmark]:
* - Reset the Kafka Lag Exporter.
* - Remove the used topics.
* - Remove the [KubernetesResource]s.
*/
override fun teardown() {
KafkaLagExporterRemover(client).remove(LABEL)
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
......
......@@ -9,6 +9,10 @@ import java.time.Instant
private val logger = KotlinLogging.logger {}
/**
* Contains the analysis. Fetches a metric from Prometheus, documents it, and evaluates it.
* @param slo Slo that is used for the analysis.
*/
class AnalysisExecutor(
private val slo: BenchmarkExecution.Slo,
private val executionId: Int
......@@ -19,6 +23,14 @@ class AnalysisExecutor(
offset = Duration.ofHours(slo.offset.toLong())
)
/**
* Analyses an experiment via prometheus data.
* First fetches data from prometheus, then documents them and afterwards evaluate it via a [slo].
* @param load of the experiment.
* @param res of the experiment.
* @param executionDuration of the experiment.
* @return true if the experiment succeeded.
*/
fun analyze(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean {
var result = false
......@@ -31,7 +43,7 @@ class AnalysisExecutor(
CsvExporter().toCsv(name = "$executionId-${load.get()}-${res.get()}-${slo.sloType}", prom = prometheusData)
val sloChecker = SloCheckerFactory().create(
slotype = slo.sloType,
sloType = slo.sloType,
externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold,
warmup = slo.warmup
......
......@@ -8,10 +8,16 @@ import java.util.*
private val logger = KotlinLogging.logger {}
/**
* Used to document the data received from prometheus for additional offline analysis.
*/
class CsvExporter {
/**
* Uses the PrintWriter to transform a PrometheusResponse to Csv
* Uses the [PrintWriter] to transform a [PrometheusResponse] to a CSV file.
* @param name of the file.
* @param prom Response that is documented.
*
*/
fun toCsv(name: String, prom: PrometheusResponse) {
val responseArray = promResponseToList(prom)
......@@ -27,7 +33,7 @@ class CsvExporter {
}
/**
* Converts a PrometheusResponse into a List of List of Strings
* Converts a [PrometheusResponse] into a [List] of [List]s of [String]s
*/
private fun promResponseToList(prom: PrometheusResponse): List<List<String>> {
val name = prom.data?.result?.get(0)?.metric?.group.toString()
......
......@@ -7,6 +7,12 @@ import theodolite.util.PrometheusResponse
import java.net.ConnectException
import java.time.Instant
/**
* [SloChecker] that uses an external source for the concrete evaluation.
* @param externalSlopeURL The url under which the external evaluation can be reached.
* @param threshold threshold that should not be exceeded to evaluate to true.
* @param warmup time that is not taken into consideration for the evaluation.
*/
class ExternalSloChecker(
private val externalSlopeURL: String,
private val threshold: Int,
......@@ -19,6 +25,17 @@ class ExternalSloChecker(
private val logger = KotlinLogging.logger {}
/**
* Evaluates an experiment using an external service.
* Will try to reach the external service until success or [RETRIES] times.
* Each request will timeout after [TIMEOUT].
*
* @param start point of the experiment.
* @param end point of the experiment.
* @param fetchedData that should be evaluated
* @return true if the experiment was successful(the threshold was not exceeded.
* @throws ConnectException if the external service could not be reached.
*/
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
var counter = 0
val data =
......
......@@ -11,10 +11,26 @@ import java.time.Instant
private val logger = KotlinLogging.logger {}
/**
* Used to fetch metrics from Prometheus.
* @param prometheusURL URL to the Prometheus server.
* @param offset Duration of time that the start and end points of the queries
* should be shifted. (for different timezones, etc..)
*/
class MetricFetcher(private val prometheusURL: String, private val offset: Duration) {
private val RETRIES = 2
private val TIMEOUT = 60.0
/**
* Tries to fetch a metric by a query to a Prometheus server.
* Retries to fetch the metric [RETRIES] times.
* Connects to the server via [prometheusURL].
*
* @param start start point of the query.
* @param end end point of the query.
* @param query query for the prometheus server.
* @throws ConnectException - if the prometheus server timed out/was not reached.
*/
fun fetchMetric(start: Instant, end: Instant, query: String): PrometheusResponse {
val offsetStart = start.minus(offset)
......@@ -46,6 +62,11 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
throw ConnectException("No answer from Prometheus received")
}
/**
* Deserializes a response from Prometheus.
* @param values Response from Prometheus.
* @return a [PrometheusResponse]
*/
private fun parseValues(values: Response): PrometheusResponse {
return Gson().fromJson<PrometheusResponse>(
values.jsonObject.toString(),
......
......@@ -3,6 +3,20 @@ package theodolite.evaluation
import theodolite.util.PrometheusResponse
import java.time.Instant
/**
* A SloChecker can be used to evaluate data from Promehteus.
* @constructor Creates an empty SloChecker
*/
interface SloChecker {
/**
* Evaluates [fetchedData] and returns if the experiment was successful.
* Returns if the evaluated experiment was successful.
*
* @param start of the experiment
* @param end of the experiment
* @param fetchedData from Prometheus that will be evaluated.
* @return true if experiment was successful. Otherwise false.
*/
fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean
}
package theodolite.evaluation
import java.time.Duration
/**
* Factory used to potentially create different [SloChecker]s.
* Supports: lag type.
*/
class SloCheckerFactory {
/**
* Creates different [SloChecker]s.
* Supports: lag type.
*
* @param sloType Type of the [SloChecker].
* @param externalSlopeURL Url to the concrete [SloChecker].
* @param threshold for the [SloChecker].
* @param warmup for the [SloChecker].
*
* @return A [SloChecker]
* @throws IllegalArgumentException If [sloType] not supported.
*/
fun create(
slotype: String,
sloType: String,
externalSlopeURL: String,
threshold: Int,
warmup: Int
): SloChecker {
return when (slotype) {
return when (sloType) {
"lag trend" -> ExternalSloChecker(
externalSlopeURL = externalSlopeURL,
threshold = threshold,
warmup = warmup
)
else -> throw IllegalArgumentException("Slotype $slotype not found.")
else -> throw IllegalArgumentException("Slotype $sloType not found.")
}
}
}
......@@ -32,11 +32,13 @@ abstract class BenchmarkExecutor(
var run: AtomicBoolean = AtomicBoolean(true)
/**
* Run a experiment for the given parametrization, evaluate the experiment and save the result.
* Run a experiment for the given parametrization, evaluate the
* experiment and save the result.
*
* @param load load to be tested.
* @param res resources to be tested.
* @return True, if the number of resources are suitable for the given load, false otherwise.
* @return True, if the number of resources are suitable for the
* given load, false otherwise.
*/
abstract fun runExperiment(load: LoadDimension, res: Resource): Boolean
......
......@@ -35,12 +35,16 @@ class BenchmarkExecutorImpl(
this.run.set(false)
}
/**
* Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
*/
if (this.run.get()) {
result =
AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration)
this.results.setResult(Pair(load, res), result)
}
benchmarkDeployment.teardown()
return result
}
}
......@@ -8,9 +8,20 @@ import theodolite.util.Resource
private val logger = KotlinLogging.logger {}
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) {
/**
* This Shutdown Hook can be used to delete all Kubernetes resources which are related to the given execution and benchmark.
*
* @property benchmarkExecution
* @property benchmark
*/
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) :
Thread() {
fun run() {
/**
* Run
* Delete all Kubernetes resources which are related to the execution and the benchmark.
*/
override fun run() {
// Build Configuration to teardown
logger.info { "Received shutdown signal -> Shutting down" }
val deployment =
......
......@@ -13,12 +13,30 @@ import theodolite.util.Results
import java.io.PrintWriter
import java.time.Duration
/**
* The Theodolite executor runs all the experiments defined with the given execution and benchmark configuration.
*
* @property config Configuration of a execution
* @property kubernetesBenchmark Configuration of a benchmark
* @constructor Create empty Theodolite executor
*/
class TheodoliteExecutor(
private val config: BenchmarkExecution,
private val kubernetesBenchmark: KubernetesBenchmark
) {
/**
* An executor object, configured with the specified benchmark, evaluation method, experiment duration
* and overrides which are given in the execution.
*/
lateinit var executor: BenchmarkExecutor
/**
* Creates all required components to start Theodolite.
*
* @return a [Config], that contains a list of [LoadDimension]s,
* a list of [Resource]s , and the [CompositeStrategy].
* The [CompositeStrategy] is configured and able to find the minimum required resource for the given load.
*/
private fun buildConfig(): Config {
val results = Results()
val strategyFactory = StrategyFactory()
......@@ -74,6 +92,10 @@ class TheodoliteExecutor(
return this.kubernetesBenchmark
}
/**
* Run all experiments which are specified in the corresponding
* execution and benchmark objects.
*/
fun run() {
storeAsFile(this.config, "${this.config.executionId}-execution-configuration")
storeAsFile(kubernetesBenchmark, "${this.config.executionId}-benchmark-configuration")
......
......@@ -9,6 +9,22 @@ import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {}
/**
* The Theodolite yaml executor loads the required configurations
* of the executions and the benchmark from yaml files and run the
* corresponding experiments.
*
* The location of the execution, benchmarks and Kubernetes resource
* files can be configured via the following environment variables:
* `THEODOLITE_EXECUTION`
*
* `THEODOLITE_BENCHMARK`
*
* `THEODOLITE_APP_RESOURCES`
*
* @constructor Create empty Theodolite yaml executor
*/
class TheodoliteYamlExecutor {
private val parser = YamlParser()
......
......@@ -5,13 +5,37 @@ import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
/**
* Handles adding, updating and deleting KubernetesBenchmarks.
*
* @param controller The TheodoliteController that handles the application state
*
* @see TheodoliteController
* @see KubernetesBenchmark
*/
class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> {
/**
* Add a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to add
*
* @see KubernetesBenchmark
*/
override fun onAdd(benchmark: KubernetesBenchmark) {
benchmark.name = benchmark.metadata.name
logger.info { "Add new benchmark ${benchmark.name}." }
this.controller.benchmarks[benchmark.name] = benchmark
}
/**
* Update a KubernetesBenchmark.
*
* @param oldBenchmark the KubernetesBenchmark to update
* @param newBenchmark the updated KubernetesBenchmark
*
* @see KubernetesBenchmark
*/
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name
......@@ -23,6 +47,13 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
}
}
/**
* Delete a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to delete
*
* @see KubernetesBenchmark
*/
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name)
......
......@@ -7,13 +7,33 @@ import java.lang.NullPointerException
private val logger = KotlinLogging.logger {}
/**
* Handles adding, updating and deleting BenchmarkExecutions.
*
* @param controller The TheodoliteController that handles the application state
*
* @see TheodoliteController
* @see BenchmarkExecution
*/
class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> {
/**
* Add an execution to the end of the queue of the TheodoliteController.
*
* @param execution the execution to add
*/
override fun onAdd(execution: BenchmarkExecution) {
execution.name = execution.metadata.name
logger.info { "Add new execution ${execution.metadata.name} to queue." }
this.controller.executionsQueue.add(execution)
}
/**
* Update an execution. If this execution is running at the time this function is called, it is stopped and added to
* the beginning of the queue of the TheodoliteController. Otherwise, it is just added to the beginning of the queue.
*
* @param execution the execution to update
*/
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name
......@@ -29,6 +49,11 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
}
}
/**
* Delete an execution from the queue of the TheodoliteController.
*
* @param execution the execution to delete
*/
override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
try {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment