diff --git a/theodolite/build.gradle b/theodolite/build.gradle index a425a9e337fea83f03f335e60324d54d41417f66..c01f261c7bd3c9e9b62bc7efeb93c492c499ab07 100644 --- a/theodolite/build.gradle +++ b/theodolite/build.gradle @@ -12,7 +12,11 @@ repositories { } dependencies { - implementation enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}") + // Use latest Kubernetes client to use TestStandardHttpClient for exec commands tests + implementation platform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}") + implementation enforcedPlatform("io.fabric8:kubernetes-client-bom:6.8.0") + // Use default Quarkus once kubernetes-client 6.8.* is integrated + //implementation enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}") implementation 'io.quarkus:quarkus-kotlin' implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' implementation 'io.quarkus:quarkus-arc' @@ -29,6 +33,12 @@ dependencies { testImplementation 'io.quarkus:quarkus-junit5' testImplementation 'io.quarkus:quarkus-test-kubernetes-client' + // Used for exec command tests + testImplementation ('io.fabric8:kubernetes-client-api') { + artifact { + classifier = 'tests' + } + } //testImplementation 'io.rest-assured:rest-assured' testImplementation 'com.github.tomakehurst:wiremock-jre8:2.35.0' testImplementation 'org.junit-pioneer:junit-pioneer:2.0.1' diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/Action.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/Action.kt index bdabb719672abf2975fdf8a0ad59868bbc6c1edf..486bbb88903b9222ea4864fdfaf115d55de12851 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/Action.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/Action.kt @@ -3,6 +3,7 @@ package rocks.theodolite.kubernetes import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection @@ -18,11 +19,11 @@ class Action { @JsonInclude(JsonInclude.Include.NON_NULL) var deleteCommand: DeleteCommand? = null - fun exec(client: NamespacedKubernetesClient) { + fun exec(client: KubernetesClient) { return if (execCommand != null) { - execCommand?.exec(client= client) !! + execCommand?.exec(client) !! } else if (deleteCommand != null) { - deleteCommand?.exec(client= client ) !! + deleteCommand?.exec(client) !! } else { throw DeploymentFailedException("Could not execute action. The action type must either be 'exec' or 'delete'.") } diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ActionCommand.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ActionCommand.kt index eefacbea9268f44969fd88d7650d5ddc5e00fb8e..e6dac50627652731fc7c00d127283b052d043a52 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ActionCommand.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ActionCommand.kt @@ -1,33 +1,26 @@ package rocks.theodolite.kubernetes -import io.fabric8.kubernetes.api.model.Status +import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.KubernetesClientException -import io.fabric8.kubernetes.client.NamespacedKubernetesClient -import io.fabric8.kubernetes.client.dsl.ExecListener import io.fabric8.kubernetes.client.dsl.ExecWatch -import io.fabric8.kubernetes.client.utils.Serialization import mu.KotlinLogging import java.io.ByteArrayOutputStream import java.time.Duration -import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit private val logger = KotlinLogging.logger {} -class ActionCommand(val client: NamespacedKubernetesClient) { - var out: ByteArrayOutputStream = ByteArrayOutputStream() - var error: ByteArrayOutputStream = ByteArrayOutputStream() - var errChannelStream: ByteArrayOutputStream = ByteArrayOutputStream() - private val execLatch = CountDownLatch(1) +class ActionCommand(val client: KubernetesClient) { /** * Executes an action command. * * @param matchLabels matchLabels specifies on which pod the command should be executed. For this, the principle * `of any` of is used and the command is called on one of the possible pods. - * @param container (Optional) The container to run the command. Is optional iff exactly one container exist. * @param command The command to be executed. + * @param timeout (Optional) Timeout for running the command. + * @param container (Optional) The container to run the command. Is optional iff exactly one container exist. * @return the exit code of this executed command */ fun exec( @@ -37,69 +30,26 @@ class ActionCommand(val client: NamespacedKubernetesClient) { container: String = "" ): Int { try { - val execWatch: ExecWatch = if (container.isNotEmpty()) { - client.pods() - .inNamespace(client.namespace) - .withName(getPodName(matchLabels, 3)) - .inContainer(container) - - } else { - client.pods() - .inNamespace(client.namespace) - .withName(getPodName(matchLabels, 3)) - } - .writingOutput(out) - .writingError(error) - .writingErrorChannel(errChannelStream) - .usingListener(ActionCommandListener(execLatch)) + val outStream = ByteArrayOutputStream() + val errorStream = ByteArrayOutputStream() + val execWatch: ExecWatch = client.pods() + .inNamespace(client.namespace) + .withName(awaitPodName(matchLabels, 3)) + .let { if (container.isNotEmpty()) it.inContainer(container) else it } + .writingOutput(outStream) + .writingError(errorStream) .exec(*command) - - val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS) - if (!latchTerminationStatus) { - throw ActionCommandFailedException("Timeout while running action command") - } + val exitCode = execWatch.exitCode().get(timeout, TimeUnit.SECONDS) execWatch.close() - } catch (e: Exception) { - when (e) { - is InterruptedException -> { - Thread.currentThread().interrupt() - throw ActionCommandFailedException("Interrupted while waiting for the exec", e) - } - is KubernetesClientException -> { - throw ActionCommandFailedException("Error while executing command", e) - } - else -> { - throw e - } - } + logger.debug { "Execution Output Stream is \n $outStream" } + logger.debug { "Execution Error Stream is \n $errorStream" } + return exitCode + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + throw ActionCommandFailedException("Interrupted while waiting for the exec", e) + } catch (e: KubernetesClientException) { + throw ActionCommandFailedException("Error while executing command", e) } - logger.debug { "Execution Output Stream is \n $out" } - logger.debug { "Execution Error Stream is \n $error" } - logger.debug { "Execution ErrorChannel is: \n $errChannelStream" } - return getExitCode(errChannelStream) - } - - private fun getExitCode(errChannelStream: ByteArrayOutputStream): Int { - val status: Status? - try { - status = Serialization.unmarshal(errChannelStream.toString(), Status::class.java) - } catch (e: Exception) { - throw ActionCommandFailedException("Could not determine the exit code, no information given") - } - - if (status == null) { - throw ActionCommandFailedException("Could not determine the exit code, no information given") - } - - return if (status.status.equals("Success")) { - 0 - } else status.details.causes.stream() - .filter { it.reason.equals("ExitCode") } - .map { it.message } - .findFirst() - .orElseThrow { - ActionCommandFailedException("Status is not SUCCESS but contains no exit code - Status: $status") - }.toInt() } /** @@ -110,9 +60,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) { * it can take a while until the status is ready and the pod can be selected. * @return the name of the pod or throws [ActionCommandFailedException] */ - fun getPodName(matchLabels: Map<String, String>, tries: Int): String { + fun awaitPodName(matchLabels: Map<String, String>, tries: Int): String { for (i in 1..tries) { - try { return getPodName(matchLabels) } catch (e: Exception) { @@ -141,16 +90,4 @@ class ActionCommand(val client: NamespacedKubernetesClient) { } } - private class ActionCommandListener(val execLatch: CountDownLatch) : ExecListener { - - override fun onFailure(throwable: Throwable, response: ExecListener.Response) { - execLatch.countDown() - throw ActionCommandFailedException("Some error encountered while executing action, caused ${throwable.message})") - } - - override fun onClose(code: Int, reason: String) { - execLatch.countDown() - } - } - } diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/DeleteCommand.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/DeleteCommand.kt index ef4409e5bdebfa8b232d5ed1080e93571cbaa618..502fb9b23558564a62bc2a0093fcfc25ede04823 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/DeleteCommand.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/DeleteCommand.kt @@ -2,8 +2,8 @@ package rocks.theodolite.kubernetes import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.KubernetesClientException -import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection import mu.KotlinLogging @@ -16,7 +16,7 @@ class DeleteCommand { lateinit var selector: DeleteActionSelector - fun exec(client: NamespacedKubernetesClient) { + fun exec(client: KubernetesClient) { logger.info { "Deleting all resources with apiVersion ${selector.apiVersion} and Kind ${selector.kind} matching regular expression ${selector.nameRegex}" } val regExp = selector.nameRegex.toRegex() val k8sManager = K8sManager(client) diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ExecCommand.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ExecCommand.kt index c13ffcde0a9a48ae86d48c2fdf6f373a8b5406ad..1691b414695c3eaaa207ff9c70788bf4e7801b71 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ExecCommand.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ExecCommand.kt @@ -2,7 +2,7 @@ package rocks.theodolite.kubernetes import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import io.fabric8.kubernetes.client.NamespacedKubernetesClient +import io.fabric8.kubernetes.client.KubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection @JsonDeserialize @@ -12,7 +12,7 @@ class ExecCommand { lateinit var selector: ExecActionSelector lateinit var command: Array<String> var timeoutSeconds: Long = Configuration.TIMEOUT_SECONDS - fun exec(client: NamespacedKubernetesClient) { + fun exec(client: KubernetesClient) { val exitCode = ActionCommand(client = client) .exec( matchLabels = selector.pod.matchLabels, diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/K8sManager.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/K8sManager.kt index 7856451edf4c31d668288f618fcee46b7246a619..0b34f4564829307df27f6910c5a7753f362b3dcf 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/K8sManager.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/K8sManager.kt @@ -3,6 +3,7 @@ package rocks.theodolite.kubernetes import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.api.model.apps.Deployment import io.fabric8.kubernetes.api.model.apps.StatefulSet +import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient import mu.KotlinLogging @@ -13,7 +14,7 @@ private val logger = KotlinLogging.logger {} * Supports: Deployments, Services, ConfigMaps, StatefulSets, and CustomResources. * @param client KubernetesClient used to deploy or remove. */ -class K8sManager(private val client: NamespacedKubernetesClient) { +class K8sManager(private val client: KubernetesClient) { /** * Deploys different k8s resources using the client. diff --git a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ResourceByLabelHandler.kt b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ResourceByLabelHandler.kt index 6fdf80e3c1fcace633adc135123fb95ab49d1fc4..ab42a67dd795c3fbd7469d590d6e547647088daa 100644 --- a/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ResourceByLabelHandler.kt +++ b/theodolite/src/main/kotlin/rocks/theodolite/kubernetes/ResourceByLabelHandler.kt @@ -1,5 +1,6 @@ package rocks.theodolite.kubernetes +import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import mu.KotlinLogging @@ -10,7 +11,7 @@ private val logger = KotlinLogging.logger {} * The ResourceByLabelHandler provides basic functions to manage Kubernetes resources through their labels. * @param client NamespacedKubernetesClient used for the deletion. */ -class ResourceByLabelHandler(private val client: NamespacedKubernetesClient) { +class ResourceByLabelHandler(private val client: KubernetesClient) { /** * Deletes all pods with the selected label. diff --git a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/ActionCommandTest.kt b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/ActionCommandTest.kt index 2e9e910630e9838d6f1c2f05972c783e4980df91..6e7a5407a7113d2d88cf4fab4978f1d92c081aec 100644 --- a/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/ActionCommandTest.kt +++ b/theodolite/src/test/kotlin/rocks/theodolite/kubernetes/ActionCommandTest.kt @@ -1,123 +1,102 @@ package rocks.theodolite.kubernetes -import io.fabric8.kubernetes.api.model.Pod -import io.fabric8.kubernetes.api.model.PodBuilder -import io.fabric8.kubernetes.api.model.PodListBuilder -import io.fabric8.kubernetes.client.server.mock.KubernetesServer -import io.fabric8.kubernetes.client.utils.Utils +import io.fabric8.kubernetes.api.model.* +import io.fabric8.kubernetes.client.ConfigBuilder +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.KubernetesClientBuilder +import io.fabric8.kubernetes.client.http.* +import io.fabric8.kubernetes.client.utils.Serialization import io.quarkus.test.junit.QuarkusTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals -import rocks.theodolite.kubernetes.operator.TheodoliteController -import rocks.theodolite.kubernetes.operator.TheodoliteOperator +import org.junit.jupiter.api.Assertions.assertTrue +import org.mockito.ArgumentMatchers +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.concurrent.CompletableFuture @QuarkusTest class ActionCommandTest { - private val server = KubernetesServer(false, false) - lateinit var controller: TheodoliteController + + private lateinit var factory: TestStandardHttpClientFactory + private lateinit var client: KubernetesClient + private lateinit var httpClient: TestStandardHttpClient @BeforeEach fun setUp() { - server.before() - val operator = TheodoliteOperator(server.client) - this.controller = operator.getController( - executionStateHandler = operator.getExecutionStateHandler(), - benchmarkStateChecker = operator.getBenchmarkStateChecker() - ) - - val pod: Pod = PodBuilder().withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .withLabels<String, String>(mapOf("app" to "pod")) - .withNamespace("test").and() - .build() - - val ready: Pod = createReadyFrom(pod, "True") - - val podList = PodListBuilder().build() - podList.items.add(0, ready) - - server - .expect() - .withPath("/api/v1/namespaces/test/pods?labelSelector=${Utils.toUrlEncoded("app=pod")}") - .andReturn(200, podList) - .always() - - server - .expect() - .get() - .withPath("/api/v1/namespaces/test/pods/pod1") - .andReturn(200, ready) - .always() - - server - .expect() - .withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true") - .andUpgradeToWebSocket() - .open(ErrorChannelMessage("{\"metadata\":{},\"status\":\"Success\"}\n")) - .done() - .always() - - server - .expect() - .withPath("/api/v1/namespaces/test/pods/pod1/exec?command=error-command&stdout=true&stderr=true") - .andUpgradeToWebSocket() - .open(ErrorChannelMessage("{\"metadata\":{},\"status\":\"failed\", \"details\":{}}\n")) - .done() - .always() + factory = TestStandardHttpClientFactory() + client = KubernetesClientBuilder() + .withHttpClientFactory(factory) + .withConfig(ConfigBuilder() + .withNamespace("test") + .build()) + .build() + httpClient = factory.instances.iterator().next() + + val podReadyList = PodListBuilder() + .withNewMetadata().endMetadata() + .addNewItem() + .withNewMetadata() + .withName("single-container") + .withLabels<String, String>(mapOf("app" to "single-container")) + .endMetadata() + .withNewSpec().addNewContainer().withName("single-container").endContainer().endSpec() + .withNewStatus().addNewCondition().withType("Ready").withStatus("True").endCondition().endStatus() + .endItem() + .build() + httpClient.expect("/api/v1/namespaces/test/pods", 200, client.kubernetesSerialization.asJson(podReadyList)) + httpClient.expect("/api/v1/namespaces/test/pods/single-container", 200, client.kubernetesSerialization.asJson(podReadyList.items[0])) + // "/api/v1/namespaces/test/pods" will be called twice + httpClient.expect("/api/v1/namespaces/test/pods", 200, client.kubernetesSerialization.asJson(podReadyList)) } - /** - * Copied from fabric8 Kubernetes Client repository - * - * @param pod - * @param status - * @return - */ - fun createReadyFrom(pod: Pod, status: String): Pod { - return PodBuilder(pod) - .withNewStatus() - .addNewCondition() - .withType("Ready") - .withStatus(status) - .endCondition() - .endStatus() - .build() - } @AfterEach fun tearDown() { - server.after() } @Test fun testGetPodName() { - assertEquals("pod1", ActionCommand(client = server.client).getPodName(mapOf("app" to "pod"), 1)) + assertEquals("single-container", ActionCommand(client = client).awaitPodName(mapOf("app" to "single-container"), 1)) } @Test fun testActionSuccess() { + val success = StatusBuilder() + .withStatus("Success") + .build() + httpClient.wsExpect("/api/v1/namespaces/test/pods/single-container/exec", buildWsFutureProvider(success)) + val action = Action().apply { execCommand = ExecCommand().apply { selector = ExecActionSelector().apply { pod = PodSelector().apply { - matchLabels = mapOf("app" to "pod") + matchLabels = mapOf("app" to "single-container") } } command = arrayOf("ls") timeoutSeconds = 10L } } + action.exec(this.client) - action.exec(server.client) - assertEquals( - "/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true", - server.lastRequest.path) + val calledUri = httpClient.recordedBuildWebSocketDirects[0].standardWebSocketBuilder.asHttpRequest().uri() + assertTrue(calledUri.toString().contains("/api/v1/namespaces/test/pods/single-container/exec")) + assertTrue(calledUri.toString().contains("command=ls")) } @Test fun testActionFailed() { + val failed = StatusBuilder() + .withStatus("failed") + .withNewDetails() + .endDetails() + .build() + httpClient.wsExpect("/api/v1/namespaces/test/pods/single-container/exec", buildWsFutureProvider(failed)) + val action = Action().apply { execCommand = ExecCommand().apply { selector = ExecActionSelector().apply { @@ -125,11 +104,36 @@ class ActionCommandTest { matchLabels = mapOf("app" to "pod") } } - command = arrayOf("error-command") + command = arrayOf("exit", "1") timeoutSeconds = 10L } } - assertThrows<ActionCommandFailedException> { run { action.exec(server.client) } } + assertThrows<ActionCommandFailedException> { action.exec(this.client) } + + val calledUri = httpClient.recordedBuildWebSocketDirects[0].standardWebSocketBuilder.asHttpRequest().uri() + assertTrue(calledUri.toString().contains("/api/v1/namespaces/test/pods/single-container/exec")) + assertTrue(calledUri.toString().contains("command=exit")) + assertTrue(calledUri.toString().contains("command=1")) + } + + private fun buildBodyBytes(prefix: Byte, body: String): ByteBuffer { + val original = body.toByteArray(StandardCharsets.UTF_8) + return ByteBuffer.allocate(original.size + 1).apply { + put(prefix) + put(original) + } + } + + private fun buildWsFutureProvider(status: Status): TestStandardHttpClient.WsFutureProvider { + val webSocket = mock<WebSocket> { + on { send(ArgumentMatchers.any()) } doReturn(true) + } + return TestStandardHttpClient.WsFutureProvider { _, l -> + l.onOpen(webSocket) + val message = buildBodyBytes(3, Serialization.asJson(status)) + l.onMessage(webSocket, message) // exit + CompletableFuture.completedFuture(WebSocketResponse(WebSocketUpgradeResponse(null), webSocket)) + } } }