From 66a0650a1f7928056b001c410c5f6e1dc025ddb0 Mon Sep 17 00:00:00 2001 From: "stu126940@mail.uni-kiel.de" <stu126940@mail.uni-kiel.de> Date: Fri, 26 Nov 2021 11:24:22 +0100 Subject: [PATCH] Introduces a BechnmarkStateChecker which checks in each iteration of the controller whether all required resouce can be loaded and if all specified actions could be executed (all required resource are available). Small fixes. --- .../examples/operator/example-benchmark.yaml | 16 +- .../kotlin/theodolite/benchmark/Action.kt | 2 +- .../theodolite/benchmark/ActionCommand.kt | 18 +- .../benchmark/KubernetesBenchmark.kt | 4 +- .../KubernetesBenchmarkDeployment.kt | 8 +- .../operator/BenchmarkStateChecker.kt | 170 ++++++++++++++++++ .../operator/TheodoliteController.kt | 34 +--- .../execution/operator/TheodoliteOperator.kt | 18 +- 8 files changed, 221 insertions(+), 49 deletions(-) create mode 100644 theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt diff --git a/theodolite/examples/operator/example-benchmark.yaml b/theodolite/examples/operator/example-benchmark.yaml index 87b3d83c1..3302ea8a8 100644 --- a/theodolite/examples/operator/example-benchmark.yaml +++ b/theodolite/examples/operator/example-benchmark.yaml @@ -3,6 +3,12 @@ kind: benchmark metadata: name: uc1-kstreams spec: + infrastructure: + resources: + - configMap: + name: "example-configmap" + files: + - "uc1-kstreams-deployment.yaml" sut: resources: - configMap: @@ -13,10 +19,9 @@ spec: - selector: pod: matchLabels: - app: kafka-client - container: "abc" # optional (if there is only one container or there exists something like a default container) + app: titan-ccp-aggregation exec: - command: "kafka-topics.sh --create --topic abc" # or list + command: "ls -l" # or list loadGenerator: resources: - configMap: @@ -28,8 +33,9 @@ spec: - selector: pod: matchLabels: - app: kafka-client - container: "abc" # optional (if there is only one container or there exists something like a default container) + 'app.kubernetes.io/name': grafana + + #container: "abc" # optional (if there is only one container or there exists something like a default container) exec: command: "kafka-topics.sh --create --topic abc" # or list resourceTypes: diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt b/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt index 653a71bd9..f6fac38d0 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt @@ -27,7 +27,7 @@ class Action { @RegisterForReflection class ActionSelector { lateinit var pod: PodSelector - lateinit var container: String + var container: String = "" } @JsonDeserialize @RegisterForReflection diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt index 762487c6d..43167bcc3 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt @@ -18,10 +18,22 @@ class ActionCommand(val client: NamespacedKubernetesClient) { private val execLatch = CountDownLatch(1); fun exec(matchLabels: MutableMap<String, String>, container: String, command: String): Pair<String, String> { + println("container is: $container") try { - val execWatch: ExecWatch = client.pods() - .withName(getPodName(matchLabels)) - .inContainer(container) + + val execWatch: ExecWatch = if (container.isNotEmpty()) { + println("container is not empty") + client.pods() + .inNamespace(client.namespace) + .withName(getPodName(matchLabels)) + .inContainer(container) + + } else { + println("container is empty") + client.pods() + .inNamespace(client.namespace) + .withName(getPodName(matchLabels)) + } .writingOutput(out) .writingError(error) .usingListener(MyPodExecListener(execLatch)) diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt index 19f9cbb74..99cf12c56 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt @@ -59,7 +59,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { } override fun setupInfrastructure() { - this.infrastructure.beforeActions.forEach { it.exec } + this.infrastructure.beforeActions.forEach { it.exec(client = client) } val kubernetesManager = K8sManager(this.client) loadKubernetesResources(this.infrastructure.resources) .map{it.second} @@ -67,7 +67,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { } override fun teardownInfrastructure() { - this.infrastructure.afterActions.forEach { it.exec } + this.infrastructure.afterActions.forEach { it.exec(client = client) } val kubernetesManager = K8sManager(this.client) loadKubernetesResources(this.infrastructure.resources) .map{it.second} diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index db90f0e24..9f51638f1 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -48,8 +48,8 @@ class KubernetesBenchmarkDeployment( * - Deploy the needed resources. */ override fun setup() { - sutBeforeActions.forEach { it.exec } - loadGenBeforeActions.forEach { it.exec } + sutBeforeActions.forEach { it.exec(client = client) } + loadGenBeforeActions.forEach { it.exec(client = client) } val kafkaTopics = this.topics.filter { !it.removeOnly } .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) } @@ -74,8 +74,8 @@ class KubernetesBenchmarkDeployment( labelName = LAG_EXPORTER_POD_LABEL_NAME, labelValue = LAG_EXPORTER_POD_LABEL_VALUE ) - sutAfterActions.forEach { it.exec } - loadGenAfterActions.forEach { it.exec } + sutAfterActions.forEach { it.exec(client = client) } + loadGenAfterActions.forEach { it.exec(client = client) } logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." } Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis()) } diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt new file mode 100644 index 000000000..674b62a80 --- /dev/null +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt @@ -0,0 +1,170 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.api.model.apps.Deployment +import io.fabric8.kubernetes.client.NamespacedKubernetesClient +import io.fabric8.kubernetes.client.dsl.MixedOperation +import io.fabric8.kubernetes.client.dsl.Resource +import theodolite.benchmark.Action +import theodolite.benchmark.ActionSelector +import theodolite.benchmark.KubernetesBenchmark +import theodolite.model.crd.BenchmarkCRD +import theodolite.model.crd.BenchmarkStates +import theodolite.model.crd.KubernetesBenchmarkList + +class BenchmarkStateChecker( + private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, + private val benchmarkStateHandler: BenchmarkStateHandler, + private val client: NamespacedKubernetesClient + +) { + /** + * Checks and updates the states off all deployed benchmarks. + * + */ + fun updateBenchmarkStatus() { + this.benchmarkCRDClient + .list() + .items + .map { it.spec.name = it.metadata.name; it } + .map { Pair(it, checkState(it.spec)) } + .forEach { setState(it.first, it.second) } + } + + private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) { + benchmarkStateHandler.setResourceSetState(resource.spec.name, state) + } + + /** + * Checks the state of the benchmark. + * + * @param benchmark The benchmark to check + * @return [BenchmarkStates.READY] iff all resource could be loaded and all actions could be executed, [BenchmarkStates.PENDING] else + */ + private fun checkState(benchmark: KubernetesBenchmark): BenchmarkStates { + return if (checkActionCommands(benchmark) == BenchmarkStates.READY + && checkResources(benchmark) == BenchmarkStates.READY + ) { + BenchmarkStates.READY + } else { + BenchmarkStates.PENDING + } + } + + /** + * Checks if all specified actions of the given benchmark could be executed or not + * + * @param benchmark The benchmark to check + * @return The state of this benchmark. [BenchmarkStates.READY] if all actions could be executed, else [BenchmarkStates.PENDING] + */ + private fun checkActionCommands(benchmark: KubernetesBenchmark): BenchmarkStates { + return if (checkIfActionPossible(benchmark, benchmark.sut.beforeActions) + && checkIfActionPossible(benchmark, benchmark.sut.afterActions) + && checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) + && checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) + ) { + BenchmarkStates.READY + } else { + BenchmarkStates.PENDING + } + } + + /** + * Action commands are called on a pod. To verify that an action command can be executed, + * it checks that the specified pods are either currently running in the cluster or + * have been specified as infrastructure in the benchmark. + * + * @param benchmark the benchmark to check + * @param actions the actions + * @return true if all actions could be executed, else false + */ + private fun checkIfActionPossible(benchmark: KubernetesBenchmark, actions: List<Action>): Boolean { + return !actions.map { + checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(benchmark, it.selector) + }.contains(false) + } + + /** + * Checks for the given actionSelector whether the required resources are already deployed in the cluster or not + * + * @param selector the actionSelector to check + * @return true if the required resources are found, else false + */ + private fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean { + val pods = this.client + .pods() + .withLabels(selector.pod.matchLabels) + .list() + .items + + return if (pods.isNotEmpty() && selector.container.isNotEmpty()) { + pods.map { pod -> + pod + .spec + .containers + .map { it.name } + .contains(selector.container) + }.contains(false) + } else { + pods.isNotEmpty() + } + + } + + /** + * Checks for the given actionSelector whether the required resources are specified as infrastructure or not + * + * @param benchmark the benchmark to check + * @param selector the actionSelector to check + * @return true if the required resources are found, else false + */ + private fun checkIfResourceIsInfrastructure(benchmark: KubernetesBenchmark, selector: ActionSelector): Boolean { + val resources = benchmark.loadKubernetesResources(resourceSet = benchmark.infrastructure.resources) + + return if (resources.isEmpty()) { + false + } else { + resources.map { it.second } + .filterIsInstance<Deployment>() + .filter { it.spec.selector.matchLabels.containsMatchLabels(selector.pod.matchLabels) } + .any { + if (selector.container.isNotEmpty()) { + it.spec.template.spec.containers.map { it.name }.contains(selector.container) + } else { + true + } + } + } + } + + /** + * Checks if it is possible to load all specified Kubernetes manifests. + * + * @param benchmark The benchmark to check + * @return The state of this benchmark. [BenchmarkStates.READY] if all resources could be loaded, else [BenchmarkStates.PENDING] + */ + private fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates { // TODO check if infrastructure could be loaded or not + return try { + val appResources = + benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) + val loadGenResources = + benchmark.loadKubernetesResources(resourceSet = benchmark.loadGenerator.resources) + if (appResources.isNotEmpty() && loadGenResources.isNotEmpty()) { + BenchmarkStates.READY + } else { + BenchmarkStates.PENDING + } + } catch (e: Exception) { + BenchmarkStates.PENDING + } + } +} + +private fun <K, V> MutableMap<K, V>.containsMatchLabels(matchLabels: MutableMap<V, V>) : Boolean { + for (kv in matchLabels) { + if (kv.value != this[kv.key as K]) { + return false + } + } + return true +} + diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt index 70e30cf84..3bff0a403 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt @@ -29,7 +29,7 @@ class TheodoliteController( private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, private val executionStateHandler: ExecutionStateHandler, - private val benchmarkStateHandler: BenchmarkStateHandler + private val benchmarkSateChecker: BenchmarkStateChecker ) { lateinit var executor: TheodoliteExecutor @@ -41,7 +41,7 @@ class TheodoliteController( sleep(5000) // wait until all states are correctly set while (true) { reconcile() - updateBenchmarkStatus() + benchmarkSateChecker.updateBenchmarkStatus() sleep(2000) } } @@ -49,7 +49,7 @@ class TheodoliteController( private fun reconcile() { do { val execution = getNextExecution() - updateBenchmarkStatus() + benchmarkSateChecker.updateBenchmarkStatus() if (execution != null) { val benchmark = getBenchmarks() .map { it.spec } @@ -139,7 +139,6 @@ class TheodoliteController( } } - /** * Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution] * is selected for the next execution depends on three points: @@ -173,34 +172,7 @@ class TheodoliteController( .firstOrNull() } - private fun updateBenchmarkStatus() { - this.benchmarkCRDClient - .list() - .items - .map { it.spec.name = it.metadata.name; it } - .map { Pair(it, checkResource(it.spec)) } - .forEach { setState(it.first, it.second ) } - } - - private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) { - benchmarkStateHandler.setResourceSetState(resource.spec.name, state) - } - private fun checkResource(benchmark: KubernetesBenchmark): BenchmarkStates { - return try { - val appResources = - benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) - val loadGenResources = - benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) - if(appResources.isNotEmpty() && loadGenResources.isNotEmpty()) { - BenchmarkStates.READY - } else { - BenchmarkStates.PENDING - } - } catch (e: Exception) { - BenchmarkStates.PENDING - } - } fun isExecutionRunning(executionName: String): Boolean { if (!::executor.isInitialized) return false diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt index 4850a44fd..f74c2ef5f 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt @@ -34,6 +34,7 @@ class TheodoliteOperator { private lateinit var controller: TheodoliteController private lateinit var executionStateHandler: ExecutionStateHandler private lateinit var benchmarkStateHandler: BenchmarkStateHandler + private lateinit var benchmarkStateChecker: BenchmarkStateChecker fun start() { @@ -71,7 +72,7 @@ class TheodoliteOperator { controller = getController( client = client, executionStateHandler = getExecutionStateHandler(client = client), - benchmarkStateHandler = getBenchmarkStateHandler(client = client) + benchmarkStateChecker = getBenchmarkStateChecker(client = client) ) getExecutionEventHandler(controller, client).startAllRegisteredInformers() @@ -112,17 +113,28 @@ class TheodoliteOperator { return benchmarkStateHandler } + fun getBenchmarkStateChecker(client: NamespacedKubernetesClient) : BenchmarkStateChecker { + if (!::benchmarkStateChecker.isInitialized) { + this.benchmarkStateChecker = BenchmarkStateChecker( + client = client, + benchmarkStateHandler = getBenchmarkStateHandler(client = client), + benchmarkCRDClient = getBenchmarkClient(client = client)) + } + return benchmarkStateChecker + } + + fun getController( client: NamespacedKubernetesClient, executionStateHandler: ExecutionStateHandler, - benchmarkStateHandler: BenchmarkStateHandler + benchmarkStateChecker: BenchmarkStateChecker ): TheodoliteController { if (!::controller.isInitialized) { this.controller = TheodoliteController( benchmarkCRDClient = getBenchmarkClient(client), executionCRDClient = getExecutionClient(client), executionStateHandler = executionStateHandler, - benchmarkStateHandler = benchmarkStateHandler + benchmarkSateChecker = benchmarkStateChecker ) } return this.controller -- GitLab