Skip to content
Snippets Groups Projects
Commit 66a0650a authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Introduces a BechnmarkStateChecker which checks in each iteration of the...

Introduces a BechnmarkStateChecker which checks in each iteration of the controller whether all required resouce can be loaded and if all specified actions could be executed (all required resource are available). Small fixes.
parent 5cba587c
No related branches found
No related tags found
1 merge request!201Introduce action commands
...@@ -3,6 +3,12 @@ kind: benchmark ...@@ -3,6 +3,12 @@ kind: benchmark
metadata: metadata:
name: uc1-kstreams name: uc1-kstreams
spec: spec:
infrastructure:
resources:
- configMap:
name: "example-configmap"
files:
- "uc1-kstreams-deployment.yaml"
sut: sut:
resources: resources:
- configMap: - configMap:
...@@ -13,10 +19,9 @@ spec: ...@@ -13,10 +19,9 @@ spec:
- selector: - selector:
pod: pod:
matchLabels: matchLabels:
app: kafka-client app: titan-ccp-aggregation
container: "abc" # optional (if there is only one container or there exists something like a default container)
exec: exec:
command: "kafka-topics.sh --create --topic abc" # or list command: "ls -l" # or list
loadGenerator: loadGenerator:
resources: resources:
- configMap: - configMap:
...@@ -28,8 +33,9 @@ spec: ...@@ -28,8 +33,9 @@ spec:
- selector: - selector:
pod: pod:
matchLabels: matchLabels:
app: kafka-client 'app.kubernetes.io/name': grafana
container: "abc" # optional (if there is only one container or there exists something like a default container)
#container: "abc" # optional (if there is only one container or there exists something like a default container)
exec: exec:
command: "kafka-topics.sh --create --topic abc" # or list command: "kafka-topics.sh --create --topic abc" # or list
resourceTypes: resourceTypes:
......
...@@ -27,7 +27,7 @@ class Action { ...@@ -27,7 +27,7 @@ class Action {
@RegisterForReflection @RegisterForReflection
class ActionSelector { class ActionSelector {
lateinit var pod: PodSelector lateinit var pod: PodSelector
lateinit var container: String var container: String = ""
} }
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
......
...@@ -18,10 +18,22 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -18,10 +18,22 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
private val execLatch = CountDownLatch(1); private val execLatch = CountDownLatch(1);
fun exec(matchLabels: MutableMap<String, String>, container: String, command: String): Pair<String, String> { fun exec(matchLabels: MutableMap<String, String>, container: String, command: String): Pair<String, String> {
println("container is: $container")
try { try {
val execWatch: ExecWatch = client.pods()
val execWatch: ExecWatch = if (container.isNotEmpty()) {
println("container is not empty")
client.pods()
.inNamespace(client.namespace)
.withName(getPodName(matchLabels)) .withName(getPodName(matchLabels))
.inContainer(container) .inContainer(container)
} else {
println("container is empty")
client.pods()
.inNamespace(client.namespace)
.withName(getPodName(matchLabels))
}
.writingOutput(out) .writingOutput(out)
.writingError(error) .writingError(error)
.usingListener(MyPodExecListener(execLatch)) .usingListener(MyPodExecListener(execLatch))
......
...@@ -59,7 +59,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -59,7 +59,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
} }
override fun setupInfrastructure() { override fun setupInfrastructure() {
this.infrastructure.beforeActions.forEach { it.exec } this.infrastructure.beforeActions.forEach { it.exec(client = client) }
val kubernetesManager = K8sManager(this.client) val kubernetesManager = K8sManager(this.client)
loadKubernetesResources(this.infrastructure.resources) loadKubernetesResources(this.infrastructure.resources)
.map{it.second} .map{it.second}
...@@ -67,7 +67,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -67,7 +67,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
} }
override fun teardownInfrastructure() { override fun teardownInfrastructure() {
this.infrastructure.afterActions.forEach { it.exec } this.infrastructure.afterActions.forEach { it.exec(client = client) }
val kubernetesManager = K8sManager(this.client) val kubernetesManager = K8sManager(this.client)
loadKubernetesResources(this.infrastructure.resources) loadKubernetesResources(this.infrastructure.resources)
.map{it.second} .map{it.second}
......
...@@ -48,8 +48,8 @@ class KubernetesBenchmarkDeployment( ...@@ -48,8 +48,8 @@ class KubernetesBenchmarkDeployment(
* - Deploy the needed resources. * - Deploy the needed resources.
*/ */
override fun setup() { override fun setup() {
sutBeforeActions.forEach { it.exec } sutBeforeActions.forEach { it.exec(client = client) }
loadGenBeforeActions.forEach { it.exec } loadGenBeforeActions.forEach { it.exec(client = client) }
val kafkaTopics = this.topics.filter { !it.removeOnly } val kafkaTopics = this.topics.filter { !it.removeOnly }
.map { NewTopic(it.name, it.numPartitions, it.replicationFactor) } .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
...@@ -74,8 +74,8 @@ class KubernetesBenchmarkDeployment( ...@@ -74,8 +74,8 @@ class KubernetesBenchmarkDeployment(
labelName = LAG_EXPORTER_POD_LABEL_NAME, labelName = LAG_EXPORTER_POD_LABEL_NAME,
labelValue = LAG_EXPORTER_POD_LABEL_VALUE labelValue = LAG_EXPORTER_POD_LABEL_VALUE
) )
sutAfterActions.forEach { it.exec } sutAfterActions.forEach { it.exec(client = client) }
loadGenAfterActions.forEach { it.exec } loadGenAfterActions.forEach { it.exec(client = client) }
logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." } logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." }
Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis()) Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis())
} }
......
package theodolite.execution.operator
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import theodolite.benchmark.Action
import theodolite.benchmark.ActionSelector
import theodolite.benchmark.KubernetesBenchmark
import theodolite.model.crd.BenchmarkCRD
import theodolite.model.crd.BenchmarkStates
import theodolite.model.crd.KubernetesBenchmarkList
class BenchmarkStateChecker(
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>,
private val benchmarkStateHandler: BenchmarkStateHandler,
private val client: NamespacedKubernetesClient
) {
/**
* Checks and updates the states off all deployed benchmarks.
*
*/
fun updateBenchmarkStatus() {
this.benchmarkCRDClient
.list()
.items
.map { it.spec.name = it.metadata.name; it }
.map { Pair(it, checkState(it.spec)) }
.forEach { setState(it.first, it.second) }
}
private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) {
benchmarkStateHandler.setResourceSetState(resource.spec.name, state)
}
/**
* Checks the state of the benchmark.
*
* @param benchmark The benchmark to check
* @return [BenchmarkStates.READY] iff all resource could be loaded and all actions could be executed, [BenchmarkStates.PENDING] else
*/
private fun checkState(benchmark: KubernetesBenchmark): BenchmarkStates {
return if (checkActionCommands(benchmark) == BenchmarkStates.READY
&& checkResources(benchmark) == BenchmarkStates.READY
) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
}
/**
* Checks if all specified actions of the given benchmark could be executed or not
*
* @param benchmark The benchmark to check
* @return The state of this benchmark. [BenchmarkStates.READY] if all actions could be executed, else [BenchmarkStates.PENDING]
*/
private fun checkActionCommands(benchmark: KubernetesBenchmark): BenchmarkStates {
return if (checkIfActionPossible(benchmark, benchmark.sut.beforeActions)
&& checkIfActionPossible(benchmark, benchmark.sut.afterActions)
&& checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions)
&& checkIfActionPossible(benchmark, benchmark.loadGenerator.beforeActions)
) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
}
/**
* Action commands are called on a pod. To verify that an action command can be executed,
* it checks that the specified pods are either currently running in the cluster or
* have been specified as infrastructure in the benchmark.
*
* @param benchmark the benchmark to check
* @param actions the actions
* @return true if all actions could be executed, else false
*/
private fun checkIfActionPossible(benchmark: KubernetesBenchmark, actions: List<Action>): Boolean {
return !actions.map {
checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(benchmark, it.selector)
}.contains(false)
}
/**
* Checks for the given actionSelector whether the required resources are already deployed in the cluster or not
*
* @param selector the actionSelector to check
* @return true if the required resources are found, else false
*/
private fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean {
val pods = this.client
.pods()
.withLabels(selector.pod.matchLabels)
.list()
.items
return if (pods.isNotEmpty() && selector.container.isNotEmpty()) {
pods.map { pod ->
pod
.spec
.containers
.map { it.name }
.contains(selector.container)
}.contains(false)
} else {
pods.isNotEmpty()
}
}
/**
* Checks for the given actionSelector whether the required resources are specified as infrastructure or not
*
* @param benchmark the benchmark to check
* @param selector the actionSelector to check
* @return true if the required resources are found, else false
*/
private fun checkIfResourceIsInfrastructure(benchmark: KubernetesBenchmark, selector: ActionSelector): Boolean {
val resources = benchmark.loadKubernetesResources(resourceSet = benchmark.infrastructure.resources)
return if (resources.isEmpty()) {
false
} else {
resources.map { it.second }
.filterIsInstance<Deployment>()
.filter { it.spec.selector.matchLabels.containsMatchLabels(selector.pod.matchLabels) }
.any {
if (selector.container.isNotEmpty()) {
it.spec.template.spec.containers.map { it.name }.contains(selector.container)
} else {
true
}
}
}
}
/**
* Checks if it is possible to load all specified Kubernetes manifests.
*
* @param benchmark The benchmark to check
* @return The state of this benchmark. [BenchmarkStates.READY] if all resources could be loaded, else [BenchmarkStates.PENDING]
*/
private fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates { // TODO check if infrastructure could be loaded or not
return try {
val appResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources)
val loadGenResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.loadGenerator.resources)
if (appResources.isNotEmpty() && loadGenResources.isNotEmpty()) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
} catch (e: Exception) {
BenchmarkStates.PENDING
}
}
}
private fun <K, V> MutableMap<K, V>.containsMatchLabels(matchLabels: MutableMap<V, V>) : Boolean {
for (kv in matchLabels) {
if (kv.value != this[kv.key as K]) {
return false
}
}
return true
}
...@@ -29,7 +29,7 @@ class TheodoliteController( ...@@ -29,7 +29,7 @@ class TheodoliteController(
private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>,
private val executionStateHandler: ExecutionStateHandler, private val executionStateHandler: ExecutionStateHandler,
private val benchmarkStateHandler: BenchmarkStateHandler private val benchmarkSateChecker: BenchmarkStateChecker
) { ) {
lateinit var executor: TheodoliteExecutor lateinit var executor: TheodoliteExecutor
...@@ -41,7 +41,7 @@ class TheodoliteController( ...@@ -41,7 +41,7 @@ class TheodoliteController(
sleep(5000) // wait until all states are correctly set sleep(5000) // wait until all states are correctly set
while (true) { while (true) {
reconcile() reconcile()
updateBenchmarkStatus() benchmarkSateChecker.updateBenchmarkStatus()
sleep(2000) sleep(2000)
} }
} }
...@@ -49,7 +49,7 @@ class TheodoliteController( ...@@ -49,7 +49,7 @@ class TheodoliteController(
private fun reconcile() { private fun reconcile() {
do { do {
val execution = getNextExecution() val execution = getNextExecution()
updateBenchmarkStatus() benchmarkSateChecker.updateBenchmarkStatus()
if (execution != null) { if (execution != null) {
val benchmark = getBenchmarks() val benchmark = getBenchmarks()
.map { it.spec } .map { it.spec }
...@@ -139,7 +139,6 @@ class TheodoliteController( ...@@ -139,7 +139,6 @@ class TheodoliteController(
} }
} }
/** /**
* Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution] * Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution]
* is selected for the next execution depends on three points: * is selected for the next execution depends on three points:
...@@ -173,34 +172,7 @@ class TheodoliteController( ...@@ -173,34 +172,7 @@ class TheodoliteController(
.firstOrNull() .firstOrNull()
} }
private fun updateBenchmarkStatus() {
this.benchmarkCRDClient
.list()
.items
.map { it.spec.name = it.metadata.name; it }
.map { Pair(it, checkResource(it.spec)) }
.forEach { setState(it.first, it.second ) }
}
private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) {
benchmarkStateHandler.setResourceSetState(resource.spec.name, state)
}
private fun checkResource(benchmark: KubernetesBenchmark): BenchmarkStates {
return try {
val appResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources)
val loadGenResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources)
if(appResources.isNotEmpty() && loadGenResources.isNotEmpty()) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
} catch (e: Exception) {
BenchmarkStates.PENDING
}
}
fun isExecutionRunning(executionName: String): Boolean { fun isExecutionRunning(executionName: String): Boolean {
if (!::executor.isInitialized) return false if (!::executor.isInitialized) return false
......
...@@ -34,6 +34,7 @@ class TheodoliteOperator { ...@@ -34,6 +34,7 @@ class TheodoliteOperator {
private lateinit var controller: TheodoliteController private lateinit var controller: TheodoliteController
private lateinit var executionStateHandler: ExecutionStateHandler private lateinit var executionStateHandler: ExecutionStateHandler
private lateinit var benchmarkStateHandler: BenchmarkStateHandler private lateinit var benchmarkStateHandler: BenchmarkStateHandler
private lateinit var benchmarkStateChecker: BenchmarkStateChecker
fun start() { fun start() {
...@@ -71,7 +72,7 @@ class TheodoliteOperator { ...@@ -71,7 +72,7 @@ class TheodoliteOperator {
controller = getController( controller = getController(
client = client, client = client,
executionStateHandler = getExecutionStateHandler(client = client), executionStateHandler = getExecutionStateHandler(client = client),
benchmarkStateHandler = getBenchmarkStateHandler(client = client) benchmarkStateChecker = getBenchmarkStateChecker(client = client)
) )
getExecutionEventHandler(controller, client).startAllRegisteredInformers() getExecutionEventHandler(controller, client).startAllRegisteredInformers()
...@@ -112,17 +113,28 @@ class TheodoliteOperator { ...@@ -112,17 +113,28 @@ class TheodoliteOperator {
return benchmarkStateHandler return benchmarkStateHandler
} }
fun getBenchmarkStateChecker(client: NamespacedKubernetesClient) : BenchmarkStateChecker {
if (!::benchmarkStateChecker.isInitialized) {
this.benchmarkStateChecker = BenchmarkStateChecker(
client = client,
benchmarkStateHandler = getBenchmarkStateHandler(client = client),
benchmarkCRDClient = getBenchmarkClient(client = client))
}
return benchmarkStateChecker
}
fun getController( fun getController(
client: NamespacedKubernetesClient, client: NamespacedKubernetesClient,
executionStateHandler: ExecutionStateHandler, executionStateHandler: ExecutionStateHandler,
benchmarkStateHandler: BenchmarkStateHandler benchmarkStateChecker: BenchmarkStateChecker
): TheodoliteController { ): TheodoliteController {
if (!::controller.isInitialized) { if (!::controller.isInitialized) {
this.controller = TheodoliteController( this.controller = TheodoliteController(
benchmarkCRDClient = getBenchmarkClient(client), benchmarkCRDClient = getBenchmarkClient(client),
executionCRDClient = getExecutionClient(client), executionCRDClient = getExecutionClient(client),
executionStateHandler = executionStateHandler, executionStateHandler = executionStateHandler,
benchmarkStateHandler = benchmarkStateHandler benchmarkSateChecker = benchmarkStateChecker
) )
} }
return this.controller return this.controller
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment