Skip to content
Snippets Groups Projects
Commit c1ce4104 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

merge upstream master

parents 45aef13a f7e2b939
Branches
Tags
1 merge request!171Introduce ResourceSets to make loading of resource files more flexible
Showing
with 232 additions and 233 deletions
package theodolite.benchmark
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
/**
* Used to reset the KafkaLagExporter by deleting the pod.
* @param client NamespacedKubernetesClient used for the deletion.
*/
class KafkaLagExporterRemover(private val client: NamespacedKubernetesClient) {
/**
* Deletes all pods with the selected label.
* @param [label] of the pod that should be deleted.
*/
fun remove(label: String) {
this.client.pods().withLabel(label).delete()
logger.info { "Pod with label: $label deleted" }
}
}
......@@ -13,6 +13,7 @@ import theodolite.util.*
private val logger = KotlinLogging.logger {}
private var DEFAULT_NAMESPACE = "default"
private var DEFAULT_THEODOLITE_APP_RESOURCES = "./benchmark-resources"
/**
* Represents a benchmark in Kubernetes. An example for this is the BenchmarkType.yaml
......@@ -26,7 +27,7 @@ private var DEFAULT_NAMESPACE = "default"
* - [namespace] for the client,
* - [path] under which the resource yamls can be found.
*
* This class is used for the parsing(in the [theodolite.execution.TheodoliteYamlExecutor]) and
* This class is used for the parsing(in the [theodolite.execution.TheodoliteStandalone]) and
* for the deserializing in the [theodolite.execution.operator.TheodoliteOperator].
* @constructor construct an empty Benchmark.
*/
......@@ -34,20 +35,12 @@ private var DEFAULT_NAMESPACE = "default"
@RegisterForReflection
class KubernetesBenchmark : KubernetesResource, Benchmark {
lateinit var name: String
// var appResource: List<Pair<String, KubernetesResource>>
// var loadGenResource: List<Pair<String, KubernetesResource>>
lateinit var resourceTypes: List<TypeName>
lateinit var loadTypes: List<TypeName>
lateinit var kafkaConfig: KafkaConfig
lateinit var appResourceSets: List<ResourceSets>
lateinit var loadGenResourceSets: List<ResourceSets>
var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
// init {
// this.appResource = appResourceSets.flatMap { it.loadResourceSet() }
// this.loadGenResource = loadGenResourceSets.flatMap { it.loadResourceSet() }
// }
/**
* Loads [KubernetesResource]s.
......@@ -75,7 +68,6 @@ class KubernetesBenchmark: KubernetesResource, Benchmark{
afterTeardownDelay: Long
): BenchmarkDeployment {
logger.info { "Using $namespace as namespace." }
logger.info { "Using $path as resource path." }
val appResources = loadKubernetesResources(this.appResourceSets)
val loadGenResources = loadKubernetesResources(this.loadGenResourceSets)
......@@ -97,7 +89,6 @@ class KubernetesBenchmark: KubernetesResource, Benchmark{
}
}
return KubernetesBenchmarkDeployment(
namespace = namespace,
appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second },
loadGenerationDelay = loadGenerationDelay,
......
......@@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
import mu.KotlinLogging
import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.ResourceByLabelHandler
import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
import java.time.Duration
......@@ -22,7 +23,6 @@ private val logger = KotlinLogging.logger {}
*/
@RegisterForReflection
class KubernetesBenchmarkDeployment(
val namespace: String,
val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>,
private val loadGenerationDelay: Long,
......@@ -33,7 +33,8 @@ class KubernetesBenchmarkDeployment(
) : BenchmarkDeployment {
private val kafkaController = TopicManager(this.kafkaConfig)
private val kubernetesManager = K8sManager(client)
private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
private val LAG_EXPORTER_POD_LABEL_NAME = "app.kubernetes.io/name"
private val LAG_EXPORTER_POD_LABEL_VALUE = "kafka-lag-exporter"
/**
* Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
......@@ -60,7 +61,10 @@ class KubernetesBenchmarkDeployment(
loadGenResources.forEach { kubernetesManager.remove(it) }
appResources.forEach { kubernetesManager.remove(it) }
kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
ResourceByLabelHandler(client).removePods(
labelName = LAG_EXPORTER_POD_LABEL_NAME,
labelValue = LAG_EXPORTER_POD_LABEL_VALUE
)
logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." }
Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis())
}
......
......@@ -12,6 +12,7 @@ import java.util.*
import java.util.regex.Pattern
private val logger = KotlinLogging.logger {}
private val RECORD_LAG_QUERY = "sum by(group)(kafka_consumergroup_group_lag >= 0)"
/**
* Contains the analysis. Fetches a metric from Prometheus, documents it, and evaluates it.
......@@ -32,7 +33,7 @@ class AnalysisExecutor(
* First fetches data from prometheus, then documents them and afterwards evaluate it via a [slo].
* @param load of the experiment.
* @param res of the experiment.
* @param executionDuration of the experiment.
* @param executionIntervals list of start and end points of experiments
* @return true if the experiment succeeded.
*/
fun analyze(load: LoadDimension, res: Resource, executionIntervals: List<Pair<Instant, Instant>>): Boolean {
......@@ -45,16 +46,20 @@ class AnalysisExecutor(
val fileURL = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}"
val prometheusData = executionIntervals
.map { interval -> fetcher.fetchMetric(
.map { interval ->
fetcher.fetchMetric(
start = interval.first,
end = interval.second,
query = "sum by(group)(kafka_consumergroup_group_lag >= 0)") }
query = RECORD_LAG_QUERY
)
}
prometheusData.forEach { data ->
ioHandler.writeToCSVFile(
fileURL = "${fileURL}_${repetitionCounter++}",
data = data.getResultAsList(),
columns = listOf("group", "timestamp", "value"))
columns = listOf("group", "timestamp", "value")
)
}
val sloChecker = SloCheckerFactory().create(
......@@ -67,6 +72,7 @@ class AnalysisExecutor(
result = sloChecker.evaluate(prometheusData)
} catch (e: Exception) {
// TODO(throw exception in order to make it possible to mark an experiment as unsuccessfully)
logger.error { "Evaluation failed for resource '${res.get()}' and load '${load.get()}'. Error: $e" }
}
return result
......@@ -75,7 +81,7 @@ class AnalysisExecutor(
private val NONLATIN: Pattern = Pattern.compile("[^\\w-]")
private val WHITESPACE: Pattern = Pattern.compile("[\\s]")
fun String.toSlug(): String {
private fun String.toSlug(): String {
val noWhitespace: String = WHITESPACE.matcher(this).replaceAll("-")
val normalized: String = Normalizer.normalize(noWhitespace, Normalizer.Form.NFD)
val slug: String = NONLATIN.matcher(normalized).replaceAll("")
......
......@@ -5,7 +5,6 @@ import khttp.post
import mu.KotlinLogging
import theodolite.util.PrometheusResponse
import java.net.ConnectException
import java.time.Instant
/**
* [SloChecker] that uses an external source for the concrete evaluation.
......@@ -37,10 +36,13 @@ class ExternalSloChecker(
*/
override fun evaluate(fetchedData: List<PrometheusResponse>): Boolean {
var counter = 0
val data = Gson().toJson(mapOf(
val data = Gson().toJson(
mapOf(
"total_lags" to fetchedData.map { it.data?.result },
"threshold" to threshold,
"warmup" to warmup))
"warmup" to warmup
)
)
while (counter < RETRIES) {
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
......
......@@ -8,13 +8,10 @@ import theodolite.util.PrometheusResponse
*/
interface SloChecker {
/**
* Evaluates [fetchedData] and returns if the experiment was successful.
* Returns if the evaluated experiment was successful.
* Evaluates [fetchedData] and returns if the experiments were successful.
*
* @param start of the experiment
* @param end of the experiment
* @param fetchedData from Prometheus that will be evaluated.
* @return true if experiment was successful. Otherwise false.
* @return true if experiments were successful. Otherwise false.
*/
fun evaluate(fetchedData: List<PrometheusResponse>): Boolean
}
......@@ -5,7 +5,10 @@ import mu.KotlinLogging
import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.AnalysisExecutor
import theodolite.util.*
import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension
import theodolite.util.Resource
import theodolite.util.Results
import java.time.Duration
import java.time.Instant
......@@ -22,7 +25,17 @@ class BenchmarkExecutorImpl(
executionId: Int,
loadGenerationDelay: Long,
afterTeardownDelay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, repetitions, executionId, loadGenerationDelay, afterTeardownDelay) {
) : BenchmarkExecutor(
benchmark,
results,
executionDuration,
configurationOverrides,
slo,
repetitions,
executionId,
loadGenerationDelay,
afterTeardownDelay
) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false
val executionIntervals: MutableList<Pair<Instant, Instant>> = ArrayList()
......@@ -44,15 +57,23 @@ class BenchmarkExecutorImpl(
.analyze(
load = load,
res = res,
executionIntervals = executionIntervals)
executionIntervals = executionIntervals
)
this.results.setResult(Pair(load, res), result)
}
return result
}
private fun runSingleExperiment(load: LoadDimension, res: Resource): Pair<Instant, Instant> {
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides, this.loadGenerationDelay, this.afterTeardownDelay)
val benchmarkDeployment = benchmark.buildDeployment(
load,
res,
this.configurationOverrides,
this.loadGenerationDelay,
this.afterTeardownDelay
)
val from = Instant.now()
// TODO(restructure try catch in order to throw exceptions if there are significant problems by running a experiment)
try {
benchmarkDeployment.setup()
this.waitAndLog()
......
......@@ -17,8 +17,8 @@ object Main {
logger.info { "Start Theodolite with mode $mode" }
when (mode) {
"standalone" -> TheodoliteYamlExecutor().start()
"yaml-executor" -> TheodoliteYamlExecutor().start() // TODO remove (#209)
"standalone" -> TheodoliteStandalone().start()
"yaml-executor" -> TheodoliteStandalone().start() // TODO remove (#209)
"operator" -> TheodoliteOperator().start()
else -> {
logger.error { "MODE $mode not found" }
......
......@@ -5,7 +5,6 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.LoadDimension
import theodolite.util.Resource
import java.lang.Exception
private val logger = KotlinLogging.logger {}
......@@ -36,11 +35,15 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
)
deployment.teardown()
} catch (e: Exception) {
logger.warn { "Could not delete all specified resources from Kubernetes. " +
"This could be the case, if not all resources are deployed and running." }
// TODO(throw exception in order to make it possible to mark an experiment as unsuccessfully)
logger.warn {
"Could not delete all specified resources from Kubernetes. " +
"This could be the case, if not all resources are deployed and running."
}
}
logger.info { "Teardown everything deployed" }
logger.info { "Teardown completed" }
logger.info {
"Finished teardown of all benchmark resources."
}
}
}
......@@ -70,14 +70,18 @@ class TheodoliteExecutor(
if (config.load.loadValues != config.load.loadValues.sorted()) {
config.load.loadValues = config.load.loadValues.sorted()
logger.info { "Load values are not sorted correctly, Theodolite sorts them in ascending order." +
"New order is: ${config.load.loadValues}" }
logger.info {
"Load values are not sorted correctly, Theodolite sorts them in ascending order." +
"New order is: ${config.load.loadValues}"
}
}
if (config.resources.resourceValues != config.resources.resourceValues.sorted()) {
config.resources.resourceValues = config.resources.resourceValues.sorted()
logger.info { "Load values are not sorted correctly, Theodolite sorts them in ascending order." +
"New order is: ${config.resources.resourceValues}" }
logger.info {
"Load values are not sorted correctly, Theodolite sorts them in ascending order." +
"New order is: ${config.resources.resourceValues}"
}
}
return Config(
......@@ -103,10 +107,6 @@ class TheodoliteExecutor(
return this.config
}
fun getBenchmark(): KubernetesBenchmark {
return this.kubernetesBenchmark
}
/**
* Run all experiments which are specified in the corresponding
* execution and benchmark objects.
......@@ -116,7 +116,10 @@ class TheodoliteExecutor(
val resultsFolder = ioHandler.getResultFolderURL()
this.config.executionId = getAndIncrementExecutionID(resultsFolder + "expID.txt")
ioHandler.writeToJSONFile(this.config, "$resultsFolder${this.config.executionId}-execution-configuration")
ioHandler.writeToJSONFile(kubernetesBenchmark, "$resultsFolder${this.config.executionId}-benchmark-configuration")
ioHandler.writeToJSONFile(
kubernetesBenchmark,
"$resultsFolder${this.config.executionId}-benchmark-configuration"
)
val config = buildConfig()
// execute benchmarks for each load
......@@ -125,7 +128,10 @@ class TheodoliteExecutor(
config.compositeStrategy.findSuitableResource(load, config.resources)
}
}
ioHandler.writeToJSONFile(config.compositeStrategy.benchmarkExecutor.results, "$resultsFolder${this.config.executionId}-result")
ioHandler.writeToJSONFile(
config.compositeStrategy.benchmarkExecutor.results,
"$resultsFolder${this.config.executionId}-result"
)
}
private fun getAndIncrementExecutionID(fileURL: String): Int {
......
......@@ -25,14 +25,14 @@ private val logger = KotlinLogging.logger {}
*
* @constructor Create empty Theodolite yaml executor
*/
class TheodoliteYamlExecutor {
class TheodoliteStandalone {
private val parser = YamlParserFromFile()
fun start() {
logger.info { "Theodolite started" }
val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/example-execution-yaml-resource.yaml"
val benchmarkPath = System.getenv("THEODOLITE_BENCHMARK") ?: "./config/example-benchmark-yaml-resource.yaml"
val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "execution/execution.yaml"
val benchmarkPath = System.getenv("THEODOLITE_BENCHMARK") ?: "benchmark/benchmark.yaml"
logger.info { "Using $executionPath for BenchmarkExecution" }
logger.info { "Using $benchmarkPath for BenchmarkType" }
......
......@@ -4,13 +4,13 @@ import io.fabric8.kubernetes.api.model.HasMetadata
import io.fabric8.kubernetes.api.model.KubernetesResourceList
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import java.lang.Thread.sleep
abstract class AbstractStateHandler<T, L, D>(
private val client: KubernetesClient,
private val client: NamespacedKubernetesClient,
private val crd: Class<T>,
private val crdList: Class<L>
) : StateHandler<T> where T : CustomResource<*, *>?, T : HasMetadata, T : Namespaced, L : KubernetesResourceList<T> {
......@@ -21,9 +21,8 @@ abstract class AbstractStateHandler<T,L,D>(
@Synchronized
override fun setState(resourceName: String, f: (T) -> T?) {
this.crdClient
.inNamespace(this.client.namespace)
.list().items
.filter { item -> item.metadata.name == resourceName }
.filter { it.metadata.name == resourceName }
.map { customResource -> f(customResource) }
.forEach { this.crdClient.updateStatus(it) }
}
......@@ -31,15 +30,19 @@ abstract class AbstractStateHandler<T,L,D>(
@Synchronized
override fun getState(resourceName: String, f: (T) -> String?): String? {
return this.crdClient
.inNamespace(this.client.namespace)
.list().items
.filter { item -> item.metadata.name == resourceName }
.filter { it.metadata.name == resourceName }
.map { customResource -> f(customResource) }
.firstOrNull()
}
@Synchronized
override fun blockUntilStateIsSet(resourceName: String, desiredStatusString: String, f: (T) -> String?, maxTries: Int): Boolean {
override fun blockUntilStateIsSet(
resourceName: String,
desiredStatusString: String,
f: (T) -> String?,
maxTries: Int
): Boolean {
for (i in 0.rangeTo(maxTries)) {
val currentStatus = getState(resourceName, f)
if (currentStatus == desiredStatusString) {
......
......@@ -4,9 +4,9 @@ import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import mu.KotlinLogging
import org.json.JSONObject
import theodolite.execution.Shutdown
import theodolite.k8s.K8sContextFactory
import theodolite.k8s.ResourceByLabelHandler
import theodolite.model.crd.*
private val logger = KotlinLogging.logger {}
......@@ -29,9 +29,15 @@ class ClusterSetup(
clearByLabel()
}
/**
* This function searches for executions in the cluster that have the status running and tries to stop the execution.
* For this the corresponding benchmark is searched and terminated.
*
* Throws [IllegalStateException] if no suitable benchmark can be found.
*
*/
private fun stopRunningExecution() {
executionCRDClient
.inNamespace(client.namespace)
.list()
.items
.asSequence()
......@@ -50,27 +56,35 @@ class ClusterSetup(
} else {
logger.error {
"Execution with state ${States.RUNNING.value} was found, but no corresponding benchmark. " +
"Could not initialize cluster." }
"Could not initialize cluster."
}
throw IllegalStateException("Cluster state is invalid, required Benchmark for running execution not found.")
}
}
}
private fun clearByLabel() {
this.client.services().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.apps().deployments().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.apps().statefulSets().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.configMaps().withLabel("app.kubernetes.io/created-by=theodolite").delete()
val serviceMonitors = JSONObject(
this.client.customResource(serviceMonitorContext)
.list(client.namespace, mapOf(Pair("app.kubernetes.io/created-by", "theodolite")))
val resourceRemover = ResourceByLabelHandler(client = client)
resourceRemover.removeServices(
labelName = "app.kubernetes.io/created-by",
labelValue = "theodolite"
)
resourceRemover.removeDeployments(
labelName = "app.kubernetes.io/created-by",
labelValue = "theodolite"
)
resourceRemover.removeStatefulSets(
labelName = "app.kubernetes.io/created-by",
labelValue = "theodolite"
)
resourceRemover.removeConfigMaps(
labelName = "app.kubernetes.io/created-by",
labelValue = "theodolite"
)
resourceRemover.removeCR(
labelName = "app.kubernetes.io/created-by",
labelValue = "theodolite",
context = serviceMonitorContext
)
.getJSONArray("items")
(0 until serviceMonitors.length())
.map { serviceMonitors.getJSONObject(it).getJSONObject("metadata").getString("name") }
.forEach { this.client.customResource(serviceMonitorContext).delete(client.namespace, it) }
}
}
\ No newline at end of file
......@@ -64,7 +64,8 @@ class ExecutionHandler(
this.controller.stop(restart = true)
}
}
States.RESTART -> {} // should this set to pending?
States.RESTART -> {
} // should this set to pending?
else -> this.stateHandler.setExecutionState(newExecution.spec.name, States.PENDING)
}
}
......@@ -79,7 +80,8 @@ class ExecutionHandler(
override fun onDelete(execution: ExecutionCRD, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name}" }
if (execution.status.executionState == States.RUNNING.value
&& this.controller.isExecutionRunning(execution.spec.name)) {
&& this.controller.isExecutionRunning(execution.spec.name)
) {
this.controller.stop()
}
}
......
package theodolite.execution.operator
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import theodolite.model.crd.BenchmarkExecutionList
import theodolite.model.crd.ExecutionCRD
import theodolite.model.crd.ExecutionStatus
......@@ -10,7 +10,7 @@ import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
class ExecutionStateHandler(val client: KubernetesClient):
class ExecutionStateHandler(val client: NamespacedKubernetesClient) :
AbstractStateHandler<ExecutionCRD, BenchmarkExecutionList, ExecutionStatus>(
client = client,
crd = ExecutionCRD::class.java,
......
......@@ -17,6 +17,7 @@ class LeaderElector(
val name: String
) {
// TODO(what is the name of the lock? .withName() or LeaseLock(..,name..) ?)
fun getLeadership(leader: KFunction0<Unit>) {
val lockIdentity: String = UUID.randomUUID().toString()
DefaultKubernetesClient().use { kc ->
......
......@@ -9,6 +9,7 @@ interface StateHandler<T> {
resourceName: String,
desiredStatusString: String,
f: (T) -> String?,
maxTries: Int = MAX_TRIES): Boolean
maxTries: Int = MAX_TRIES
): Boolean
}
\ No newline at end of file
......@@ -7,11 +7,15 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.execution.TheodoliteExecutor
import theodolite.model.crd.*
import theodolite.util.ConfigurationOverride
import theodolite.util.PatcherDefinition
import theodolite.patcher.ConfigOverrideModifier
import theodolite.util.ExecutionStateComparator
import java.lang.Thread.sleep
private val logger = KotlinLogging.logger {}
const val DEPLOYED_FOR_EXECUTION_LABEL_NAME = "deployed-for-execution"
const val DEPLOYED_FOR_BENCHMARK_LABEL_NAME = "deployed-for-benchmark"
const val CREATED_BY_LABEL_NAME = "app.kubernetes.io/created-by"
const val CREATED_BY_LABEL_VALUE = "theodolite"
/**
* The controller implementation for Theodolite.
......@@ -22,7 +26,6 @@ private val logger = KotlinLogging.logger {}
*/
class TheodoliteController(
val path: String,
private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>,
private val executionStateHandler: ExecutionStateHandler
......@@ -62,21 +65,23 @@ class TheodoliteController(
* @see BenchmarkExecution
*/
private fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
setAdditionalLabels(execution.name,
"deployed-for-execution",
benchmark.appResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } }
+ benchmark.loadGenResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } },
execution)
setAdditionalLabels(benchmark.name,
"deployed-for-benchmark",
benchmark.appResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } }
+ benchmark.loadGenResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } },
execution)
setAdditionalLabels("theodolite",
"app.kubernetes.io/created-by",
benchmark.appResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } }
+ benchmark.loadGenResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } },
execution)
val modifier = ConfigOverrideModifier(
execution = execution,
resources = benchmark.appResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } }
+ benchmark.loadGenResourceSets.flatMap { it -> it.loadResourceSet().map { it.first } }
)
modifier.setAdditionalLabels(
labelValue = execution.name,
labelName = DEPLOYED_FOR_EXECUTION_LABEL_NAME
)
modifier.setAdditionalLabels(
labelValue = benchmark.name,
labelName = DEPLOYED_FOR_BENCHMARK_LABEL_NAME
)
modifier.setAdditionalLabels(
labelValue = CREATED_BY_LABEL_VALUE,
labelName = CREATED_BY_LABEL_NAME
)
executionStateHandler.setExecutionState(execution.name, States.RUNNING)
executionStateHandler.startDurationStateTimer(execution.name)
......@@ -104,9 +109,6 @@ class TheodoliteController(
if (!::executor.isInitialized) return
if (restart) {
executionStateHandler.setExecutionState(this.executor.getExecution().name, States.RESTART)
} else {
executionStateHandler.setExecutionState(this.executor.getExecution().name, States.INTERRUPTED)
logger.warn { "Execution ${executor.getExecution().name} unexpected interrupted" }
}
this.executor.executor.run.set(false)
}
......@@ -118,9 +120,10 @@ class TheodoliteController(
return this.benchmarkCRDClient
.list()
.items
.map { it.spec.name = it.metadata.name; it }
.map { it.spec.path = path; it } // TODO check if we can remove the path field from the KubernetesBenchmark
.map { it.spec }
.map {
it.spec.name = it.metadata.name
it.spec
}
}
/**
......@@ -135,6 +138,7 @@ class TheodoliteController(
* @return the next execution or null
*/
private fun getNextExecution(): BenchmarkExecution? {
val comparator = ExecutionStateComparator(States.RESTART)
val availableBenchmarkNames = getBenchmarks()
.map { it.name }
......@@ -148,46 +152,13 @@ class TheodoliteController(
it.status.executionState == States.RESTART.value
}
.filter { availableBenchmarkNames.contains(it.spec.benchmark) }
.sortedWith(stateComparator().thenBy { it.metadata.creationTimestamp })
.sortedWith(comparator.thenBy { it.metadata.creationTimestamp })
.map { it.spec }
.firstOrNull()
}
/**
* Simple comparator which can be used to order a list of [ExecutionCRD] such that executions with
* status [States.RESTART] are before all other executions.
*/
private fun stateComparator() = Comparator<ExecutionCRD> { a, b ->
when {
(a == null && b == null) -> 0
(a.status.executionState == States.RESTART.value) -> -1
else -> 1
}
}
fun isExecutionRunning(executionName: String): Boolean {
if (!::executor.isInitialized) return false
return this.executor.getExecution().name == executionName
}
private fun setAdditionalLabels(
labelValue: String,
labelName: String,
resources: List<String>,
execution: BenchmarkExecution
) {
val additionalConfigOverrides = mutableListOf<ConfigurationOverride>()
resources.forEach {
run {
val configurationOverride = ConfigurationOverride()
configurationOverride.patcher = PatcherDefinition()
configurationOverride.patcher.type = "LabelPatcher"
configurationOverride.patcher.properties = mutableMapOf("variableName" to labelName)
configurationOverride.patcher.resource = it
configurationOverride.value = labelValue
additionalConfigOverrides.add(configurationOverride)
}
}
execution.configOverrides.addAll(additionalConfigOverrides)
}
}
\ No newline at end of file
......@@ -28,7 +28,6 @@ private val logger = KotlinLogging.logger {}
*/
class TheodoliteOperator {
private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
private val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
private val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace)
private lateinit var controller: TheodoliteController
......@@ -38,7 +37,7 @@ class TheodoliteOperator {
fun start() {
LeaderElector(
client = client,
name = "theodolite-operator"
name = "theodolite-operator" // TODO(make leaslock name configurable via env var)
)
.getLeadership(::startOperator)
}
......@@ -76,7 +75,10 @@ class TheodoliteOperator {
}
}
fun getExecutionEventHandler(controller: TheodoliteController, client: NamespacedKubernetesClient): SharedInformerFactory {
fun getExecutionEventHandler(
controller: TheodoliteController,
client: NamespacedKubernetesClient
): SharedInformerFactory {
val factory = client.informers()
.inNamespace(client.namespace)
......@@ -105,7 +107,6 @@ class TheodoliteOperator {
): TheodoliteController {
if (!::controller.isInitialized) {
this.controller = TheodoliteController(
path = this.appResource,
benchmarkCRDClient = getBenchmarkClient(client),
executionCRDClient = getExecutionClient(client),
executionStateHandler = executionStateHandler
......
......@@ -7,7 +7,10 @@ import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
class CustomResourceWrapper(val crAsMap: Map<String, String>, private val context: CustomResourceDefinitionContext) : KubernetesResource {
class CustomResourceWrapper(
private val crAsMap: Map<String, String>,
private val context: CustomResourceDefinitionContext
) : KubernetesResource {
/**
* Deploy a service monitor
*
......@@ -41,9 +44,4 @@ class CustomResourceWrapper(val crAsMap: Map<String, String>, private val contex
val metadataAsMap = this.crAsMap["metadata"]!! as Map<String, String>
return metadataAsMap["name"]!!
}
fun getLabels(): Map<String, String>{
val metadataAsMap = this.crAsMap["metadata"]!! as Map<String, String>
return metadataAsMap["labels"]!! as Map<String, String>
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment