diff --git a/theodolite/build.gradle b/theodolite/build.gradle index 3082deaf12fc48c6aca97ffd00b9c74cd7e6c143..1c7cde32dae62305e2d9df99d95e7991e90068bc 100644 --- a/theodolite/build.gradle +++ b/theodolite/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation('io.fabric8:kubernetes-model-common:5.4.1'){force = true} implementation 'org.apache.kafka:kafka-clients:2.7.0' implementation 'khttp:khttp:1.0.0' + implementation 'org.junit.jupiter:junit-jupiter:5.7.0' compile 'junit:junit:4.12' diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt index c6a3f2726ffd945ddf5ab6038dee76df15781197..4387e66489b26a3c323113dd35d97d43c95567b4 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/ActionCommand.kt @@ -1,8 +1,11 @@ package theodolite.benchmark +import io.fabric8.kubernetes.api.model.Status +import io.fabric8.kubernetes.client.KubernetesClientException import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.dsl.ExecListener import io.fabric8.kubernetes.client.dsl.ExecWatch +import io.fabric8.kubernetes.client.utils.Serialization import mu.KotlinLogging import okhttp3.Response import theodolite.util.ActionCommandFailedException @@ -11,13 +14,14 @@ import java.io.ByteArrayOutputStream import java.time.Duration import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import kotlin.properties.Delegates + private val logger = KotlinLogging.logger {} class ActionCommand(val client: NamespacedKubernetesClient) { var out: ByteArrayOutputStream = ByteArrayOutputStream() var error: ByteArrayOutputStream = ByteArrayOutputStream() + var errChannelStream: ByteArrayOutputStream = ByteArrayOutputStream() private val execLatch = CountDownLatch(1); /** @@ -35,8 +39,6 @@ class ActionCommand(val client: NamespacedKubernetesClient) { timeout: Long = Configuration.TIMEOUT_SECONDS, container: String = "" ): Int { - val exitCode = ExitCode() - try { val execWatch: ExecWatch = if (container.isNotEmpty()) { client.pods() @@ -51,7 +53,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) { } .writingOutput(out) .writingError(error) - .usingListener(MyPodExecListener(execLatch, exitCode)) + .writingErrorChannel(errChannelStream) + .usingListener(MyPodExecListener(execLatch)) .exec(*command) val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS); @@ -59,12 +62,46 @@ class ActionCommand(val client: NamespacedKubernetesClient) { throw ActionCommandFailedException("Latch could not terminate within specified time") } execWatch.close(); - } catch (e: InterruptedException) { - Thread.currentThread().interrupt(); - throw ActionCommandFailedException("Interrupted while waiting for the exec", e) + } catch (e: Exception) { + when (e) { + is InterruptedException -> { + Thread.currentThread().interrupt(); + throw ActionCommandFailedException("Interrupted while waiting for the exec", e) + } + is KubernetesClientException -> { + throw ActionCommandFailedException("Error while executing command", e) + } + else -> { + throw e + } + } } - logger.info { "Action command finished with code ${exitCode.code}" } - return exitCode.code + logger.debug { "Execution Output Stream is \n $out" } + logger.debug { "Execution Error Stream is \n $error" } + return getExitCode(errChannelStream) + } + + private fun getExitCode(errChannelStream: ByteArrayOutputStream): Int { + val status: Status? + try { + status = Serialization.unmarshal(errChannelStream.toString(), Status::class.java) + } catch (e: Exception) { + throw ActionCommandFailedException("Could not determine the exit code, no information given") + } + + if (status == null) { + throw ActionCommandFailedException("Could not determine the exit code, no information given") + } + + return if (status.status.equals("Success")) { + 0 + } else status.details.causes.stream() + .filter { it.reason.equals("ExitCode") } + .map { it.message } + .findFirst() + .orElseThrow { + ActionCommandFailedException("Status is not SUCCESS but contains no exit code - Status: $status") + }.toInt() } fun getPodName(matchLabels: MutableMap<String, String>, tries: Int): String { @@ -98,7 +135,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { } } - private class MyPodExecListener(val execLatch: CountDownLatch, val exitCode: ExitCode) : ExecListener { + private class MyPodExecListener(val execLatch: CountDownLatch) : ExecListener { override fun onOpen(response: Response) { } @@ -108,14 +145,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) { } override fun onClose(code: Int, reason: String) { - exitCode.code = code - exitCode.reason = reason execLatch.countDown() } } - private class ExitCode() { - var code by Delegates.notNull<Int>() - lateinit var reason: String - } } diff --git a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt index 99cf12c5680b3eb6a5f9c74c402ceed8974afb3d..2b4d6193539177c9e6260d326f4b57c60a012f8c 100644 --- a/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt +++ b/theodolite/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt @@ -47,7 +47,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE @Transient - private val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace) + var client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace) /** * Loads [KubernetesResource]s. diff --git a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt index 674b62a8007965a0dcc2a5b2ee756a522553c14c..a889b9c7afd8f96a5f508de5c704f2723a250719 100644 --- a/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt +++ b/theodolite/src/main/kotlin/theodolite/execution/operator/BenchmarkStateChecker.kt @@ -7,6 +7,7 @@ import io.fabric8.kubernetes.client.dsl.Resource import theodolite.benchmark.Action import theodolite.benchmark.ActionSelector import theodolite.benchmark.KubernetesBenchmark +import theodolite.benchmark.ResourceSets import theodolite.model.crd.BenchmarkCRD import theodolite.model.crd.BenchmarkStates import theodolite.model.crd.KubernetesBenchmarkList @@ -57,10 +58,10 @@ class BenchmarkStateChecker( * @return The state of this benchmark. [BenchmarkStates.READY] if all actions could be executed, else [BenchmarkStates.PENDING] */ private fun checkActionCommands(benchmark: KubernetesBenchmark): BenchmarkStates { - return if (checkIfActionPossible(benchmark, benchmark.sut.beforeActions) - && checkIfActionPossible(benchmark, benchmark.sut.afterActions) - && checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) - && checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) + return if (checkIfActionPossible(benchmark.infrastructure.resources, benchmark.sut.beforeActions) + && checkIfActionPossible(benchmark.infrastructure.resources, benchmark.sut.afterActions) + && checkIfActionPossible(benchmark.infrastructure.resources, benchmark.loadGenerator.beforeActions) + && checkIfActionPossible(benchmark.infrastructure.resources, benchmark.loadGenerator.beforeActions) ) { BenchmarkStates.READY } else { @@ -77,9 +78,9 @@ class BenchmarkStateChecker( * @param actions the actions * @return true if all actions could be executed, else false */ - private fun checkIfActionPossible(benchmark: KubernetesBenchmark, actions: List<Action>): Boolean { + private fun checkIfActionPossible(resourcesSets: List<ResourceSets>, actions: List<Action>): Boolean { return !actions.map { - checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(benchmark, it.selector) + checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(resourcesSets, it.selector) }.contains(false) } @@ -89,7 +90,7 @@ class BenchmarkStateChecker( * @param selector the actionSelector to check * @return true if the required resources are found, else false */ - private fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean { + fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean { val pods = this.client .pods() .withLabels(selector.pod.matchLabels) @@ -103,11 +104,10 @@ class BenchmarkStateChecker( .containers .map { it.name } .contains(selector.container) - }.contains(false) + }.contains(true) } else { pods.isNotEmpty() } - } /** @@ -117,15 +117,15 @@ class BenchmarkStateChecker( * @param selector the actionSelector to check * @return true if the required resources are found, else false */ - private fun checkIfResourceIsInfrastructure(benchmark: KubernetesBenchmark, selector: ActionSelector): Boolean { - val resources = benchmark.loadKubernetesResources(resourceSet = benchmark.infrastructure.resources) + fun checkIfResourceIsInfrastructure(resourcesSets: List<ResourceSets>, selector: ActionSelector): Boolean { + val resources = resourcesSets.flatMap { it.loadResourceSet(this.client) } return if (resources.isEmpty()) { false } else { resources.map { it.second } .filterIsInstance<Deployment>() - .filter { it.spec.selector.matchLabels.containsMatchLabels(selector.pod.matchLabels) } + .filter { it.metadata.labels.containsMatchLabels(selector.pod.matchLabels) } .any { if (selector.container.isNotEmpty()) { it.spec.template.spec.containers.map { it.name }.contains(selector.container) @@ -142,7 +142,7 @@ class BenchmarkStateChecker( * @param benchmark The benchmark to check * @return The state of this benchmark. [BenchmarkStates.READY] if all resources could be loaded, else [BenchmarkStates.PENDING] */ - private fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates { // TODO check if infrastructure could be loaded or not + fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates { return try { val appResources = benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) diff --git a/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt b/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt index 3ec361e4190233235a9ed2b7c7dfb6ca3ecf8eff..06bdd90b3b3c82e1bcbe59a1f3e9e35e2f338486 100644 --- a/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt +++ b/theodolite/src/test/kotlin/theodolite/benchmark/ActionCommandTest.kt @@ -1,11 +1,9 @@ package theodolite.benchmark -import com.google.gson.Gson -import com.google.gson.GsonBuilder + import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.api.model.PodBuilder import io.fabric8.kubernetes.api.model.PodListBuilder -import io.fabric8.kubernetes.client.CustomResourceList import io.fabric8.kubernetes.client.server.mock.KubernetesServer import io.fabric8.kubernetes.client.server.mock.OutputStreamMessage import io.fabric8.kubernetes.client.utils.Utils @@ -13,8 +11,6 @@ import io.quarkus.test.junit.QuarkusTest import org.junit.jupiter.api.* import theodolite.execution.operator.TheodoliteController import theodolite.execution.operator.TheodoliteOperator -import theodolite.model.crd.BenchmarkCRD -import theodolite.model.crd.ExecutionCRD import theodolite.util.ActionCommandFailedException @@ -22,14 +18,6 @@ import theodolite.util.ActionCommandFailedException class ActionCommandTest { private val server = KubernetesServer(false, false) lateinit var controller: TheodoliteController - private val gson: Gson = GsonBuilder().enableComplexMapKeySerialization().create() - - private var benchmark = KubernetesBenchmark() - private var execution = BenchmarkExecution() - - private val benchmarkResourceList = CustomResourceList<BenchmarkCRD>() - private val executionResourceList = CustomResourceList<ExecutionCRD>() - @BeforeEach fun setUp() { @@ -71,11 +59,11 @@ class ActionCommandTest { .expect() .withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true") .andUpgradeToWebSocket() - .open(OutputStreamMessage("Test ByteStream")) + .open(OutputStreamMessage("Test-Output")) .done() .always() } - + /** * Copied from fabric8 Kubernetes Client repository * @@ -106,8 +94,8 @@ class ActionCommandTest { @Test fun testActionCommandExec() { - Assertions.assertEquals(1000, ActionCommand(client = server.client) - .exec(mutableMapOf("app" to "pod"), command = arrayOf("ls"), timeout = 30)) + Assertions.assertEquals(0, ActionCommand(client = server.client) + .exec(mutableMapOf("app" to "pod"), command = arrayOf("ls"), timeout = 30L)) } @Test @@ -120,6 +108,7 @@ class ActionCommandTest { action.exec.command = arrayOf("ls") action.exec.timeoutSeconds = 10L - assertThrows<ActionCommandFailedException> { run { action.exec(server.client) } } + val e = assertThrows<ActionCommandFailedException> { run { action.exec(server.client) } } + assert(e.message.equals("Could not determine the exit code, no information given")) } -} \ No newline at end of file +} diff --git a/theodolite/src/test/kotlin/theodolite/execution/operator/BenchmarkStateCheckerTest.kt b/theodolite/src/test/kotlin/theodolite/execution/operator/BenchmarkStateCheckerTest.kt new file mode 100644 index 0000000000000000000000000000000000000000..d160a0e4a0a5d8a1d4fa4636909de5ff38b5e836 --- /dev/null +++ b/theodolite/src/test/kotlin/theodolite/execution/operator/BenchmarkStateCheckerTest.kt @@ -0,0 +1,178 @@ +package theodolite.execution.operator + +import com.google.gson.Gson +import io.fabric8.kubernetes.api.model.ConfigMapBuilder +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.api.model.PodListBuilder +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder +import io.fabric8.kubernetes.client.server.mock.KubernetesServer +import io.fabric8.kubernetes.client.server.mock.OutputStreamMessage +import io.fabric8.kubernetes.client.utils.Utils +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions.* +import theodolite.benchmark.* +import theodolite.model.crd.BenchmarkStates + +internal class BenchmarkStateCheckerTest { + private val server = KubernetesServer(false, false) + private val serverCrud = KubernetesServer(false, true) + private lateinit var checker: BenchmarkStateChecker + private lateinit var checkerCrud: BenchmarkStateChecker + + @BeforeEach + fun setUp() { + server.before() + serverCrud.before() + val operator = TheodoliteOperator() + checker = BenchmarkStateChecker( + client = server.client, + benchmarkCRDClient = operator.getBenchmarkClient(server.client), + benchmarkStateHandler = operator.getBenchmarkStateHandler(server.client) + ) + + checkerCrud = BenchmarkStateChecker( + client = serverCrud.client, + benchmarkCRDClient = operator.getBenchmarkClient(serverCrud.client), + benchmarkStateHandler = operator.getBenchmarkStateHandler(serverCrud.client) + ) + + val pod: Pod = PodBuilder().withNewMetadata() + .withName("pod1") + .withResourceVersion("1") + .withLabels<String, String>(mapOf("app" to "pod")) + .withNamespace("test").and() + .build() + + val ready: Pod = createReadyFrom(pod, "True") + + val podList = PodListBuilder().build() + podList.items.add(0, ready) + + + server + .expect() + .withPath("/api/v1/namespaces/test/pods?labelSelector=${Utils.toUrlEncoded("app=pod1")}") + .andReturn(200, podList) + .always() + + server + .expect() + .withPath("/api/v1/namespaces/test/pods?labelSelector=${Utils.toUrlEncoded("app=pod0")}") + .andReturn(200, emptyMap<String, String>()) + .always() + + + server + .expect() + .get() + .withPath("/api/v1/namespaces/test/pods/pod1") + .andReturn(200, ready) + .always() + + server + .expect() + .withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true") + .andUpgradeToWebSocket() + .open(OutputStreamMessage("Test-Output")) + .done() + .always() + } + + @AfterEach + fun tearDown() { + server.after() + serverCrud.after() + } + + /** + * Copied from fabric8 Kubernetes Client repository + * + * @param pod + * @param status + * @return + */ + private fun createReadyFrom(pod: Pod, status: String): Pod { + return PodBuilder(pod) + .withNewStatus() + .addNewCondition() + .withType("Ready") + .withStatus(status) + .endCondition() + .endStatus() + .build() + } + + private fun getActionSelector(label: Pair<String, String>): ActionSelector { + val podSelector = PodSelector() + val actionSelector = ActionSelector() + actionSelector.pod = podSelector + + // pod with matching labels are deployed + podSelector.matchLabels = mutableMapOf(label) + return actionSelector + } + + @Test + fun checkIfResourceIsDeployed() { + // pod with matching labels are deployed + assertTrue(checker.checkIfResourceIsDeployed(getActionSelector("app" to "pod1"))) + + // no pod with matching labels are deployed + assertFalse(checker.checkIfResourceIsDeployed(getActionSelector("app" to "pod0"))) + } + + + private fun createAndDeployConfigmapResourceSet(): ResourceSets { + // create test deployment + val resourceBuilder = DeploymentBuilder() + resourceBuilder.withNewSpec().endSpec() + resourceBuilder.withNewMetadata().endMetadata() + val resource = resourceBuilder.build() + resource.metadata.name = "test-deployment" + resource.metadata.labels = mutableMapOf("app" to "pod1") + val resourceString = Gson().toJson(resource) + + // create and deploy configmap + val configMap1 = ConfigMapBuilder() + .withNewMetadata().withName("test-configmap").endMetadata() + .addToData("test-resource.yaml",resourceString) + .build() + + serverCrud.client.configMaps().createOrReplace(configMap1) + + // create configmap resource set + val resourceSet = ConfigMapResourceSet() + resourceSet.name = "test-configmap" + + // create ResourceSetsList + val set = ResourceSets() + set.configMap = resourceSet + return set + } + + @Test + fun checkIfResourceIsInfrastructure() { + val resourceSets = listOf(createAndDeployConfigmapResourceSet()) + assertTrue(checkerCrud.checkIfResourceIsInfrastructure(resourceSets, getActionSelector("app" to "pod1"))) + assertFalse(checkerCrud.checkIfResourceIsInfrastructure(resourceSets, getActionSelector("app" to "pod0"))) + + } + + @Test + fun checkResources() { + val benchmark = BenchmarkCRDummy( + name = "test-benchmark" + ) + benchmark.getCR().spec.client = serverCrud.client + val resourceSet = Resources() + resourceSet.resources = listOf(createAndDeployConfigmapResourceSet()) + benchmark.getCR().spec.infrastructure = resourceSet + benchmark.getCR().spec.loadGenerator = resourceSet + benchmark.getCR().spec.sut = resourceSet + + assertEquals(BenchmarkStates.READY,checkerCrud.checkResources(benchmark.getCR().spec)) + } +} \ No newline at end of file