Skip to content
Snippets Groups Projects
Commit 676607ff authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/spesb...

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/spesb into 210-allows-to-scale-load-generators
parents d76b7153 0c976926
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!124Deploy correct number of load generator instances,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 360 additions and 14 deletions
...@@ -5,8 +5,19 @@ import theodolite.util.ConfigurationOverride ...@@ -5,8 +5,19 @@ import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
import theodolite.util.Resource import theodolite.util.Resource
/**
* A Benchmark contains:
* - The [Resource]s that can be scaled for the benchmark.
* - The [LoadDimension]s that can be scaled the benchmark.
* - additional [ConfigurationOverride]s.
*/
@RegisterForReflection @RegisterForReflection
interface Benchmark { interface Benchmark {
/**
* Builds a Deployment that can be deployed.
* @return a BenchmarkDeployment.
*/
fun buildDeployment( fun buildDeployment(
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
......
package theodolite.benchmark package theodolite.benchmark
/**
* A BenchmarkDeployment contains the necessary infrastructure to execute a benchmark.
* Therefore it has the capabilities to set up the deployment of a benchmark and to tear it down.
*/
interface BenchmarkDeployment { interface BenchmarkDeployment {
/**
* Setup a benchmark. This method is responsible for deploying the resources
* and organize the needed infrastructure.
*/
fun setup() fun setup()
/**
* Tears down a benchmark. This method is responsible for deleting the deployed
* resources and to reset the used infrastructure.
*/
fun teardown() fun teardown()
} }
...@@ -8,6 +8,22 @@ import io.quarkus.runtime.annotations.RegisterForReflection ...@@ -8,6 +8,22 @@ import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import kotlin.properties.Delegates import kotlin.properties.Delegates
/**
* This class represents the configuration for an execution of a benchmark.
* An example for this is the BenchmarkExecution.yaml
* A BenchmarkExecution consists of:
* - A [name].
* - The [benchmark] that should be executed.
* - The [load] that should be checked in the benchmark.
* - The [resources] that should be checked in the benchmark.
* - A list of [slos] that are used for the evaluation of the experiments.
* - An [execution] that encapsulates: the strategy, the duration, and the restrictions
* for the execution of the benchmark.
* - [configOverrides] additional configurations.
* This class is used for parsing(in [theodolite.execution.TheodoliteYamlExecutor]) and
* for the deserializing in the [theodolite.execution.operator.TheodoliteOperator].
* @constructor construct an empty BenchmarkExecution.
*/
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class BenchmarkExecution : CustomResource(), Namespaced { class BenchmarkExecution : CustomResource(), Namespaced {
...@@ -20,6 +36,10 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -20,6 +36,10 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var execution: Execution lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride?> lateinit var configOverrides: List<ConfigurationOverride?>
/**
* This execution encapsulates the [strategy], the [duration], the [repetitions], and the [restrictions]
* which are used for the concrete benchmark experiments.
*/
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class Execution : KubernetesResource { class Execution : KubernetesResource {
...@@ -29,6 +49,15 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -29,6 +49,15 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var restrictions: List<String> lateinit var restrictions: List<String>
} }
/**
* Measurable metric.
* [sloType] determines the type of the metric.
* It is evaluated using the [theodolite.evaluation.ExternalSloChecker] by data measured by Prometheus.
* The evaluation checks if a [threshold] is reached or not.
* [offset] determines the shift in hours by which the start and end timestamps should be shifted.
* The [warmup] determines after which time the metric should be evaluated to avoid starting interferences.
* The [warmup] time unit depends on the Slo: for the lag trend it is in seconds.
*/
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class Slo : KubernetesResource { class Slo : KubernetesResource {
...@@ -40,7 +69,10 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -40,7 +69,10 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var warmup by Delegates.notNull<Int>() var warmup by Delegates.notNull<Int>()
} }
/**
* Represents a Load that should be created and checked.
* It can be set to [loadValues].
*/
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class LoadDefinition : KubernetesResource { class LoadDefinition : KubernetesResource {
...@@ -48,6 +80,9 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -48,6 +80,9 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var loadValues: List<Int> lateinit var loadValues: List<Int>
} }
/**
* Represents a resource that can be scaled to [resourceValues].
*/
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class ResourceDefinition : KubernetesResource { class ResourceDefinition : KubernetesResource {
......
...@@ -5,8 +5,16 @@ import mu.KotlinLogging ...@@ -5,8 +5,16 @@ import mu.KotlinLogging
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* Used to reset the KafkaLagExporter by deleting the pod.
* @param client NamespacedKubernetesClient used for the deletion.
*/
class KafkaLagExporterRemover(private val client: NamespacedKubernetesClient) { class KafkaLagExporterRemover(private val client: NamespacedKubernetesClient) {
/**
* Deletes all pods with the selected label.
* @param [label] of the pod that should be deleted.
*/
fun remove(label: String) { fun remove(label: String) {
this.client.pods().withLabel(label).delete() this.client.pods().withLabel(label).delete()
logger.info { "Pod with label: $label deleted" } logger.info { "Pod with label: $label deleted" }
......
...@@ -11,8 +11,25 @@ import theodolite.patcher.PatcherFactory ...@@ -11,8 +11,25 @@ import theodolite.patcher.PatcherFactory
import theodolite.util.* import theodolite.util.*
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private var DEFAULT_NAMESPACE = "default" private var DEFAULT_NAMESPACE = "default"
/**
* Represents a benchmark in Kubernetes. An example for this is the BenchmarkType.yaml
* Contains a of:
* - [name] of the benchmark,
* - [appResource] list of the resources that have to be deployed for the benchmark,
* - [loadGenResource] resource that generates the load,
* - [resourceTypes] types of scaling resources,
* - [loadTypes] types of loads that can be scaled for the benchmark,
* - [kafkaConfig] for the [TopicManager],
* - [namespace] for the client,
* - [path] under which the resource yamls can be found.
*
* This class is used for the parsing(in the [theodolite.execution.TheodoliteYamlExecutor]) and
* for the deserializing in the [theodolite.execution.operator.TheodoliteOperator].
* @constructor construct an empty Benchmark.
*/
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
lateinit var name: String lateinit var name: String
...@@ -24,6 +41,11 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -24,6 +41,11 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config" var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
/**
* Loads [KubernetesResource]s.
* It first loads them via the [YamlParser] to check for their concrete type and afterwards initializes them using
* the [K8sResourceLoader]
*/
private fun loadKubernetesResources(resources: List<String>): List<Pair<String, KubernetesResource>> { private fun loadKubernetesResources(resources: List<String>): List<Pair<String, KubernetesResource>> {
val parser = YamlParser() val parser = YamlParser()
val loader = K8sResourceLoader(DefaultKubernetesClient().inNamespace(namespace)) val loader = K8sResourceLoader(DefaultKubernetesClient().inNamespace(namespace))
...@@ -36,6 +58,15 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -36,6 +58,15 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
} }
} }
/**
* Builds a deployment.
* First loads all required resources and then patches them to the concrete load and resources for the experiment.
* Afterwards patches additional configurations(cluster depending) into the resources.
* @param load concrete load that will be benchmarked in this experiment.
* @param res concrete resoruce that will be scaled for this experiment.
* @param configurationOverrides
* @return a [BenchmarkDeployment]
*/
override fun buildDeployment( override fun buildDeployment(
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
......
...@@ -7,6 +7,14 @@ import org.apache.kafka.clients.admin.NewTopic ...@@ -7,6 +7,14 @@ import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager import theodolite.k8s.TopicManager
/**
* Organizes the deployment of benchmarks in Kubernetes.
*
* @param namespace to operate in.
* @param resources List of [KubernetesResource] that are managed.
* @param kafkaConfig for the organization of Kafka topics.
* @param topics List of topics that are created or deleted.
*/
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmarkDeployment( class KubernetesBenchmarkDeployment(
val namespace: String, val namespace: String,
...@@ -19,6 +27,11 @@ class KubernetesBenchmarkDeployment( ...@@ -19,6 +27,11 @@ class KubernetesBenchmarkDeployment(
private val kubernetesManager = K8sManager(client) private val kubernetesManager = K8sManager(client)
private val LABEL = "app.kubernetes.io/name=kafka-lag-exporter" private val LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
/**
* Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
* - Create the needed topics.
* - Deploy the needed resources.
*/
override fun setup() { override fun setup() {
kafkaController.createTopics(this.topics) kafkaController.createTopics(this.topics)
resources.forEach { resources.forEach {
...@@ -26,6 +39,12 @@ class KubernetesBenchmarkDeployment( ...@@ -26,6 +39,12 @@ class KubernetesBenchmarkDeployment(
} }
} }
/**
* Tears down a [KubernetesBenchmark]:
* - Reset the Kafka Lag Exporter.
* - Remove the used topics.
* - Remove the [KubernetesResource]s.
*/
override fun teardown() { override fun teardown() {
KafkaLagExporterRemover(client).remove(LABEL) KafkaLagExporterRemover(client).remove(LABEL)
kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
......
...@@ -9,6 +9,10 @@ import java.time.Instant ...@@ -9,6 +9,10 @@ import java.time.Instant
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* Contains the analysis. Fetches a metric from Prometheus, documents it, and evaluates it.
* @param slo Slo that is used for the analysis.
*/
class AnalysisExecutor( class AnalysisExecutor(
private val slo: BenchmarkExecution.Slo, private val slo: BenchmarkExecution.Slo,
private val executionId: Int private val executionId: Int
...@@ -19,6 +23,14 @@ class AnalysisExecutor( ...@@ -19,6 +23,14 @@ class AnalysisExecutor(
offset = Duration.ofHours(slo.offset.toLong()) offset = Duration.ofHours(slo.offset.toLong())
) )
/**
* Analyses an experiment via prometheus data.
* First fetches data from prometheus, then documents them and afterwards evaluate it via a [slo].
* @param load of the experiment.
* @param res of the experiment.
* @param executionDuration of the experiment.
* @return true if the experiment succeeded.
*/
fun analyze(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean { fun analyze(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean {
var result = false var result = false
...@@ -31,7 +43,7 @@ class AnalysisExecutor( ...@@ -31,7 +43,7 @@ class AnalysisExecutor(
CsvExporter().toCsv(name = "$executionId-${load.get()}-${res.get()}-${slo.sloType}", prom = prometheusData) CsvExporter().toCsv(name = "$executionId-${load.get()}-${res.get()}-${slo.sloType}", prom = prometheusData)
val sloChecker = SloCheckerFactory().create( val sloChecker = SloCheckerFactory().create(
slotype = slo.sloType, sloType = slo.sloType,
externalSlopeURL = slo.externalSloUrl, externalSlopeURL = slo.externalSloUrl,
threshold = slo.threshold, threshold = slo.threshold,
warmup = slo.warmup warmup = slo.warmup
......
...@@ -8,10 +8,16 @@ import java.util.* ...@@ -8,10 +8,16 @@ import java.util.*
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* Used to document the data received from prometheus for additional offline analysis.
*/
class CsvExporter { class CsvExporter {
/** /**
* Uses the PrintWriter to transform a PrometheusResponse to Csv * Uses the [PrintWriter] to transform a [PrometheusResponse] to a CSV file.
* @param name of the file.
* @param prom Response that is documented.
*
*/ */
fun toCsv(name: String, prom: PrometheusResponse) { fun toCsv(name: String, prom: PrometheusResponse) {
val responseArray = promResponseToList(prom) val responseArray = promResponseToList(prom)
...@@ -27,7 +33,7 @@ class CsvExporter { ...@@ -27,7 +33,7 @@ class CsvExporter {
} }
/** /**
* Converts a PrometheusResponse into a List of List of Strings * Converts a [PrometheusResponse] into a [List] of [List]s of [String]s
*/ */
private fun promResponseToList(prom: PrometheusResponse): List<List<String>> { private fun promResponseToList(prom: PrometheusResponse): List<List<String>> {
val name = prom.data?.result?.get(0)?.metric?.group.toString() val name = prom.data?.result?.get(0)?.metric?.group.toString()
......
...@@ -7,6 +7,12 @@ import theodolite.util.PrometheusResponse ...@@ -7,6 +7,12 @@ import theodolite.util.PrometheusResponse
import java.net.ConnectException import java.net.ConnectException
import java.time.Instant import java.time.Instant
/**
* [SloChecker] that uses an external source for the concrete evaluation.
* @param externalSlopeURL The url under which the external evaluation can be reached.
* @param threshold threshold that should not be exceeded to evaluate to true.
* @param warmup time that is not taken into consideration for the evaluation.
*/
class ExternalSloChecker( class ExternalSloChecker(
private val externalSlopeURL: String, private val externalSlopeURL: String,
private val threshold: Int, private val threshold: Int,
...@@ -19,6 +25,17 @@ class ExternalSloChecker( ...@@ -19,6 +25,17 @@ class ExternalSloChecker(
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* Evaluates an experiment using an external service.
* Will try to reach the external service until success or [RETRIES] times.
* Each request will timeout after [TIMEOUT].
*
* @param start point of the experiment.
* @param end point of the experiment.
* @param fetchedData that should be evaluated
* @return true if the experiment was successful(the threshold was not exceeded.
* @throws ConnectException if the external service could not be reached.
*/
override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean { override fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean {
var counter = 0 var counter = 0
val data = val data =
......
...@@ -11,10 +11,26 @@ import java.time.Instant ...@@ -11,10 +11,26 @@ import java.time.Instant
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* Used to fetch metrics from Prometheus.
* @param prometheusURL URL to the Prometheus server.
* @param offset Duration of time that the start and end points of the queries
* should be shifted. (for different timezones, etc..)
*/
class MetricFetcher(private val prometheusURL: String, private val offset: Duration) { class MetricFetcher(private val prometheusURL: String, private val offset: Duration) {
private val RETRIES = 2 private val RETRIES = 2
private val TIMEOUT = 60.0 private val TIMEOUT = 60.0
/**
* Tries to fetch a metric by a query to a Prometheus server.
* Retries to fetch the metric [RETRIES] times.
* Connects to the server via [prometheusURL].
*
* @param start start point of the query.
* @param end end point of the query.
* @param query query for the prometheus server.
* @throws ConnectException - if the prometheus server timed out/was not reached.
*/
fun fetchMetric(start: Instant, end: Instant, query: String): PrometheusResponse { fun fetchMetric(start: Instant, end: Instant, query: String): PrometheusResponse {
val offsetStart = start.minus(offset) val offsetStart = start.minus(offset)
...@@ -46,6 +62,11 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat ...@@ -46,6 +62,11 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
throw ConnectException("No answer from Prometheus received") throw ConnectException("No answer from Prometheus received")
} }
/**
* Deserializes a response from Prometheus.
* @param values Response from Prometheus.
* @return a [PrometheusResponse]
*/
private fun parseValues(values: Response): PrometheusResponse { private fun parseValues(values: Response): PrometheusResponse {
return Gson().fromJson<PrometheusResponse>( return Gson().fromJson<PrometheusResponse>(
values.jsonObject.toString(), values.jsonObject.toString(),
......
...@@ -3,6 +3,20 @@ package theodolite.evaluation ...@@ -3,6 +3,20 @@ package theodolite.evaluation
import theodolite.util.PrometheusResponse import theodolite.util.PrometheusResponse
import java.time.Instant import java.time.Instant
/**
* A SloChecker can be used to evaluate data from Promehteus.
* @constructor Creates an empty SloChecker
*/
interface SloChecker { interface SloChecker {
/**
* Evaluates [fetchedData] and returns if the experiment was successful.
* Returns if the evaluated experiment was successful.
*
* @param start of the experiment
* @param end of the experiment
* @param fetchedData from Prometheus that will be evaluated.
* @return true if experiment was successful. Otherwise false.
*/
fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean fun evaluate(start: Instant, end: Instant, fetchedData: PrometheusResponse): Boolean
} }
package theodolite.evaluation package theodolite.evaluation
import java.time.Duration /**
* Factory used to potentially create different [SloChecker]s.
* Supports: lag type.
*/
class SloCheckerFactory { class SloCheckerFactory {
/**
* Creates different [SloChecker]s.
* Supports: lag type.
*
* @param sloType Type of the [SloChecker].
* @param externalSlopeURL Url to the concrete [SloChecker].
* @param threshold for the [SloChecker].
* @param warmup for the [SloChecker].
*
* @return A [SloChecker]
* @throws IllegalArgumentException If [sloType] not supported.
*/
fun create( fun create(
slotype: String, sloType: String,
externalSlopeURL: String, externalSlopeURL: String,
threshold: Int, threshold: Int,
warmup: Int warmup: Int
): SloChecker { ): SloChecker {
return when (sloType) {
return when (slotype) {
"lag trend" -> ExternalSloChecker( "lag trend" -> ExternalSloChecker(
externalSlopeURL = externalSlopeURL, externalSlopeURL = externalSlopeURL,
threshold = threshold, threshold = threshold,
warmup = warmup warmup = warmup
) )
else -> throw IllegalArgumentException("Slotype $slotype not found.") else -> throw IllegalArgumentException("Slotype $sloType not found.")
} }
} }
} }
...@@ -32,11 +32,13 @@ abstract class BenchmarkExecutor( ...@@ -32,11 +32,13 @@ abstract class BenchmarkExecutor(
var run: AtomicBoolean = AtomicBoolean(true) var run: AtomicBoolean = AtomicBoolean(true)
/** /**
* Run a experiment for the given parametrization, evaluate the experiment and save the result. * Run a experiment for the given parametrization, evaluate the
* experiment and save the result.
* *
* @param load load to be tested. * @param load load to be tested.
* @param res resources to be tested. * @param res resources to be tested.
* @return True, if the number of resources are suitable for the given load, false otherwise. * @return True, if the number of resources are suitable for the
* given load, false otherwise.
*/ */
abstract fun runExperiment(load: LoadDimension, res: Resource): Boolean abstract fun runExperiment(load: LoadDimension, res: Resource): Boolean
......
...@@ -35,12 +35,16 @@ class BenchmarkExecutorImpl( ...@@ -35,12 +35,16 @@ class BenchmarkExecutorImpl(
this.run.set(false) this.run.set(false)
} }
/**
* Analyse the experiment, if [run] is true, otherwise the experiment was canceled by the user.
*/
if (this.run.get()) { if (this.run.get()) {
result = result =
AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration) AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration)
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
} }
benchmarkDeployment.teardown() benchmarkDeployment.teardown()
return result return result
} }
} }
...@@ -8,9 +8,20 @@ import theodolite.util.Resource ...@@ -8,9 +8,20 @@ import theodolite.util.Resource
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) { /**
* This Shutdown Hook can be used to delete all Kubernetes resources which are related to the given execution and benchmark.
*
* @property benchmarkExecution
* @property benchmark
*/
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) :
Thread() {
fun run() { /**
* Run
* Delete all Kubernetes resources which are related to the execution and the benchmark.
*/
override fun run() {
// Build Configuration to teardown // Build Configuration to teardown
logger.info { "Received shutdown signal -> Shutting down" } logger.info { "Received shutdown signal -> Shutting down" }
val deployment = val deployment =
......
...@@ -13,12 +13,30 @@ import theodolite.util.Results ...@@ -13,12 +13,30 @@ import theodolite.util.Results
import java.io.PrintWriter import java.io.PrintWriter
import java.time.Duration import java.time.Duration
/**
* The Theodolite executor runs all the experiments defined with the given execution and benchmark configuration.
*
* @property config Configuration of a execution
* @property kubernetesBenchmark Configuration of a benchmark
* @constructor Create empty Theodolite executor
*/
class TheodoliteExecutor( class TheodoliteExecutor(
private val config: BenchmarkExecution, private val config: BenchmarkExecution,
private val kubernetesBenchmark: KubernetesBenchmark private val kubernetesBenchmark: KubernetesBenchmark
) { ) {
/**
* An executor object, configured with the specified benchmark, evaluation method, experiment duration
* and overrides which are given in the execution.
*/
lateinit var executor: BenchmarkExecutor lateinit var executor: BenchmarkExecutor
/**
* Creates all required components to start Theodolite.
*
* @return a [Config], that contains a list of [LoadDimension]s,
* a list of [Resource]s , and the [CompositeStrategy].
* The [CompositeStrategy] is configured and able to find the minimum required resource for the given load.
*/
private fun buildConfig(): Config { private fun buildConfig(): Config {
val results = Results() val results = Results()
val strategyFactory = StrategyFactory() val strategyFactory = StrategyFactory()
...@@ -74,6 +92,10 @@ class TheodoliteExecutor( ...@@ -74,6 +92,10 @@ class TheodoliteExecutor(
return this.kubernetesBenchmark return this.kubernetesBenchmark
} }
/**
* Run all experiments which are specified in the corresponding
* execution and benchmark objects.
*/
fun run() { fun run() {
storeAsFile(this.config, "${this.config.executionId}-execution-configuration") storeAsFile(this.config, "${this.config.executionId}-execution-configuration")
storeAsFile(kubernetesBenchmark, "${this.config.executionId}-benchmark-configuration") storeAsFile(kubernetesBenchmark, "${this.config.executionId}-benchmark-configuration")
......
...@@ -9,6 +9,22 @@ import kotlin.system.exitProcess ...@@ -9,6 +9,22 @@ import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* The Theodolite yaml executor loads the required configurations
* of the executions and the benchmark from yaml files and run the
* corresponding experiments.
*
* The location of the execution, benchmarks and Kubernetes resource
* files can be configured via the following environment variables:
* `THEODOLITE_EXECUTION`
*
* `THEODOLITE_BENCHMARK`
*
* `THEODOLITE_APP_RESOURCES`
*
* @constructor Create empty Theodolite yaml executor
*/
class TheodoliteYamlExecutor { class TheodoliteYamlExecutor {
private val parser = YamlParser() private val parser = YamlParser()
......
...@@ -5,13 +5,37 @@ import mu.KotlinLogging ...@@ -5,13 +5,37 @@ import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* Handles adding, updating and deleting KubernetesBenchmarks.
*
* @param controller The TheodoliteController that handles the application state
*
* @see TheodoliteController
* @see KubernetesBenchmark
*/
class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> { class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> {
/**
* Add a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to add
*
* @see KubernetesBenchmark
*/
override fun onAdd(benchmark: KubernetesBenchmark) { override fun onAdd(benchmark: KubernetesBenchmark) {
benchmark.name = benchmark.metadata.name benchmark.name = benchmark.metadata.name
logger.info { "Add new benchmark ${benchmark.name}." } logger.info { "Add new benchmark ${benchmark.name}." }
this.controller.benchmarks[benchmark.name] = benchmark this.controller.benchmarks[benchmark.name] = benchmark
} }
/**
* Update a KubernetesBenchmark.
*
* @param oldBenchmark the KubernetesBenchmark to update
* @param newBenchmark the updated KubernetesBenchmark
*
* @see KubernetesBenchmark
*/
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) { override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." } logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name newBenchmark.name = newBenchmark.metadata.name
...@@ -23,6 +47,13 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou ...@@ -23,6 +47,13 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
} }
} }
/**
* Delete a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to delete
*
* @see KubernetesBenchmark
*/
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) { override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." } logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name) this.controller.benchmarks.remove(benchmark.metadata.name)
......
...@@ -7,13 +7,33 @@ import java.lang.NullPointerException ...@@ -7,13 +7,33 @@ import java.lang.NullPointerException
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* Handles adding, updating and deleting BenchmarkExecutions.
*
* @param controller The TheodoliteController that handles the application state
*
* @see TheodoliteController
* @see BenchmarkExecution
*/
class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> { class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> {
/**
* Add an execution to the end of the queue of the TheodoliteController.
*
* @param execution the execution to add
*/
override fun onAdd(execution: BenchmarkExecution) { override fun onAdd(execution: BenchmarkExecution) {
execution.name = execution.metadata.name execution.name = execution.metadata.name
logger.info { "Add new execution ${execution.metadata.name} to queue." } logger.info { "Add new execution ${execution.metadata.name} to queue." }
this.controller.executionsQueue.add(execution) this.controller.executionsQueue.add(execution)
} }
/**
* Update an execution. If this execution is running at the time this function is called, it is stopped and added to
* the beginning of the queue of the TheodoliteController. Otherwise, it is just added to the beginning of the queue.
*
* @param execution the execution to update
*/
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." } logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name newExecution.name = newExecution.metadata.name
...@@ -29,6 +49,11 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv ...@@ -29,6 +49,11 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
} }
} }
/**
* Delete an execution from the queue of the TheodoliteController.
*
* @param execution the execution to delete
*/
override fun onDelete(execution: BenchmarkExecution, b: Boolean) { override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
try { try {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name } this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
......
...@@ -15,6 +15,20 @@ import java.util.concurrent.atomic.AtomicInteger ...@@ -15,6 +15,20 @@ import java.util.concurrent.atomic.AtomicInteger
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/**
* The controller implementation for Theodolite.
*
* Maintains a Dequeue, based on ConcurrentLinkedDequeue, of executions to be executed for a benchmark.
*
* @param client The NamespacedKubernetesClient
* @param executionContext The CustomResourceDefinitionContext
*
* @see NamespacedKubernetesClient
* @see CustomResourceDefinitionContext
* @see BenchmarkExecution
* @see KubernetesBenchmark
* @see ConcurrentLinkedDeque
*/
class TheodoliteController( class TheodoliteController(
val client: NamespacedKubernetesClient, val client: NamespacedKubernetesClient,
val executionContext: CustomResourceDefinitionContext, val executionContext: CustomResourceDefinitionContext,
...@@ -26,6 +40,9 @@ class TheodoliteController( ...@@ -26,6 +40,9 @@ class TheodoliteController(
var isUpdated = AtomicBoolean(false) var isUpdated = AtomicBoolean(false)
var executionID = AtomicInteger(0) var executionID = AtomicInteger(0)
/**
* Runs the TheodoliteController forever.
*/
fun run() { fun run() {
while (true) { while (true) {
try { try {
...@@ -46,6 +63,12 @@ class TheodoliteController( ...@@ -46,6 +63,12 @@ class TheodoliteController(
} }
} }
/**
* Ensures that the application state corresponds to the defined KubernetesBenchmarks and BenchmarkExecutions.
*
* @see KubernetesBenchmark
* @see BenchmarkExecution
*/
@Synchronized @Synchronized
private fun reconcile() { private fun reconcile() {
while (executionsQueue.isNotEmpty()) { while (executionsQueue.isNotEmpty()) {
...@@ -61,6 +84,12 @@ class TheodoliteController( ...@@ -61,6 +84,12 @@ class TheodoliteController(
} }
} }
/**
* Execute a benchmark with a defined KubernetesBenchmark and BenchmarkExecution
*
* @see KubernetesBenchmark
* @see BenchmarkExecution
*/
@Synchronized @Synchronized
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
execution.executionId = executionID.getAndSet(executionID.get() + 1) execution.executionId = executionID.getAndSet(executionID.get() + 1)
...@@ -82,6 +111,11 @@ class TheodoliteController( ...@@ -82,6 +111,11 @@ class TheodoliteController(
logger.info { "Execution of ${execution.name} is finally stopped." } logger.info { "Execution of ${execution.name} is finally stopped." }
} }
/**
* @return true if the TheodoliteExecutor of this controller is initialized. Else returns false.
*
* @see TheodoliteExecutor
*/
@Synchronized @Synchronized
fun isInitialized(): Boolean { fun isInitialized(): Boolean {
return ::executor.isInitialized return ::executor.isInitialized
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment