diff --git a/theodolite/examples/operator/example-benchmark.yaml b/theodolite/examples/operator/example-benchmark.yaml index 87b3d83c1993d7a36078d7fec1836837e3a9c009..3302ea8a83bff270a05cc9041d8d69b7904634ed 100644 --- a/theodolite/examples/operator/example-benchmark.yaml +++ b/theodolite/examples/operator/example-benchmark.yaml @@ -3,6 +3,12 @@ kind: benchmark metadata: name: uc1-kstreams spec: + infrastructure: + resources: + - configMap: + name: "example-configmap" + files: + - "uc1-kstreams-deployment.yaml" sut: resources: - configMap: @@ -13,10 +19,9 @@ spec: - selector: pod: matchLabels: - app: kafka-client - container: "abc" # optional (if there is only one container or there exists something like a default container) + app: titan-ccp-aggregation exec: - command: "kafka-topics.sh --create --topic abc" # or list + command: "ls -l" # or list loadGenerator: resources: - configMap: @@ -28,8 +33,9 @@ spec: - selector: pod: matchLabels: - app: kafka-client - container: "abc" # optional (if there is only one container or there exists something like a default container) + 'app.kubernetes.io/name': grafana + + #container: "abc" # optional (if there is only one container or there exists something like a default container) exec: command: "kafka-topics.sh --create --topic abc" # or list resourceTypes: diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt b/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt index 653a71bd9c12d462a0a944221dd491978d2ae58b..f6fac38d035759d11614db16a6ddd1fb6de58e14 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/Action.kt @@ -27,7 +27,7 @@ class Action { @RegisterForReflection class ActionSelector { lateinit var pod: PodSelector - lateinit var container: String + var container: String = "" } @JsonDeserialize @RegisterForReflection diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt index 762487c6db7e11ed9e1d13384e3796be81f6074f..43167bcc33374386fcb324ae8d4201b7bc88b57f 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt @@ -18,10 +18,22 @@ class ActionCommand(val client: NamespacedKubernetesClient) { private val execLatch = CountDownLatch(1); fun exec(matchLabels: MutableMap<String, String>, container: String, command: String): Pair<String, String> { + println("container is: $container") try { - val execWatch: ExecWatch = client.pods() - .withName(getPodName(matchLabels)) - .inContainer(container) + + val execWatch: ExecWatch = if (container.isNotEmpty()) { + println("container is not empty") + client.pods() + .inNamespace(client.namespace) + .withName(getPodName(matchLabels)) + .inContainer(container) + + } else { + println("container is empty") + client.pods() + .inNamespace(client.namespace) + .withName(getPodName(matchLabels)) + } .writingOutput(out) .writingError(error) .usingListener(MyPodExecListener(execLatch)) diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt index 19f9cbb74821e66ff1e7831750ed0a0eed83673b..99cf12c5680b3eb6a5f9c74c402ceed8974afb3d 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt @@ -59,7 +59,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { } override fun setupInfrastructure() { - this.infrastructure.beforeActions.forEach { it.exec } + this.infrastructure.beforeActions.forEach { it.exec(client = client) } val kubernetesManager = K8sManager(this.client) loadKubernetesResources(this.infrastructure.resources) .map{it.second} @@ -67,7 +67,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { } override fun teardownInfrastructure() { - this.infrastructure.afterActions.forEach { it.exec } + this.infrastructure.afterActions.forEach { it.exec(client = client) } val kubernetesManager = K8sManager(this.client) loadKubernetesResources(this.infrastructure.resources) .map{it.second} diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index db90f0e24416531d39b0865e47c54220f8af7b11..9f51638f1fd507c3fb4c1ab1fa98dc89ba5af2f9 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -48,8 +48,8 @@ class KubernetesBenchmarkDeployment( * - Deploy the needed resources. */ override fun setup() { - sutBeforeActions.forEach { it.exec } - loadGenBeforeActions.forEach { it.exec } + sutBeforeActions.forEach { it.exec(client = client) } + loadGenBeforeActions.forEach { it.exec(client = client) } val kafkaTopics = this.topics.filter { !it.removeOnly } .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) } @@ -74,8 +74,8 @@ class KubernetesBenchmarkDeployment( labelName = LAG_EXPORTER_POD_LABEL_NAME, labelValue = LAG_EXPORTER_POD_LABEL_VALUE ) - sutAfterActions.forEach { it.exec } - loadGenAfterActions.forEach { it.exec } + sutAfterActions.forEach { it.exec(client = client) } + loadGenAfterActions.forEach { it.exec(client = client) } logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." } Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis()) } diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt new file mode 100644 index 0000000000000000000000000000000000000000..674b62a8007965a0dcc2a5b2ee756a522553c14c --- /dev/null +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt @@ -0,0 +1,170 @@ +package theodolite.execution.operator + +import io.fabric8.kubernetes.api.model.apps.Deployment +import io.fabric8.kubernetes.client.NamespacedKubernetesClient +import io.fabric8.kubernetes.client.dsl.MixedOperation +import io.fabric8.kubernetes.client.dsl.Resource +import theodolite.benchmark.Action +import theodolite.benchmark.ActionSelector +import theodolite.benchmark.KubernetesBenchmark +import theodolite.model.crd.BenchmarkCRD +import theodolite.model.crd.BenchmarkStates +import theodolite.model.crd.KubernetesBenchmarkList + +class BenchmarkStateChecker( + private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, + private val benchmarkStateHandler: BenchmarkStateHandler, + private val client: NamespacedKubernetesClient + +) { + /** + * Checks and updates the states off all deployed benchmarks. + * + */ + fun updateBenchmarkStatus() { + this.benchmarkCRDClient + .list() + .items + .map { it.spec.name = it.metadata.name; it } + .map { Pair(it, checkState(it.spec)) } + .forEach { setState(it.first, it.second) } + } + + private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) { + benchmarkStateHandler.setResourceSetState(resource.spec.name, state) + } + + /** + * Checks the state of the benchmark. + * + * @param benchmark The benchmark to check + * @return [BenchmarkStates.READY] iff all resource could be loaded and all actions could be executed, [BenchmarkStates.PENDING] else + */ + private fun checkState(benchmark: KubernetesBenchmark): BenchmarkStates { + return if (checkActionCommands(benchmark) == BenchmarkStates.READY + && checkResources(benchmark) == BenchmarkStates.READY + ) { + BenchmarkStates.READY + } else { + BenchmarkStates.PENDING + } + } + + /** + * Checks if all specified actions of the given benchmark could be executed or not + * + * @param benchmark The benchmark to check + * @return The state of this benchmark. [BenchmarkStates.READY] if all actions could be executed, else [BenchmarkStates.PENDING] + */ + private fun checkActionCommands(benchmark: KubernetesBenchmark): BenchmarkStates { + return if (checkIfActionPossible(benchmark, benchmark.sut.beforeActions) + && checkIfActionPossible(benchmark, benchmark.sut.afterActions) + && checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) + && checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) + ) { + BenchmarkStates.READY + } else { + BenchmarkStates.PENDING + } + } + + /** + * Action commands are called on a pod. To verify that an action command can be executed, + * it checks that the specified pods are either currently running in the cluster or + * have been specified as infrastructure in the benchmark. + * + * @param benchmark the benchmark to check + * @param actions the actions + * @return true if all actions could be executed, else false + */ + private fun checkIfActionPossible(benchmark: KubernetesBenchmark, actions: List<Action>): Boolean { + return !actions.map { + checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(benchmark, it.selector) + }.contains(false) + } + + /** + * Checks for the given actionSelector whether the required resources are already deployed in the cluster or not + * + * @param selector the actionSelector to check + * @return true if the required resources are found, else false + */ + private fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean { + val pods = this.client + .pods() + .withLabels(selector.pod.matchLabels) + .list() + .items + + return if (pods.isNotEmpty() && selector.container.isNotEmpty()) { + pods.map { pod -> + pod + .spec + .containers + .map { it.name } + .contains(selector.container) + }.contains(false) + } else { + pods.isNotEmpty() + } + + } + + /** + * Checks for the given actionSelector whether the required resources are specified as infrastructure or not + * + * @param benchmark the benchmark to check + * @param selector the actionSelector to check + * @return true if the required resources are found, else false + */ + private fun checkIfResourceIsInfrastructure(benchmark: KubernetesBenchmark, selector: ActionSelector): Boolean { + val resources = benchmark.loadKubernetesResources(resourceSet = benchmark.infrastructure.resources) + + return if (resources.isEmpty()) { + false + } else { + resources.map { it.second } + .filterIsInstance<Deployment>() + .filter { it.spec.selector.matchLabels.containsMatchLabels(selector.pod.matchLabels) } + .any { + if (selector.container.isNotEmpty()) { + it.spec.template.spec.containers.map { it.name }.contains(selector.container) + } else { + true + } + } + } + } + + /** + * Checks if it is possible to load all specified Kubernetes manifests. + * + * @param benchmark The benchmark to check + * @return The state of this benchmark. [BenchmarkStates.READY] if all resources could be loaded, else [BenchmarkStates.PENDING] + */ + private fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates { // TODO check if infrastructure could be loaded or not + return try { + val appResources = + benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) + val loadGenResources = + benchmark.loadKubernetesResources(resourceSet = benchmark.loadGenerator.resources) + if (appResources.isNotEmpty() && loadGenResources.isNotEmpty()) { + BenchmarkStates.READY + } else { + BenchmarkStates.PENDING + } + } catch (e: Exception) { + BenchmarkStates.PENDING + } + } +} + +private fun <K, V> MutableMap<K, V>.containsMatchLabels(matchLabels: MutableMap<V, V>) : Boolean { + for (kv in matchLabels) { + if (kv.value != this[kv.key as K]) { + return false + } + } + return true +} + diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt index 70e30cf84ef40796eb085a0d68eb2e323232fde9..3bff0a403e64f85781b8f5f675ca47a7ffa52ff8 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt @@ -29,7 +29,7 @@ class TheodoliteController( private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, private val executionStateHandler: ExecutionStateHandler, - private val benchmarkStateHandler: BenchmarkStateHandler + private val benchmarkSateChecker: BenchmarkStateChecker ) { lateinit var executor: TheodoliteExecutor @@ -41,7 +41,7 @@ class TheodoliteController( sleep(5000) // wait until all states are correctly set while (true) { reconcile() - updateBenchmarkStatus() + benchmarkSateChecker.updateBenchmarkStatus() sleep(2000) } } @@ -49,7 +49,7 @@ class TheodoliteController( private fun reconcile() { do { val execution = getNextExecution() - updateBenchmarkStatus() + benchmarkSateChecker.updateBenchmarkStatus() if (execution != null) { val benchmark = getBenchmarks() .map { it.spec } @@ -139,7 +139,6 @@ class TheodoliteController( } } - /** * Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution] * is selected for the next execution depends on three points: @@ -173,34 +172,7 @@ class TheodoliteController( .firstOrNull() } - private fun updateBenchmarkStatus() { - this.benchmarkCRDClient - .list() - .items - .map { it.spec.name = it.metadata.name; it } - .map { Pair(it, checkResource(it.spec)) } - .forEach { setState(it.first, it.second ) } - } - - private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) { - benchmarkStateHandler.setResourceSetState(resource.spec.name, state) - } - private fun checkResource(benchmark: KubernetesBenchmark): BenchmarkStates { - return try { - val appResources = - benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) - val loadGenResources = - benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) - if(appResources.isNotEmpty() && loadGenResources.isNotEmpty()) { - BenchmarkStates.READY - } else { - BenchmarkStates.PENDING - } - } catch (e: Exception) { - BenchmarkStates.PENDING - } - } fun isExecutionRunning(executionName: String): Boolean { if (!::executor.isInitialized) return false diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt index 4850a44fdddba117178e29d3170f44a95df646e7..f74c2ef5fdf451799709183554a0a147cf89cfae 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/TheodoliteOperator.kt @@ -34,6 +34,7 @@ class TheodoliteOperator { private lateinit var controller: TheodoliteController private lateinit var executionStateHandler: ExecutionStateHandler private lateinit var benchmarkStateHandler: BenchmarkStateHandler + private lateinit var benchmarkStateChecker: BenchmarkStateChecker fun start() { @@ -71,7 +72,7 @@ class TheodoliteOperator { controller = getController( client = client, executionStateHandler = getExecutionStateHandler(client = client), - benchmarkStateHandler = getBenchmarkStateHandler(client = client) + benchmarkStateChecker = getBenchmarkStateChecker(client = client) ) getExecutionEventHandler(controller, client).startAllRegisteredInformers() @@ -112,17 +113,28 @@ class TheodoliteOperator { return benchmarkStateHandler } + fun getBenchmarkStateChecker(client: NamespacedKubernetesClient) : BenchmarkStateChecker { + if (!::benchmarkStateChecker.isInitialized) { + this.benchmarkStateChecker = BenchmarkStateChecker( + client = client, + benchmarkStateHandler = getBenchmarkStateHandler(client = client), + benchmarkCRDClient = getBenchmarkClient(client = client)) + } + return benchmarkStateChecker + } + + fun getController( client: NamespacedKubernetesClient, executionStateHandler: ExecutionStateHandler, - benchmarkStateHandler: BenchmarkStateHandler + benchmarkStateChecker: BenchmarkStateChecker ): TheodoliteController { if (!::controller.isInitialized) { this.controller = TheodoliteController( benchmarkCRDClient = getBenchmarkClient(client), executionCRDClient = getExecutionClient(client), executionStateHandler = executionStateHandler, - benchmarkStateHandler = benchmarkStateHandler + benchmarkSateChecker = benchmarkStateChecker ) } return this.controller