Skip to content
Snippets Groups Projects
Commit 7f99aa09 authored by Sören Henning's avatar Sören Henning
Browse files

Merge commit '1a248290' into quarkus-upgrade

parents b66c3b09 1a248290
No related branches found
No related tags found
1 merge request!213Upgrade Quarkus
Pipeline #6008 failed
Showing
with 1768 additions and 65 deletions
...@@ -4,7 +4,6 @@ description: >- ...@@ -4,7 +4,6 @@ description: >-
scalability of cloud-native applications. scalability of cloud-native applications.
remote_theme: pmarsceill/just-the-docs remote_theme: pmarsceill/just-the-docs
#color_scheme: "dark"
aux_links: aux_links:
"Theodolite on GitHub": "Theodolite on GitHub":
- "//github.com/cau-se/theodolite" - "//github.com/cau-se/theodolite"
...@@ -14,4 +13,5 @@ exclude: ...@@ -14,4 +13,5 @@ exclude:
- Gemfile - Gemfile
- Gemfile.lock - Gemfile.lock
- README.md - README.md
- vendor - vendor/
- drafts/
This diff is collapsed.
## Infrastructure
The necessary infrastructure for an execution can be defined in the benchmark manifests. The related resources are create *before* an execution is started, and removed *after* an execution is finished.
### Example
```yaml
infrastructure:
resources:
- configMap:
name: "example-configmap"
files:
- "uc1-kstreams-deployment.yaml"
```
## Action Commands
Theodolite allows to execute commands on running pods (similar to the `kubectl exec -it <pod-name> -- <command>` command). This commands can be run either before (via so called `beforeActions`) or after (via so called `afterActions`) an experiment is executed.
Theodolite checks if all required pods are available for the specified actions (i.e. the pods must either be defined as infrastructure or already deployed in the cluster). If not all pods/resources are available, the benchmark will not be set as `Ready`. Consequently, an action cannot be executed on a pod that is defined as an SUT or loadGen resource.
### Example
```yaml
# For the system under test
sut:
resources: ...
beforeActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: ["touch", "test-file-sut"]
timeoutSeconds: 90
afterActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: [ "touch", "test-file-sut-after" ]
timeoutSeconds: 90
# analog, for the load generator
loadGenerator:
resources: ...
beforeActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: ["touch", "test-file-loadGen"]
timeoutSeconds: 90
afterActions:
- selector:
pod:
matchLabels:
app: busybox1
exec:
command: [ "touch", "test-file-loadGen-after" ]
timeoutSeconds: 90
```
\ No newline at end of file
buildscript {
repositories {
maven {
url "https://plugins.gradle.org/m2/"
}
}
dependencies {
classpath "com.github.jengelman.gradle.plugins:shadow:6.0.0"
}
}
// to discover the precompiled script plugins // to discover the precompiled script plugins
plugins { plugins {
id 'groovy-gradle-plugin' id 'groovy-gradle-plugin'
...@@ -19,6 +8,6 @@ repositories { ...@@ -19,6 +8,6 @@ repositories {
} }
dependencies { dependencies {
implementation 'gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.6.0' implementation 'com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.4'
implementation 'com.github.jengelman.gradle.plugins:shadow:6.0.0' implementation 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2'
} }
...@@ -66,5 +66,5 @@ spotbugs { ...@@ -66,5 +66,5 @@ spotbugs {
reportLevel = "low" reportLevel = "low"
effort = "max" effort = "max"
ignoreFailures = false ignoreFailures = false
toolVersion = '4.1.4' toolVersion = '4.5.3'
} }
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
...@@ -64,6 +64,80 @@ spec: ...@@ -64,6 +64,80 @@ spec:
type: array type: array
items: items:
type: string type: string
beforeActions:
type: array
default: []
description: Infrastructure before actions are executed before the infrastructure is set up.
items:
type: object
properties:
selector:
type: object
description: The selector specifies which resource should be selected for the execution of the command.
properties:
pod:
type: object
description: Specifies the pod.
properties:
matchLabels:
type: object
description: The matchLabels of the desired pod.
additionalProperties: true
x-kubernetes-map-type: "granular"
default: { }
container:
description: Specifies the container.
default: ""
type: string
exec:
type: object
description: Specifies command to be executed.
properties:
command:
type: array
description: The command to be executed as string array.
items:
type: string
timeoutSeconds:
description: Specifies the timeout (in seconds) for the specified command.
type: integer
afterActions:
type: array
default: []
description: Infrastructure after actions are executed after the teardown of the infrastructure.
items:
type: object
properties:
selector:
type: object
description: The selector specifies which resource should be selected for the execution of the command.
properties:
pod:
type: object
description: Specifies the pod.
properties:
matchLabels:
type: object
description: The matchLabels of the desired pod.
additionalProperties: true
x-kubernetes-map-type: "granular"
default: { }
container:
description: Specifies the container.
default: ""
type: string
exec:
type: object
description: Specifies command to be executed.
properties:
command:
type: array
description: The command to be executed as string array.
items:
type: string
timeoutSeconds:
description: Specifies the timeout (in seconds) for the specified command.
type: integer
sut: sut:
description: The appResourceSets specifies all Kubernetes resources required to start the sut. A resourceSet can be either a configMap resourceSet or a fileSystem resourceSet. description: The appResourceSets specifies all Kubernetes resources required to start the sut. A resourceSet can be either a configMap resourceSet or a fileSystem resourceSet.
type: object type: object
...@@ -101,6 +175,79 @@ spec: ...@@ -101,6 +175,79 @@ spec:
type: array type: array
items: items:
type: string type: string
beforeActions:
type: array
default: []
description: SUT before actions are executed before the SUT is started.
items:
type: object
properties:
selector:
type: object
description: The selector specifies which resource should be selected for the execution of the command.
properties:
pod:
type: object
description: Specifies the pod.
properties:
matchLabels:
type: object
description: The matchLabels of the desired pod.
additionalProperties: true
x-kubernetes-map-type: "granular"
default: { }
container:
description: Specifies the container.
default: ""
type: string
exec:
type: object
description: Specifies command to be executed.
properties:
command:
type: array
description: The command to be executed as string array.
items:
type: string
timeoutSeconds:
description: Specifies the timeout (in seconds) for the specified command.
type: integer
afterActions:
type: array
default: []
items:
type: object
properties:
selector:
type: object
description: The selector specifies which resource should be selected for the execution of the command.
properties:
pod:
type: object
description: Specifies the pod.
properties:
matchLabels:
type: object
description: The matchLabels of the desired pod.
additionalProperties: true
x-kubernetes-map-type: "granular"
default: { }
container:
description: Specifies the container.
default: ""
type: string
exec:
type: object
description: Specifies command to be executed.
properties:
command:
type: array
description: The command to be executed as string array.
items:
type: string
timeoutSeconds:
description: Specifies the timeout (in seconds) for the specified command.
type: integer
loadGenerator: loadGenerator:
description: The loadGenResourceSets specifies all Kubernetes resources required to start the load generator. A resourceSet can be either a configMap resourceSet or a fileSystem resourceSet. description: The loadGenResourceSets specifies all Kubernetes resources required to start the load generator. A resourceSet can be either a configMap resourceSet or a fileSystem resourceSet.
type: object type: object
...@@ -138,6 +285,80 @@ spec: ...@@ -138,6 +285,80 @@ spec:
type: array type: array
items: items:
type: string type: string
beforeActions:
type: array
default: [ ]
description: Load generator before actions are executed before the load generator is started.
items:
type: object
properties:
selector:
type: object
description: The selector specifies which resource should be selected for the execution of the command.
properties:
pod:
type: object
description: Specifies the pod.
properties:
matchLabels:
type: object
description: The matchLabels of the desired pod.
additionalProperties: true
x-kubernetes-map-type: "granular"
default: { }
container:
description: Specifies the container.
default: ""
type: string
exec:
type: object
description: Specifies command to be executed.
properties:
command:
type: array
description: The command to be executed as string array.
items:
type: string
timeoutSeconds:
description: Specifies the timeout (in seconds) for the specified command.
type: integer
afterActions:
type: array
default: []
description: Load generator after actions are executed after the teardown of the load generator.
items:
type: object
properties:
selector:
type: object
description: The selector specifies which resource should be selected for the execution of the command.
properties:
pod:
type: object
description: Specifies the pod.
properties:
matchLabels:
type: object
description: The matchLabels of the desired pod.
additionalProperties: true
x-kubernetes-map-type: "granular"
default: { }
container:
description: Specifies the container.
default: ""
type: string
exec:
type: object
description: Specifies command to be executed.
properties:
command:
type: array
description: The command to be executed as string array.
items:
type: string
timeoutSeconds:
description: Specifies the timeout (in seconds) for the specified command.
type: integer
resourceTypes: resourceTypes:
description: A list of resource types that can be scaled for this `benchmark` resource. For each resource type the concrete values are defined in the `execution` object. description: A list of resource types that can be scaled for this `benchmark` resource. For each resource type the concrete values are defined in the `execution` object.
type: array type: array
......
package theodolite.benchmark
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ActionCommandFailedException
import theodolite.util.Configuration
@JsonDeserialize
@RegisterForReflection
@JsonInclude(JsonInclude.Include.NON_NULL)
class Action {
lateinit var selector: ActionSelector
lateinit var exec: Command
fun exec(client: NamespacedKubernetesClient) {
val exitCode = ActionCommand(client = client)
.exec(
matchLabels = selector.pod.matchLabels,
container = selector.container,
timeout = exec.timeoutSeconds,
command = exec.command
)
if(exitCode != 0){
throw ActionCommandFailedException("Error while executing action, finished with exit code $exitCode")
}
}
}
@JsonDeserialize
@RegisterForReflection
class ActionSelector {
lateinit var pod: PodSelector
var container: String = ""
}
@JsonDeserialize
@RegisterForReflection
class PodSelector {
lateinit var matchLabels: MutableMap<String, String>
}
@JsonDeserialize
@RegisterForReflection
class Command {
lateinit var command: Array<String>
var timeoutSeconds: Long = Configuration.TIMEOUT_SECONDS
}
\ No newline at end of file
package theodolite.benchmark
import io.fabric8.kubernetes.api.model.Status
import io.fabric8.kubernetes.client.KubernetesClientException
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.ExecListener
import io.fabric8.kubernetes.client.dsl.ExecWatch
import io.fabric8.kubernetes.client.utils.Serialization
import mu.KotlinLogging
import okhttp3.Response
import theodolite.util.ActionCommandFailedException
import theodolite.util.Configuration
import java.io.ByteArrayOutputStream
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
private val logger = KotlinLogging.logger {}
class ActionCommand(val client: NamespacedKubernetesClient) {
var out: ByteArrayOutputStream = ByteArrayOutputStream()
var error: ByteArrayOutputStream = ByteArrayOutputStream()
var errChannelStream: ByteArrayOutputStream = ByteArrayOutputStream()
private val execLatch = CountDownLatch(1)
/**
* Executes an action command.
*
* @param matchLabels matchLabels specifies on which pod the command should be executed. For this, the principle
* `of any` of is used and the command is called on one of the possible pods.
* @param container (Optional) The container to run the command. Is optional iff exactly one container exist.
* @param command The command to be executed.
* @return the exit code of this executed command
*/
fun exec(
matchLabels: MutableMap<String, String>,
command: Array<String>,
timeout: Long = Configuration.TIMEOUT_SECONDS,
container: String = ""
): Int {
try {
val execWatch: ExecWatch = if (container.isNotEmpty()) {
client.pods()
.inNamespace(client.namespace)
.withName(getPodName(matchLabels, 3))
.inContainer(container)
} else {
client.pods()
.inNamespace(client.namespace)
.withName(getPodName(matchLabels, 3))
}
.writingOutput(out)
.writingError(error)
.writingErrorChannel(errChannelStream)
.usingListener(ActionCommandListener(execLatch))
.exec(*command)
val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS)
if (!latchTerminationStatus) {
throw ActionCommandFailedException("Latch could not terminate within specified time")
}
execWatch.close()
} catch (e: Exception) {
when (e) {
is InterruptedException -> {
Thread.currentThread().interrupt()
throw ActionCommandFailedException("Interrupted while waiting for the exec", e)
}
is KubernetesClientException -> {
throw ActionCommandFailedException("Error while executing command", e)
}
else -> {
throw e
}
}
}
logger.debug { "Execution Output Stream is \n $out" }
logger.debug { "Execution Error Stream is \n $error" }
logger.debug { "Execution ErrorChannel is: \n $errChannelStream" }
return getExitCode(errChannelStream)
}
private fun getExitCode(errChannelStream: ByteArrayOutputStream): Int {
val status: Status?
try {
status = Serialization.unmarshal(errChannelStream.toString(), Status::class.java)
} catch (e: Exception) {
throw ActionCommandFailedException("Could not determine the exit code, no information given")
}
if (status == null) {
throw ActionCommandFailedException("Could not determine the exit code, no information given")
}
return if (status.status.equals("Success")) {
0
} else status.details.causes.stream()
.filter { it.reason.equals("ExitCode") }
.map { it.message }
.findFirst()
.orElseThrow {
ActionCommandFailedException("Status is not SUCCESS but contains no exit code - Status: $status")
}.toInt()
}
/**
* Find pod with matching labels. The matching pod must have the status `Running`.
*
* @param matchLabels the match labels
* @param tries specifies the number of times to look for a matching pod. When pods are newly created,
* it can take a while until the status is ready and the pod can be selected.
* @return the name of the pod or throws [ActionCommandFailedException]
*/
fun getPodName(matchLabels: MutableMap<String, String>, tries: Int): String {
for (i in 1..tries) {
try {
return getPodName(matchLabels)
} catch (e: Exception) {
logger.warn { "Could not found any pod with specified matchlabels or pod is not ready." }
}
Thread.sleep(Duration.ofSeconds(5).toMillis())
}
throw ActionCommandFailedException("Couldn't find any pod that matches the specified labels.")
}
private fun getPodName(matchLabels: MutableMap<String, String>): String {
return try {
val podNames = this.client
.pods()
.withLabels(matchLabels)
.list()
.items
.map { it.metadata.name }
podNames.first {
this.client.pods().withName(it).isReady
}
} catch (e: NoSuchElementException) {
throw ActionCommandFailedException("Couldn't find any pod that matches the specified labels.", e)
}
}
private class ActionCommandListener(val execLatch: CountDownLatch) : ExecListener {
override fun onOpen(response: Response) {
}
override fun onFailure(throwable: Throwable, response: Response) {
execLatch.countDown()
throw ActionCommandFailedException("Some error encountered while executing action, caused ${throwable.message})")
}
override fun onClose(code: Int, reason: String) {
execLatch.countDown()
}
}
}
...@@ -46,7 +46,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -46,7 +46,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
private var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE private var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
@Transient @Transient
private val client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace) private var client: NamespacedKubernetesClient = DefaultKubernetesClient().inNamespace(namespace)
/** /**
* Loads [KubernetesResource]s. * Loads [KubernetesResource]s.
...@@ -58,6 +58,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -58,6 +58,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
} }
override fun setupInfrastructure() { override fun setupInfrastructure() {
this.infrastructure.beforeActions.forEach { it.exec(client = client) }
val kubernetesManager = K8sManager(this.client) val kubernetesManager = K8sManager(this.client)
loadKubernetesResources(this.infrastructure.resources) loadKubernetesResources(this.infrastructure.resources)
.map{it.second} .map{it.second}
...@@ -69,6 +70,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -69,6 +70,7 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
loadKubernetesResources(this.infrastructure.resources) loadKubernetesResources(this.infrastructure.resources)
.map{it.second} .map{it.second}
.forEach { kubernetesManager.remove(it) } .forEach { kubernetesManager.remove(it) }
this.infrastructure.afterActions.forEach { it.exec(client = client) }
} }
/** /**
...@@ -109,6 +111,10 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -109,6 +111,10 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
} }
} }
return KubernetesBenchmarkDeployment( return KubernetesBenchmarkDeployment(
sutBeforeActions = sut.beforeActions,
sutAfterActions = sut.afterActions,
loadGenBeforeActions = loadGenerator.beforeActions,
loadGenAfterActions = loadGenerator.afterActions,
appResources = appResources.map { it.second }, appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second }, loadGenResources = loadGenResources.map { it.second },
loadGenerationDelay = loadGenerationDelay, loadGenerationDelay = loadGenerationDelay,
...@@ -118,4 +124,13 @@ class KubernetesBenchmark : KubernetesResource, Benchmark { ...@@ -118,4 +124,13 @@ class KubernetesBenchmark : KubernetesResource, Benchmark {
client = this.client client = this.client
) )
} }
/**
* This function can be used to set the Kubernetes client manually. This is for example necessary for testing.
*
* @param client
*/
fun setClient(client: NamespacedKubernetesClient) {
this.client = client
}
} }
...@@ -23,6 +23,10 @@ private val logger = KotlinLogging.logger {} ...@@ -23,6 +23,10 @@ private val logger = KotlinLogging.logger {}
*/ */
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmarkDeployment( class KubernetesBenchmarkDeployment(
private val sutBeforeActions: List<Action>,
private val sutAfterActions: List<Action>,
private val loadGenBeforeActions: List<Action>,
private val loadGenAfterActions: List<Action>,
val appResources: List<KubernetesResource>, val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>, val loadGenResources: List<KubernetesResource>,
private val loadGenerationDelay: Long, private val loadGenerationDelay: Long,
...@@ -45,10 +49,13 @@ class KubernetesBenchmarkDeployment( ...@@ -45,10 +49,13 @@ class KubernetesBenchmarkDeployment(
val kafkaTopics = this.topics.filter { !it.removeOnly } val kafkaTopics = this.topics.filter { !it.removeOnly }
.map { NewTopic(it.name, it.numPartitions, it.replicationFactor) } .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics) kafkaController.createTopics(kafkaTopics)
sutBeforeActions.forEach { it.exec(client = client) }
appResources.forEach { kubernetesManager.deploy(it) } appResources.forEach { kubernetesManager.deploy(it) }
logger.info { "Wait ${this.loadGenerationDelay} seconds before starting the load generator." } logger.info { "Wait ${this.loadGenerationDelay} seconds before starting the load generator." }
Thread.sleep(Duration.ofSeconds(this.loadGenerationDelay).toMillis()) Thread.sleep(Duration.ofSeconds(this.loadGenerationDelay).toMillis())
loadGenBeforeActions.forEach { it.exec(client = client) }
loadGenResources.forEach { kubernetesManager.deploy(it) } loadGenResources.forEach { kubernetesManager.deploy(it) }
} }
/** /**
...@@ -59,7 +66,9 @@ class KubernetesBenchmarkDeployment( ...@@ -59,7 +66,9 @@ class KubernetesBenchmarkDeployment(
*/ */
override fun teardown() { override fun teardown() {
loadGenResources.forEach { kubernetesManager.remove(it) } loadGenResources.forEach { kubernetesManager.remove(it) }
loadGenAfterActions.forEach { it.exec(client = client) }
appResources.forEach { kubernetesManager.remove(it) } appResources.forEach { kubernetesManager.remove(it) }
sutAfterActions.forEach { it.exec(client = client) }
kafkaController.removeTopics(this.topics.map { topic -> topic.name }) kafkaController.removeTopics(this.topics.map { topic -> topic.name })
ResourceByLabelHandler(client).removePods( ResourceByLabelHandler(client).removePods(
labelName = LAG_EXPORTER_POD_LABEL_NAME, labelName = LAG_EXPORTER_POD_LABEL_NAME,
......
...@@ -8,5 +8,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection ...@@ -8,5 +8,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
class Resources { class Resources {
lateinit var resources: List<ResourceSets> lateinit var resources: List<ResourceSets>
lateinit var beforeActions: List<Action>
lateinit var afterActions: List<Action>
} }
\ No newline at end of file
package theodolite.execution.operator
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.api.model.apps.StatefulSet
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import theodolite.benchmark.Action
import theodolite.benchmark.ActionSelector
import theodolite.benchmark.KubernetesBenchmark
import theodolite.benchmark.ResourceSets
import theodolite.model.crd.BenchmarkCRD
import theodolite.model.crd.BenchmarkStates
import theodolite.model.crd.KubernetesBenchmarkList
class BenchmarkStateChecker(
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>,
private val benchmarkStateHandler: BenchmarkStateHandler,
private val client: NamespacedKubernetesClient
) {
fun start(running: Boolean) {
Thread {
while (running) {
updateBenchmarkStatus()
Thread.sleep(100 * 1)
}
}.start()
}
/**
* Checks and updates the states off all deployed benchmarks.
*
*/
fun updateBenchmarkStatus() {
this.benchmarkCRDClient
.list()
.items
.map { it.spec.name = it.metadata.name; it }
.map { Pair(it, checkState(it.spec)) }
.forEach { setState(it.first, it.second) }
}
private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) {
benchmarkStateHandler.setResourceSetState(resource.spec.name, state)
}
/**
* Checks the state of the benchmark.
*
* @param benchmark The benchmark to check
* @return [BenchmarkStates.READY] iff all resource could be loaded and all actions could be executed, [BenchmarkStates.PENDING] else
*/
private fun checkState(benchmark: KubernetesBenchmark): BenchmarkStates {
return if (checkActionCommands(benchmark) == BenchmarkStates.READY
&& checkResources(benchmark) == BenchmarkStates.READY
) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
}
/**
* Checks if all specified actions of the given benchmark could be executed or not
*
* @param benchmark The benchmark to check
* @return The state of this benchmark. [BenchmarkStates.READY] if all actions could be executed, else [BenchmarkStates.PENDING]
*/
private fun checkActionCommands(benchmark: KubernetesBenchmark): BenchmarkStates {
return if (checkIfActionPossible(benchmark.infrastructure.resources, benchmark.sut.beforeActions)
&& checkIfActionPossible(benchmark.infrastructure.resources, benchmark.sut.afterActions)
&& checkIfActionPossible(benchmark.infrastructure.resources, benchmark.loadGenerator.beforeActions)
&& checkIfActionPossible(benchmark.infrastructure.resources, benchmark.loadGenerator.beforeActions)
) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
}
/**
* Action commands are called on a pod. To verify that an action command can be executed,
* it checks that the specified pods are either currently running in the cluster or
* have been specified as infrastructure in the benchmark.
*
* @param benchmark the benchmark to check
* @param actions the actions
* @return true if all actions could be executed, else false
*/
private fun checkIfActionPossible(resourcesSets: List<ResourceSets>, actions: List<Action>): Boolean {
return !actions.map {
checkIfResourceIsDeployed(it.selector) || checkIfResourceIsInfrastructure(resourcesSets, it.selector)
}.contains(false)
}
/**
* Checks for the given actionSelector whether the required resources are already deployed in the cluster or not
*
* @param selector the actionSelector to check
* @return true if the required resources are found, else false
*/
fun checkIfResourceIsDeployed(selector: ActionSelector): Boolean {
val pods = this.client
.pods()
.withLabels(selector.pod.matchLabels)
.list()
.items
return if (pods.isNotEmpty() && selector.container.isNotEmpty()) {
pods.map { pod ->
pod
.spec
.containers
.map { it.name }
.contains(selector.container)
}.contains(true)
} else {
pods.isNotEmpty()
}
}
/**
* Checks for the given actionSelector whether the required resources are specified as infrastructure or not
*
* @param benchmark the benchmark to check
* @param selector the actionSelector to check
* @return true if the required resources are found, else false
*/
fun checkIfResourceIsInfrastructure(resourcesSets: List<ResourceSets>, selector: ActionSelector): Boolean {
val resources = resourcesSets.flatMap { it.loadResourceSet(this.client) }
if (resources.isEmpty()) {
return false
}
var podExist = resources.map { it.second }
.filterIsInstance<Deployment>()
.filter { it.metadata.labels.containsMatchLabels(selector.pod.matchLabels) }
.any {
if (selector.container.isNotEmpty()) {
it.spec.template.spec.containers.map { it.name }.contains(selector.container)
} else {
true
}
}
if (podExist) {
return true
}
podExist = resources.map { it.second }
.filterIsInstance<StatefulSet>()
.filter { it.metadata.labels.containsMatchLabels(selector.pod.matchLabels) }
.any {
if (selector.container.isNotEmpty()) {
it.spec.template.spec.containers.map { it.name }.contains(selector.container)
} else {
true
}
}
if (podExist) {
return true
}
return false
}
/**
* Checks if it is possible to load all specified Kubernetes manifests.
*
* @param benchmark The benchmark to check
* @return The state of this benchmark. [BenchmarkStates.READY] if all resources could be loaded, else [BenchmarkStates.PENDING]
*/
fun checkResources(benchmark: KubernetesBenchmark): BenchmarkStates {
return try {
val appResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources)
val loadGenResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.loadGenerator.resources)
if (appResources.isNotEmpty() && loadGenResources.isNotEmpty()) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
} catch (e: Exception) {
BenchmarkStates.PENDING
}
}
}
private fun <K, V> MutableMap<K, V>.containsMatchLabels(matchLabels: MutableMap<V, V>): Boolean {
for (kv in matchLabels) {
if (kv.value != this[kv.key as K]) {
return false
}
}
return true
}
...@@ -28,7 +28,7 @@ class TheodoliteController( ...@@ -28,7 +28,7 @@ class TheodoliteController(
private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>,
private val executionStateHandler: ExecutionStateHandler, private val executionStateHandler: ExecutionStateHandler,
private val benchmarkStateHandler: BenchmarkStateHandler private val benchmarkStateChecker: BenchmarkStateChecker
) { ) {
lateinit var executor: TheodoliteExecutor lateinit var executor: TheodoliteExecutor
...@@ -39,7 +39,7 @@ class TheodoliteController( ...@@ -39,7 +39,7 @@ class TheodoliteController(
sleep(5000) // wait until all states are correctly set sleep(5000) // wait until all states are correctly set
while (true) { while (true) {
reconcile() reconcile()
updateBenchmarkStatus() benchmarkStateChecker.start(true)
sleep(2000) sleep(2000)
} }
} }
...@@ -47,7 +47,6 @@ class TheodoliteController( ...@@ -47,7 +47,6 @@ class TheodoliteController(
private fun reconcile() { private fun reconcile() {
do { do {
val execution = getNextExecution() val execution = getNextExecution()
updateBenchmarkStatus()
if (execution != null) { if (execution != null) {
val benchmark = getBenchmarks() val benchmark = getBenchmarks()
.map { it.spec } .map { it.spec }
...@@ -108,8 +107,7 @@ class TheodoliteController( ...@@ -108,8 +107,7 @@ class TheodoliteController(
type = "WARNING", type = "WARNING",
reason = "Execution failed", reason = "Execution failed",
message = "An error occurs while executing: ${e.message}") message = "An error occurs while executing: ${e.message}")
logger.error { "Failure while executing execution ${execution.name} with benchmark ${benchmark.name}." } logger.error(e) { "Failure while executing execution ${execution.name} with benchmark ${benchmark.name}." }
logger.error { "Problem is: $e" }
executionStateHandler.setExecutionState(execution.name, ExecutionStates.FAILURE) executionStateHandler.setExecutionState(execution.name, ExecutionStates.FAILURE)
} }
executionStateHandler.stopDurationStateTimer() executionStateHandler.stopDurationStateTimer()
...@@ -137,7 +135,6 @@ class TheodoliteController( ...@@ -137,7 +135,6 @@ class TheodoliteController(
} }
} }
/** /**
* Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution] * Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution]
* is selected for the next execution depends on three points: * is selected for the next execution depends on three points:
...@@ -171,34 +168,7 @@ class TheodoliteController( ...@@ -171,34 +168,7 @@ class TheodoliteController(
.firstOrNull() .firstOrNull()
} }
private fun updateBenchmarkStatus() {
this.benchmarkCRDClient
.list()
.items
.map { it.spec.name = it.metadata.name; it }
.map { Pair(it, checkResource(it.spec)) }
.forEach { setState(it.first, it.second ) }
}
private fun setState(resource: BenchmarkCRD, state: BenchmarkStates) {
benchmarkStateHandler.setResourceSetState(resource.spec.name, state)
}
private fun checkResource(benchmark: KubernetesBenchmark): BenchmarkStates {
return try {
val appResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources)
val loadGenResources =
benchmark.loadKubernetesResources(resourceSet = benchmark.sut.resources)
if(appResources.isNotEmpty() && loadGenResources.isNotEmpty()) {
BenchmarkStates.READY
} else {
BenchmarkStates.PENDING
}
} catch (e: Exception) {
BenchmarkStates.PENDING
}
}
fun isExecutionRunning(executionName: String): Boolean { fun isExecutionRunning(executionName: String): Boolean {
if (!::executor.isInitialized) return false if (!::executor.isInitialized) return false
......
...@@ -34,6 +34,7 @@ class TheodoliteOperator { ...@@ -34,6 +34,7 @@ class TheodoliteOperator {
private lateinit var controller: TheodoliteController private lateinit var controller: TheodoliteController
private lateinit var executionStateHandler: ExecutionStateHandler private lateinit var executionStateHandler: ExecutionStateHandler
private lateinit var benchmarkStateHandler: BenchmarkStateHandler private lateinit var benchmarkStateHandler: BenchmarkStateHandler
private lateinit var benchmarkStateChecker: BenchmarkStateChecker
fun start() { fun start() {
...@@ -71,7 +72,7 @@ class TheodoliteOperator { ...@@ -71,7 +72,7 @@ class TheodoliteOperator {
controller = getController( controller = getController(
client = client, client = client,
executionStateHandler = getExecutionStateHandler(client = client), executionStateHandler = getExecutionStateHandler(client = client),
benchmarkStateHandler = getBenchmarkStateHandler(client = client) benchmarkStateChecker = getBenchmarkStateChecker(client = client)
) )
getExecutionEventHandler(controller, client).startAllRegisteredInformers() getExecutionEventHandler(controller, client).startAllRegisteredInformers()
...@@ -112,17 +113,28 @@ class TheodoliteOperator { ...@@ -112,17 +113,28 @@ class TheodoliteOperator {
return benchmarkStateHandler return benchmarkStateHandler
} }
fun getBenchmarkStateChecker(client: NamespacedKubernetesClient) : BenchmarkStateChecker {
if (!::benchmarkStateChecker.isInitialized) {
this.benchmarkStateChecker = BenchmarkStateChecker(
client = client,
benchmarkStateHandler = getBenchmarkStateHandler(client = client),
benchmarkCRDClient = getBenchmarkClient(client = client))
}
return benchmarkStateChecker
}
fun getController( fun getController(
client: NamespacedKubernetesClient, client: NamespacedKubernetesClient,
executionStateHandler: ExecutionStateHandler, executionStateHandler: ExecutionStateHandler,
benchmarkStateHandler: BenchmarkStateHandler benchmarkStateChecker: BenchmarkStateChecker
): TheodoliteController { ): TheodoliteController {
if (!::controller.isInitialized) { if (!::controller.isInitialized) {
this.controller = TheodoliteController( this.controller = TheodoliteController(
benchmarkCRDClient = getBenchmarkClient(client), benchmarkCRDClient = getBenchmarkClient(client),
executionCRDClient = getExecutionClient(client), executionCRDClient = getExecutionClient(client),
executionStateHandler = executionStateHandler, executionStateHandler = executionStateHandler,
benchmarkStateHandler = benchmarkStateHandler benchmarkStateChecker = benchmarkStateChecker
) )
} }
return this.controller return this.controller
......
package theodolite.util
class ActionCommandFailedException(message: String, e: Exception? = null) : DeploymentFailedException(message,e) {
}
\ No newline at end of file
...@@ -12,6 +12,11 @@ class Configuration { ...@@ -12,6 +12,11 @@ class Configuration {
val NAMESPACE = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE val NAMESPACE = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
val COMPONENT_NAME = System.getenv("COMPONENT_NAME") ?: DEFAULT_COMPONENT_NAME val COMPONENT_NAME = System.getenv("COMPONENT_NAME") ?: DEFAULT_COMPONENT_NAME
val EXECUTION_MODE = System.getenv("MODE") ?: ExecutionModes.STANDALONE.value val EXECUTION_MODE = System.getenv("MODE") ?: ExecutionModes.STANDALONE.value
/**
* Specifies how long Theodolite should wait (in sec) before aborting the execution of an action command.
*/
const val TIMEOUT_SECONDS: Long = 30L
} }
} }
package theodolite.benchmark
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.api.model.PodListBuilder
import io.fabric8.kubernetes.client.server.mock.KubernetesServer
import io.fabric8.kubernetes.client.utils.Utils
import io.quarkus.test.junit.QuarkusTest
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import theodolite.execution.operator.TheodoliteController
import theodolite.execution.operator.TheodoliteOperator
import theodolite.util.ActionCommandFailedException
@QuarkusTest
class ActionCommandTest {
private val server = KubernetesServer(false, false)
lateinit var controller: TheodoliteController
@BeforeEach
fun setUp() {
server.before()
val operator = TheodoliteOperator()
this.controller = operator.getController(
client = server.client,
executionStateHandler = operator.getExecutionStateHandler(client = server.client),
benchmarkStateChecker = operator.getBenchmarkStateChecker(client = server.client)
)
val pod: Pod = PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
.withLabels<String, String>(mapOf("app" to "pod"))
.withNamespace("test").and()
.build()
val ready: Pod = createReadyFrom(pod, "True")
val podList = PodListBuilder().build()
podList.items.add(0, ready)
server
.expect()
.withPath("/api/v1/namespaces/test/pods?labelSelector=${Utils.toUrlEncoded("app=pod")}")
.andReturn(200, podList)
.always()
server
.expect()
.get()
.withPath("/api/v1/namespaces/test/pods/pod1")
.andReturn(200, ready)
.always()
server
.expect()
.withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true")
.andUpgradeToWebSocket()
.open(ErrorChannelMessage("{\"metadata\":{},\"status\":\"Success\"}\n"))
.done()
.always()
server
.expect()
.withPath("/api/v1/namespaces/test/pods/pod1/exec?command=error-command&stdout=true&stderr=true")
.andUpgradeToWebSocket()
.open(ErrorChannelMessage("{\"metadata\":{},\"status\":\"failed\", \"details\":{}}\n"))
.done()
.always()
}
/**
* Copied from fabric8 Kubernetes Client repository
*
* @param pod
* @param status
* @return
*/
fun createReadyFrom(pod: Pod, status: String): Pod {
return PodBuilder(pod)
.withNewStatus()
.addNewCondition()
.withType("Ready")
.withStatus(status)
.endCondition()
.endStatus()
.build()
}
@AfterEach
fun tearDown() {
server.after()
}
@Test
fun testGetPodName() {
assertEquals("pod1", ActionCommand(client = server.client).getPodName(mutableMapOf("app" to "pod"), 1))
}
@Test
fun testActionSuccess() {
val action = Action()
action.selector = ActionSelector()
action.selector.pod = PodSelector()
action.selector.pod.matchLabels = mutableMapOf("app" to "pod")
action.exec = Command()
action.exec.command = arrayOf("ls")
action.exec.timeoutSeconds = 10L
action.exec(server.client)
assertEquals(
"/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true&stderr=true",
server.lastRequest.path)
}
@Test
fun testActionFailed() {
val action = Action()
action.selector = ActionSelector()
action.selector.pod = PodSelector()
action.selector.pod.matchLabels = mutableMapOf("app" to "pod")
action.exec = Command()
action.exec.command = arrayOf("error-command")
action.exec.timeoutSeconds = 10L
assertThrows<ActionCommandFailedException> { run { action.exec(server.client) } }
}
}
package theodolite.benchmark
import io.fabric8.mockwebserver.internal.WebSocketMessage
import java.nio.charset.StandardCharsets
class ErrorChannelMessage(body: String) : WebSocketMessage(0L, getBodyBytes(OUT_STREAM_ID, body), true, true) {
companion object {
private const val OUT_STREAM_ID: Byte = 3
private fun getBodyBytes(prefix: Byte, body: String): ByteArray {
val original = body.toByteArray(StandardCharsets.UTF_8)
val prefixed = ByteArray(original.size + 1)
prefixed[0] = prefix
System.arraycopy(original, 0, prefixed, 1, original.size)
return prefixed
}
}
}
...@@ -34,6 +34,13 @@ class BenchmarkCRDummy(name: String) { ...@@ -34,6 +34,13 @@ class BenchmarkCRDummy(name: String) {
benchmark.sut.resources = emptyList() benchmark.sut.resources = emptyList()
benchmark.loadGenerator.resources = emptyList() benchmark.loadGenerator.resources = emptyList()
benchmark.infrastructure.beforeActions = emptyList()
benchmark.infrastructure.afterActions = emptyList()
benchmark.sut.beforeActions = emptyList()
benchmark.sut.afterActions = emptyList()
benchmark.loadGenerator.beforeActions = emptyList()
benchmark.loadGenerator.afterActions = emptyList()
benchmark.resourceTypes = emptyList() benchmark.resourceTypes = emptyList()
benchmark.loadTypes = emptyList() benchmark.loadTypes = emptyList()
benchmark.kafkaConfig = kafkaConfig benchmark.kafkaConfig = kafkaConfig
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment