Skip to content
Snippets Groups Projects
Commit b5057d49 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Add test for performing actions command and benchmarkStateChecker

parent e05616cb
No related branches found
No related tags found
1 merge request!201Introduce action commands
...@@ -26,6 +26,7 @@ dependencies { ...@@ -26,6 +26,7 @@ dependencies {
implementation('io.fabric8:kubernetes-model-common:5.4.1'){force = true} implementation('io.fabric8:kubernetes-model-common:5.4.1'){force = true}
implementation 'org.apache.kafka:kafka-clients:2.7.0' implementation 'org.apache.kafka:kafka-clients:2.7.0'
implementation 'khttp:khttp:1.0.0' implementation 'khttp:khttp:1.0.0'
implementation 'org.junit.jupiter:junit-jupiter:5.7.0'
compile 'junit:junit:4.12' compile 'junit:junit:4.12'
......
package theodolite.benchmark package theodolite.benchmark
import io.fabric8.kubernetes.api.model.Status
import io.fabric8.kubernetes.client.KubernetesClientException
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.ExecListener import io.fabric8.kubernetes.client.dsl.ExecListener
import io.fabric8.kubernetes.client.dsl.ExecWatch import io.fabric8.kubernetes.client.dsl.ExecWatch
import io.fabric8.kubernetes.client.utils.Serialization
import mu.KotlinLogging import mu.KotlinLogging
import okhttp3.Response import okhttp3.Response
import theodolite.util.ActionCommandFailedException import theodolite.util.ActionCommandFailedException
...@@ -11,13 +14,14 @@ import java.io.ByteArrayOutputStream ...@@ -11,13 +14,14 @@ import java.io.ByteArrayOutputStream
import java.time.Duration import java.time.Duration
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.properties.Delegates
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
class ActionCommand(val client: NamespacedKubernetesClient) { class ActionCommand(val client: NamespacedKubernetesClient) {
var out: ByteArrayOutputStream = ByteArrayOutputStream() var out: ByteArrayOutputStream = ByteArrayOutputStream()
var error: ByteArrayOutputStream = ByteArrayOutputStream() var error: ByteArrayOutputStream = ByteArrayOutputStream()
var errChannelStream: ByteArrayOutputStream = ByteArrayOutputStream()
private val execLatch = CountDownLatch(1); private val execLatch = CountDownLatch(1);
/** /**
...@@ -35,8 +39,6 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -35,8 +39,6 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
timeout: Long = Configuration.TIMEOUT_SECONDS, timeout: Long = Configuration.TIMEOUT_SECONDS,
container: String = "" container: String = ""
): Int { ): Int {
val exitCode = ExitCode()
try { try {
val execWatch: ExecWatch = if (container.isNotEmpty()) { val execWatch: ExecWatch = if (container.isNotEmpty()) {
client.pods() client.pods()
...@@ -51,7 +53,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -51,7 +53,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
} }
.writingOutput(out) .writingOutput(out)
.writingError(error) .writingError(error)
.usingListener(MyPodExecListener(execLatch, exitCode)) .writingErrorChannel(errChannelStream)
.usingListener(MyPodExecListener(execLatch))
.exec(*command) .exec(*command)
val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS); val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS);
...@@ -59,12 +62,46 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -59,12 +62,46 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
throw ActionCommandFailedException("Latch could not terminate within specified time") throw ActionCommandFailedException("Latch could not terminate within specified time")
} }
execWatch.close(); execWatch.close();
} catch (e: InterruptedException) { } catch (e: Exception) {
Thread.currentThread().interrupt(); when (e) {
throw ActionCommandFailedException("Interrupted while waiting for the exec", e) is InterruptedException -> {
Thread.currentThread().interrupt();
throw ActionCommandFailedException("Interrupted while waiting for the exec", e)
}
is KubernetesClientException -> {
throw ActionCommandFailedException("Error while executing command", e)
}
else -> {
throw e
}
}
} }
logger.info { "Action command finished with code ${exitCode.code}" } logger.debug { "Execution Output Stream is \n $out" }
return exitCode.code logger.debug { "Execution Error Stream is \n $error" }
return getExitCode(errChannelStream)
}
private fun getExitCode(errChannelStream: ByteArrayOutputStream): Int {
val status: Status?
try {
status = Serialization.unmarshal(errChannelStream.toString(), Status::class.java)
} catch (e: Exception) {
throw ActionCommandFailedException("Could not determine the exit code, no information given")
}
if (status == null) {
throw ActionCommandFailedException("Could not determine the exit code, no information given")
}
return if (status.status.equals("Success")) {
0
} else status.details.causes.stream()
.filter { it.reason.equals("ExitCode") }
.map { it.message }
.findFirst()
.orElseThrow {
ActionCommandFailedException("Status is not SUCCESS but contains no exit code - Status: $status")
}.toInt()
} }
fun getPodName(matchLabels: MutableMap<String, String>, tries: Int): String { fun getPodName(matchLabels: MutableMap<String, String>, tries: Int): String {
...@@ -98,7 +135,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -98,7 +135,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
} }
} }
private class MyPodExecListener(val execLatch: CountDownLatch, val exitCode: ExitCode) : ExecListener { private class MyPodExecListener(val execLatch: CountDownLatch) : ExecListener {
override fun onOpen(response: Response) { override fun onOpen(response: Response) {
} }
...@@ -108,14 +145,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -108,14 +145,8 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
} }
override fun onClose(code: Int, reason: String) { override fun onClose(code: Int, reason: String) {
exitCode.code = code
exitCode.reason = reason
execLatch.countDown() execLatch.countDown()
} }
} }
private class ExitCode() {
var code by Delegates.notNull<Int>()
lateinit var reason: String
}
} }
...@@ -47,7 +47,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -47,7 +47,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
@Transient @Transient
private val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace) var client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace)
/** /**
* Loads [KubernetesResource]s. * Loads [KubernetesResource]s.
......
...@@ -7,6 +7,7 @@ import io.fabric8.kubernetes.client.dsl.Resource ...@@ -7,6 +7,7 @@ import io.fabric8.kubernetes.client.dsl.Resource
import theodolite.benchmark.Action import theodolite.benchmark.Action
import theodolite.benchmark.ActionSelector import theodolite.benchmark.ActionSelector
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.benchmark.ResourceSets
import theodolite.model.crd.BenchmarkCRD import theodolite.model.crd.BenchmarkCRD
import theodolite.model.crd.BenchmarkStates import theodolite.model.crd.BenchmarkStates
import theodolite.model.crd.KubernetesBenchmarkList import theodolite.model.crd.KubernetesBenchmarkList
...@@ -57,10 +58,10 @@ class BenchmarkStateChecker( ...@@ -57,10 +58,10 @@ class BenchmarkStateChecker(
* @return The state of this benchmark. [BenchmarkStates.READY] if all actions could be executed, else [BenchmarkStates.PENDING] * @return The state of this benchmark. [BenchmarkStates.READY] if all actions could be executed, else [BenchmarkStates.PENDING]
*/ */
private fun checkActionCommands(benchmark: KubernetesBenchmark): BenchmarkStates { private fun checkActionCommands(benchmark: KubernetesBenchmark): BenchmarkStates {
return if (checkIfActionPossible(benchmark, benchmark.sut.beforeActions) return if (checkIfActionPossible(benchmark.infrastructure.resources, benchmark.sut.beforeActions)
&& checkIfActionPossible(benchmark, benchmark.sut.afterActions) && checkIfActionPossible(benchmark.infrastructure.resources, benchmark.sut.afterActions)
&& checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) && checkIfActionPossible(benchmark.infrastructure.resources, benchmark.loadGenerator.beforeActions)
&& checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions) && checkIfActionPossible(benchmark.infrastructure.resources, benchmark.loadGenerator.beforeActions)
) { ) {
BenchmarkStates.READY BenchmarkStates.READY
} else { } else {
...@@ -77,9 +78,9 @@ class BenchmarkStateChecker( ...@@ -77,9 +78,9 @@ class BenchmarkStateChecker(
* @param actions the actions * @param actions the actions
* @return true if all actions could be executed, else false * @return true if all actions could be executed, else false
*/ */
private fun checkIfActionPossible(benchmark: KubernetesBenchmark, actions: List<Action>): Boolean { private fun checkIfActionPossible(resourcesSets: List<ResourceSets>, actions: List<Action>): Boolean {
return !actions.map { return !actions.map {
checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(benchmark, it.selector) checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(resourcesSets, it.selector)
}.contains(false) }.contains(false)
} }
...@@ -89,7 +90,7 @@ class BenchmarkStateChecker( ...@@ -89,7 +90,7 @@ class BenchmarkStateChecker(
* @param selector the actionSelector to check * @param selector the actionSelector to check
* @return true if the required resources are found, else false * @return true if the required resources are found, else false
*/ */
private fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean { fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean {
val pods = this.client val pods = this.client
.pods() .pods()
.withLabels(selector.pod.matchLabels) .withLabels(selector.pod.matchLabels)
...@@ -103,11 +104,10 @@ class BenchmarkStateChecker( ...@@ -103,11 +104,10 @@ class BenchmarkStateChecker(
.containers .containers
.map { it.name } .map { it.name }
.contains(selector.container) .contains(selector.container)
}.contains(false) }.contains(true)
} else { } else {
pods.isNotEmpty() pods.isNotEmpty()
} }
} }
/** /**
...@@ -117,15 +117,15 @@ class BenchmarkStateChecker( ...@@ -117,15 +117,15 @@ class BenchmarkStateChecker(
* @param selector the actionSelector to check * @param selector the actionSelector to check
* @return true if the required resources are found, else false * @return true if the required resources are found, else false
*/ */
private fun checkIfResourceIsInfrastructure(benchmark: KubernetesBenchmark, selector: ActionSelector): Boolean { fun checkIfResourceIsInfrastructure(resourcesSets: List<ResourceSets>, selector: ActionSelector): Boolean {
val resources = benchmark.loadKubernetesResources(resourceSet = benchmark.infrastructure.resources) val resources = resourcesSets.flatMap { it.loadResourceSet(this.client) }
return if (resources.isEmpty()) { return if (resources.isEmpty()) {
false false
} else { } else {
resources.map { it.second } resources.map { it.second }
.filterIsInstance<Deployment>() .filterIsInstance<Deployment>()
.filter { it.spec.selector.matchLabels.containsMatchLabels(selector.pod.matchLabels) } .filter { it.metadata.labels.containsMatchLabels(selector.pod.matchLabels) }
.any { .any {
if (selector.container.isNotEmpty()) { if (selector.container.isNotEmpty()) {
it.spec.template.spec.containers.map { it.name }.contains(selector.container) it.spec.template.spec.containers.map { it.name }.contains(selector.container)
...@@ -142,7 +142,7 @@ class BenchmarkStateChecker( ...@@ -142,7 +142,7 @@ class BenchmarkStateChecker(
* @param benchmark The benchmark to check * @param benchmark The benchmark to check
* @return The state of this benchmark. [BenchmarkStates.READY] if all resources could be loaded, else [BenchmarkStates.PENDING] * @return The state of this benchmark. [BenchmarkStates.READY] if all resources could be loaded, else [BenchmarkStates.PENDING]
*/ */
private fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates { // TODO check if infrastructure could be loaded or not fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates {
return try { return try {
val appResources = val appResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources) benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources)
......
package theodolite.benchmark package theodolite.benchmark
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.PodBuilder import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.api.model.PodListBuilder import io.fabric8.kubernetes.api.model.PodListBuilder
import io.fabric8.kubernetes.client.CustomResourceList
import io.fabric8.kubernetes.client.server.mock.KubernetesServer import io.fabric8.kubernetes.client.server.mock.KubernetesServer
import io.fabric8.kubernetes.client.server.mock.OutputStreamMessage import io.fabric8.kubernetes.client.server.mock.OutputStreamMessage
import io.fabric8.kubernetes.client.utils.Utils import io.fabric8.kubernetes.client.utils.Utils
...@@ -13,8 +11,6 @@ import io.quarkus.test.junit.QuarkusTest ...@@ -13,8 +11,6 @@ import io.quarkus.test.junit.QuarkusTest
import org.junit.jupiter.api.* import org.junit.jupiter.api.*
import theodolite.execution.operator.TheodoliteController import theodolite.execution.operator.TheodoliteController
import theodolite.execution.operator.TheodoliteOperator import theodolite.execution.operator.TheodoliteOperator
import theodolite.model.crd.BenchmarkCRD
import theodolite.model.crd.ExecutionCRD
import theodolite.util.ActionCommandFailedException import theodolite.util.ActionCommandFailedException
...@@ -22,14 +18,6 @@ import theodolite.util.ActionCommandFailedException ...@@ -22,14 +18,6 @@ import theodolite.util.ActionCommandFailedException
class ActionCommandTest { class ActionCommandTest {
private val server = KubernetesServer(false, false) private val server = KubernetesServer(false, false)
lateinit var controller: TheodoliteController lateinit var controller: TheodoliteController
private val gson: Gson = GsonBuilder().enableComplexMapKeySerialization().create()
private var benchmark = KubernetesBenchmark()
private var execution = BenchmarkExecution()
private val benchmarkResourceList = CustomResourceList<BenchmarkCRD>()
private val executionResourceList = CustomResourceList<ExecutionCRD>()
@BeforeEach @BeforeEach
fun setUp() { fun setUp() {
...@@ -71,11 +59,11 @@ class ActionCommandTest { ...@@ -71,11 +59,11 @@ class ActionCommandTest {
.expect() .expect()
.withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true") .withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true")
.andUpgradeToWebSocket() .andUpgradeToWebSocket()
.open(OutputStreamMessage("Test ByteStream")) .open(OutputStreamMessage("Test-Output"))
.done() .done()
.always() .always()
} }
/** /**
* Copied from fabric8 Kubernetes Client repository * Copied from fabric8 Kubernetes Client repository
* *
...@@ -106,8 +94,8 @@ class ActionCommandTest { ...@@ -106,8 +94,8 @@ class ActionCommandTest {
@Test @Test
fun testActionCommandExec() { fun testActionCommandExec() {
Assertions.assertEquals(1000, ActionCommand(client = server.client) Assertions.assertEquals(0, ActionCommand(client = server.client)
.exec(mutableMapOf("app" to "pod"), command = arrayOf("ls"), timeout = 30)) .exec(mutableMapOf("app" to "pod"), command = arrayOf("ls"), timeout = 30L))
} }
@Test @Test
...@@ -120,6 +108,7 @@ class ActionCommandTest { ...@@ -120,6 +108,7 @@ class ActionCommandTest {
action.exec.command = arrayOf("ls") action.exec.command = arrayOf("ls")
action.exec.timeoutSeconds = 10L action.exec.timeoutSeconds = 10L
assertThrows<ActionCommandFailedException> { run { action.exec(server.client) } } val e = assertThrows<ActionCommandFailedException> { run { action.exec(server.client) } }
assert(e.message.equals("Could not determine the exit code, no information given"))
} }
} }
\ No newline at end of file
package theodolite.execution.operator
import com.google.gson.Gson
import io.fabric8.kubernetes.api.model.ConfigMapBuilder
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.api.model.PodListBuilder
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder
import io.fabric8.kubernetes.client.server.mock.KubernetesServer
import io.fabric8.kubernetes.client.server.mock.OutputStreamMessage
import io.fabric8.kubernetes.client.utils.Utils
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions.*
import theodolite.benchmark.*
import theodolite.model.crd.BenchmarkStates
internal class BenchmarkStateCheckerTest {
private val server = KubernetesServer(false, false)
private val serverCrud = KubernetesServer(false, true)
private lateinit var checker: BenchmarkStateChecker
private lateinit var checkerCrud: BenchmarkStateChecker
@BeforeEach
fun setUp() {
server.before()
serverCrud.before()
val operator = TheodoliteOperator()
checker = BenchmarkStateChecker(
client = server.client,
benchmarkCRDClient = operator.getBenchmarkClient(server.client),
benchmarkStateHandler = operator.getBenchmarkStateHandler(server.client)
)
checkerCrud = BenchmarkStateChecker(
client = serverCrud.client,
benchmarkCRDClient = operator.getBenchmarkClient(serverCrud.client),
benchmarkStateHandler = operator.getBenchmarkStateHandler(serverCrud.client)
)
val pod: Pod = PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
.withLabels<String, String>(mapOf("app" to "pod"))
.withNamespace("test").and()
.build()
val ready: Pod = createReadyFrom(pod, "True")
val podList = PodListBuilder().build()
podList.items.add(0, ready)
server
.expect()
.withPath("/api/v1/namespaces/test/pods?labelSelector=${Utils.toUrlEncoded("app=pod1")}")
.andReturn(200, podList)
.always()
server
.expect()
.withPath("/api/v1/namespaces/test/pods?labelSelector=${Utils.toUrlEncoded("app=pod0")}")
.andReturn(200, emptyMap<String, String>())
.always()
server
.expect()
.get()
.withPath("/api/v1/namespaces/test/pods/pod1")
.andReturn(200, ready)
.always()
server
.expect()
.withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true")
.andUpgradeToWebSocket()
.open(OutputStreamMessage("Test-Output"))
.done()
.always()
}
@AfterEach
fun tearDown() {
server.after()
serverCrud.after()
}
/**
* Copied from fabric8 Kubernetes Client repository
*
* @param pod
* @param status
* @return
*/
private fun createReadyFrom(pod: Pod, status: String): Pod {
return PodBuilder(pod)
.withNewStatus()
.addNewCondition()
.withType("Ready")
.withStatus(status)
.endCondition()
.endStatus()
.build()
}
private fun getActionSelector(label: Pair<String, String>): ActionSelector {
val podSelector = PodSelector()
val actionSelector = ActionSelector()
actionSelector.pod = podSelector
// pod with matching labels are deployed
podSelector.matchLabels = mutableMapOf(label)
return actionSelector
}
@Test
fun checkIfResourceIsDeployed() {
// pod with matching labels are deployed
assertTrue(checker.checkIfResourceIsDeployed(getActionSelector("app" to "pod1")))
// no pod with matching labels are deployed
assertFalse(checker.checkIfResourceIsDeployed(getActionSelector("app" to "pod0")))
}
private fun createAndDeployConfigmapResourceSet(): ResourceSets {
// create test deployment
val resourceBuilder = DeploymentBuilder()
resourceBuilder.withNewSpec().endSpec()
resourceBuilder.withNewMetadata().endMetadata()
val resource = resourceBuilder.build()
resource.metadata.name = "test-deployment"
resource.metadata.labels = mutableMapOf("app" to "pod1")
val resourceString = Gson().toJson(resource)
// create and deploy configmap
val configMap1 = ConfigMapBuilder()
.withNewMetadata().withName("test-configmap").endMetadata()
.addToData("test-resource.yaml",resourceString)
.build()
serverCrud.client.configMaps().createOrReplace(configMap1)
// create configmap resource set
val resourceSet = ConfigMapResourceSet()
resourceSet.name = "test-configmap"
// create ResourceSetsList
val set = ResourceSets()
set.configMap = resourceSet
return set
}
@Test
fun checkIfResourceIsInfrastructure() {
val resourceSets = listOf(createAndDeployConfigmapResourceSet())
assertTrue(checkerCrud.checkIfResourceIsInfrastructure(resourceSets, getActionSelector("app" to "pod1")))
assertFalse(checkerCrud.checkIfResourceIsInfrastructure(resourceSets, getActionSelector("app" to "pod0")))
}
@Test
fun checkResources() {
val benchmark = BenchmarkCRDummy(
name = "test-benchmark"
)
benchmark.getCR().spec.client = serverCrud.client
val resourceSet = Resources()
resourceSet.resources = listOf(createAndDeployConfigmapResourceSet())
benchmark.getCR().spec.infrastructure = resourceSet
benchmark.getCR().spec.loadGenerator = resourceSet
benchmark.getCR().spec.sut = resourceSet
assertEquals(BenchmarkStates.READY,checkerCrud.checkResources(benchmark.getCR().spec))
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment