Skip to content
Snippets Groups Projects
Commit ad767837 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Use states to handle crd instead of queues

parent 0df1afb0
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!142Rewrite operator to use k8s states,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 574 additions and 219 deletions
package theodolite.benchmark package theodolite.benchmark
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
......
...@@ -2,8 +2,6 @@ package theodolite.benchmark ...@@ -2,8 +2,6 @@ package theodolite.benchmark
import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import kotlin.properties.Delegates import kotlin.properties.Delegates
...@@ -26,7 +24,7 @@ import kotlin.properties.Delegates ...@@ -26,7 +24,7 @@ import kotlin.properties.Delegates
*/ */
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class BenchmarkExecution : CustomResource(), Namespaced { class BenchmarkExecution : KubernetesResource {
var executionId: Int = 0 var executionId: Int = 0
lateinit var name: String lateinit var name: String
lateinit var benchmark: String lateinit var benchmark: String
...@@ -34,7 +32,7 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -34,7 +32,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var resources: ResourceDefinition lateinit var resources: ResourceDefinition
lateinit var slos: List<Slo> lateinit var slos: List<Slo>
lateinit var execution: Execution lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride?> lateinit var configOverrides: MutableList<ConfigurationOverride?>
/** /**
* This execution encapsulates the [strategy], the [duration], the [repetitions], and the [restrictions] * This execution encapsulates the [strategy], the [duration], the [repetitions], and the [restrictions]
......
package theodolite.benchmark package theodolite.benchmark
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource import io.fabric8.kubernetes.client.CustomResource
...@@ -30,17 +31,19 @@ private var DEFAULT_NAMESPACE = "default" ...@@ -30,17 +31,19 @@ private var DEFAULT_NAMESPACE = "default"
* for the deserializing in the [theodolite.execution.operator.TheodoliteOperator]. * for the deserializing in the [theodolite.execution.operator.TheodoliteOperator].
* @constructor construct an empty Benchmark. * @constructor construct an empty Benchmark.
*/ */
@JsonDeserialize
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { class KubernetesBenchmark: KubernetesResource, Benchmark{
lateinit var name: String lateinit var name: String
lateinit var appResource: List<String> lateinit var appResource: List<String>
lateinit var loadGenResource: List<String> lateinit var loadGenResource: List<String>
lateinit var resourceTypes: List<TypeName> lateinit var resourceTypes: List<TypeName>
lateinit var loadTypes: List<TypeName> lateinit var loadTypes: List<TypeName>
lateinit var kafkaConfig: KafkaConfig lateinit var kafkaConfig: KafkaConfig
private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config" var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
/** /**
* Loads [KubernetesResource]s. * Loads [KubernetesResource]s.
* It first loads them via the [YamlParser] to check for their concrete type and afterwards initializes them using * It first loads them via the [YamlParser] to check for their concrete type and afterwards initializes them using
......
...@@ -13,10 +13,6 @@ import theodolite.util.Resource ...@@ -13,10 +13,6 @@ import theodolite.util.Resource
import theodolite.util.Results import theodolite.util.Results
import java.io.File import java.io.File
import java.io.PrintWriter import java.io.PrintWriter
import java.lang.IllegalArgumentException
import java.lang.Thread.sleep
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration import java.time.Duration
......
package theodolite.execution.operator
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.CustomResourceDoneable
import io.fabric8.kubernetes.client.CustomResourceList
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import java.lang.Thread.sleep
abstract class AbstractStateHandler<T,L,D>(
private val context: CustomResourceDefinitionContext,
private val client: KubernetesClient,
private val crd: Class<T>,
private val crdList: Class<L>,
private val donableCRD: Class<D>
): StateHandler where T : CustomResource, T: Namespaced, L: CustomResourceList<T>, D: CustomResourceDoneable<T> {
private val crdClient: MixedOperation<T, L, D, Resource<T, D>> =
this.client.customResources(this.context, this.crd, this.crdList, this.donableCRD)
@Synchronized
override fun setState(resourceName: String, f: (CustomResource) -> CustomResource?) {
this.crdClient
.inNamespace(this.client.namespace)
.list().items
.filter { item -> item.metadata.name == resourceName }
.map { customResource -> f(customResource) }
.forEach { this.crdClient.updateStatus(it as T) }
}
@Synchronized
override fun getState(resourceName: String, f: (CustomResource) -> String?): String? {
return this.crdClient
.inNamespace(this.client.namespace)
.list().items
.filter { item -> item.metadata.name == resourceName }
.map { customResource -> f(customResource) }
.firstOrNull()
}
@Synchronized
override fun blockUntilStateIsSet(resourceName: String, desiredStatusString: String, f: (CustomResource) -> String?, maxTries: Int): Boolean {
for (i in 0.rangeTo(maxTries)) {
val currentStatus = getState(resourceName, f)
if(currentStatus == desiredStatusString) {
return true
}
sleep(50)
}
return false
}
}
\ No newline at end of file
package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
/**
* Handles adding, updating and deleting KubernetesBenchmarks.
*
* @param controller The TheodoliteController that handles the application state
*
* @see TheodoliteController
* @see KubernetesBenchmark
*/
class BenchmarkEventHandler(private val controller: TheodoliteController) : ResourceEventHandler<KubernetesBenchmark> {
/**
* Add a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to add
*
* @see KubernetesBenchmark
*/
override fun onAdd(benchmark: KubernetesBenchmark) {
benchmark.name = benchmark.metadata.name
logger.info { "Add new benchmark ${benchmark.name}." }
this.controller.benchmarks[benchmark.name] = benchmark
}
/**
* Update a KubernetesBenchmark.
*
* @param oldBenchmark the KubernetesBenchmark to update
* @param newBenchmark the updated KubernetesBenchmark
*
* @see KubernetesBenchmark
*/
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} else {
onAdd(newBenchmark)
}
}
/**
* Delete a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to delete
*
* @see KubernetesBenchmark
*/
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name)
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
}
}
}
package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import mu.KotlinLogging
import org.json.JSONObject
import theodolite.execution.Shutdown
import theodolite.k8s.K8sContextFactory
import theodolite.model.crd.*
private val logger = KotlinLogging.logger {}
class ClusterSetup(
private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, DoneableExecution, Resource<ExecutionCRD, DoneableExecution>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, DoneableBenchmark, Resource<BenchmarkCRD, DoneableBenchmark>>,
private val client: NamespacedKubernetesClient
) {
private val serviceMonitorContext = K8sContextFactory().create(
api = "v1",
scope = "Namespaced",
group = "monitoring.coreos.com",
plural = "servicemonitors"
)
fun clearClusterState(){
stopRunningExecution()
clearByLabel()
}
private fun stopRunningExecution() {
executionCRDClient
.inNamespace(client.namespace)
.list()
.items
.asSequence()
.filter { it.status.executionState == STATES.Running.value }
.forEach { execution ->
val benchmark = benchmarkCRDClient
.inNamespace(client.namespace)
.list()
.items
.firstOrNull { it.metadata.name == execution.spec.benchmark }
if (benchmark != null) {
execution.spec.name = execution.metadata.name
benchmark.spec.name = benchmark.metadata.name
Shutdown(execution.spec, benchmark.spec).start()
} else {
logger.error {
"Execution with state ${STATES.Running.value} was found, but no corresponding benchmark. " +
"Could not initialize cluster." }
}
}
}
private fun clearByLabel() {
this.client.services().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.apps().deployments().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.apps().statefulSets().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.configMaps().withLabel("app.kubernetes.io/created-by=theodolite").delete()
val serviceMonitors = JSONObject(
this.client.customResource(serviceMonitorContext)
.list(client.namespace, mapOf(Pair("app.kubernetes.io/created-by", "theodolite")))
)
.getJSONArray("items")
(0 until serviceMonitors.length())
.map { serviceMonitors.getJSONObject(it).getJSONObject("metadata").getString("name") }
.forEach { this.client.customResource(serviceMonitorContext).delete(client.namespace, it) }
}
}
\ No newline at end of file
package theodolite.execution.operator package theodolite.execution.operator
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import io.fabric8.kubernetes.client.informers.ResourceEventHandler import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.model.crd.*
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -14,17 +17,30 @@ private val logger = KotlinLogging.logger {} ...@@ -14,17 +17,30 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController * @see TheodoliteController
* @see BenchmarkExecution * @see BenchmarkExecution
*/ */
class ExecutionHandler(private val controller: TheodoliteController) : ResourceEventHandler<BenchmarkExecution> { class ExecutionHandler(
private val controller: TheodoliteController,
private val stateHandler: ExecutionStateHandler
) : ResourceEventHandler<ExecutionCRD> {
private val gson: Gson = GsonBuilder().enableComplexMapKeySerialization().create()
/** /**
* Add an execution to the end of the queue of the TheodoliteController. * Add an execution to the end of the queue of the TheodoliteController.
* *
* @param execution the execution to add * @param ExecutionCRD the execution to add
*/ */
override fun onAdd(execution: BenchmarkExecution) { @Synchronized
execution.name = execution.metadata.name override fun onAdd(execution: ExecutionCRD) {
logger.info { "Add new execution ${execution.metadata.name} to queue." } logger.info { "Add execution ${execution.metadata.name}" }
this.controller.executionsQueue.add(execution) execution.spec.name = execution.metadata.name
when (this.stateHandler.getExecutionState(execution.metadata.name)) {
null -> this.stateHandler.setExecutionState(execution.spec.name, STATES.Pending)
STATES.Running -> {
this.stateHandler.setExecutionState(execution.spec.name, STATES.Restart)
if(this.controller.isExecutionRunning(execution.spec.name)){
this.controller.stop(restart=true)
}
}
}
} }
/** /**
...@@ -32,40 +48,39 @@ class ExecutionHandler(private val controller: TheodoliteController) : ResourceE ...@@ -32,40 +48,39 @@ class ExecutionHandler(private val controller: TheodoliteController) : ResourceE
* added to the beginning of the queue of the TheodoliteController. * added to the beginning of the queue of the TheodoliteController.
* Otherwise, it is just added to the beginning of the queue. * Otherwise, it is just added to the beginning of the queue.
* *
* @param oldExecution the old execution * @param oldExecutionCRD the old execution
* @param newExecution the new execution * @param newExecutionCRD the new execution
*/ */
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { @Synchronized
logger.info { "Add updated execution to queue." } override fun onUpdate(oldExecution: ExecutionCRD, newExecution: ExecutionCRD) {
newExecution.name = newExecution.metadata.name logger.info { "Receive update event for execution ${oldExecution.metadata.name}" }
try { newExecution.spec.name = newExecution.metadata.name
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } oldExecution.spec.name = oldExecution.metadata.name
} catch (e: NullPointerException) { if(gson.toJson(oldExecution.spec) != gson.toJson(newExecution.spec)) {
logger.warn { "No execution found for deletion" } when(this.stateHandler.getExecutionState(newExecution.metadata.name)) {
STATES.Running -> {
this.stateHandler.setExecutionState(newExecution.spec.name, STATES.Restart)
if (this.controller.isExecutionRunning(newExecution.spec.name)){
this.controller.stop(restart=true)
}
}
STATES.Restart -> {} // should this set to pending?
else -> this.stateHandler.setExecutionState(newExecution.spec.name, STATES.Pending)
} }
this.controller.executionsQueue.addFirst(newExecution)
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} }
} }
/** /**
* Delete an execution from the queue of the TheodoliteController. * Delete an execution from the queue of the TheodoliteController.
* *
* @param execution the execution to delete * @param ExecutionCRD the execution to delete
*/ */
override fun onDelete(execution: BenchmarkExecution, b: Boolean) { @Synchronized
try { override fun onDelete(execution: ExecutionCRD, b: Boolean) {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name } logger.info { "Delete execution ${execution.metadata.name}" }
logger.info { "Delete execution ${execution.metadata.name} from queue." } if(execution.status.executionState == STATES.Running.value
} catch (e: NullPointerException) { && this.controller.isExecutionRunning(execution.spec.name)) {
logger.warn { "No execution found for deletion" } this.controller.stop()
}
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
} }
} }
} }
package theodolite.execution.operator
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import theodolite.model.crd.BenchmarkExecutionList
import theodolite.model.crd.ExecutionCRD
import theodolite.model.crd.STATES
import java.lang.Thread.sleep
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
class ExecutionStateHandler(context: CustomResourceDefinitionContext, val client: KubernetesClient):
AbstractStateHandler<ExecutionCRD, BenchmarkExecutionList, DoneableExecution>(
context = context,
client = client,
crd = ExecutionCRD::class.java,
crdList = BenchmarkExecutionList::class.java,
donableCRD = DoneableExecution::class.java) {
private var runExecutionDurationTimer: AtomicBoolean = AtomicBoolean(false)
private fun getExecutionLambda() = { cr: CustomResource ->
var execState = ""
if (cr is ExecutionCRD) { execState = cr.status.executionState }
execState
}
private fun getDurationLambda() = { cr: CustomResource ->
var execState = ""
if (cr is ExecutionCRD) { execState = cr.status.executionState }
execState
}
fun setExecutionState(resourceName: String, status: STATES): Boolean {
setState(resourceName) {cr -> if(cr is ExecutionCRD) cr.status.executionState = status.value; cr}
return blockUntilStateIsSet(resourceName, status.value, getExecutionLambda())
}
fun getExecutionState(resourceName: String) : STATES? {
val status = this.getState(resourceName, getExecutionLambda())
return STATES.values().firstOrNull { it.value == status }
}
fun setDurationState(resourceName: String, duration: Duration) {
setState(resourceName) { cr -> if (cr is ExecutionCRD) cr.status.executionDuration = durationToK8sString(duration); cr }
blockUntilStateIsSet(resourceName, durationToK8sString(duration), getDurationLambda())
}
fun getDurationState(resourceName: String): String? {
return this.getState(resourceName, getDurationLambda())
}
private fun durationToK8sString(duration: Duration): String {
val sec = duration.seconds
return when {
sec <= 120 -> "${sec}s" // max 120s
sec < 60 * 99 -> "${duration.toMinutes()}m" // max 99m
sec < 60 * 60 * 99 -> "${duration.toHours()}h" // max 99h
else -> "${duration.toDays()}d + ${duration.minusDays(duration.toDays()).toHours()}h"
}
}
fun startDurationStateTimer(resourceName: String) {
this.runExecutionDurationTimer.set(true)
val startTime = Instant.now().toEpochMilli()
Thread {
while (this.runExecutionDurationTimer.get()) {
val duration = Duration.ofMillis(Instant.now().minusMillis(startTime).toEpochMilli())
setDurationState(resourceName, duration)
sleep(100 * 1)
}
}.start()
}
@Synchronized
fun stopDurationStateTimer() {
this.runExecutionDurationTimer.set(false)
sleep(100 * 2)
}
}
\ No newline at end of file
package theodolite.execution.operator
import io.fabric8.kubernetes.client.CustomResource
private const val MAX_TRIES: Int = 5
interface StateHandler {
fun setState(resourceName: String, f: (CustomResource) -> CustomResource?)
fun getState(resourceName: String, f: (CustomResource) -> String?): String?
fun blockUntilStateIsSet(
resourceName: String,
desiredStatusString: String,
f: (CustomResource) -> String?,
maxTries: Int = MAX_TRIES): Boolean
}
\ No newline at end of file
package theodolite.execution.operator package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.execution.TheodoliteExecutor import theodolite.execution.TheodoliteExecutor
import theodolite.model.crd.*
import theodolite.util.ConfigurationOverride
import theodolite.util.PatcherDefinition
import java.lang.Thread.sleep import java.lang.Thread.sleep
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/** /**
* The controller implementation for Theodolite. * The controller implementation for Theodolite.
* *
* Maintains a Dequeue, based on ConcurrentLinkedDequeue, of executions to be executed for a benchmark.
*
* @param client The NamespacedKubernetesClient
* @param executionContext The CustomResourceDefinitionContext
*
* @see NamespacedKubernetesClient * @see NamespacedKubernetesClient
* @see CustomResourceDefinitionContext * @see CustomResourceDefinitionContext
* @see BenchmarkExecution * @see BenchmarkExecution
* @see KubernetesBenchmark * @see KubernetesBenchmark
* @see ConcurrentLinkedDeque * @see ConcurrentLinkedDeque
*/ */
class TheodoliteController( class TheodoliteController(
val client: NamespacedKubernetesClient, private val namespace: String,
val executionContext: CustomResourceDefinitionContext, val path: String,
val path: String private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, DoneableExecution, Resource<ExecutionCRD, DoneableExecution>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, DoneableBenchmark, Resource<BenchmarkCRD, DoneableBenchmark>>,
private val executionStateHandler: ExecutionStateHandler
) { ) {
lateinit var executor: TheodoliteExecutor lateinit var executor: TheodoliteExecutor
val executionsQueue: ConcurrentLinkedDeque<BenchmarkExecution> = ConcurrentLinkedDeque()
val benchmarks: ConcurrentHashMap<String, KubernetesBenchmark> = ConcurrentHashMap()
var isUpdated = AtomicBoolean(false)
var executionID = AtomicInteger(0)
/** /**
*
* Runs the TheodoliteController forever. * Runs the TheodoliteController forever.
*/ */
fun run() { fun run() {
sleep(5000) // wait until all states are correctly set
while (true) { while (true) {
try {
reconcile() reconcile()
logger.info { "Theodolite is waiting for new matching benchmark and execution." }
logger.info { "Currently available executions: " }
executionsQueue.forEach {
logger.info { "${it.name} : waiting for : ${it.benchmark}" }
}
logger.info { "Currently available benchmarks: " }
benchmarks.forEach {
logger.info { it.key }
}
sleep(2000) sleep(2000)
} catch (e: InterruptedException) {
logger.error { "Execution interrupted with error: $e." }
}
} }
} }
/**
* Ensures that the application state corresponds to the defined KubernetesBenchmarks and BenchmarkExecutions.
*
* @see KubernetesBenchmark
* @see BenchmarkExecution
*/
@Synchronized
private fun reconcile() { private fun reconcile() {
while (executionsQueue.isNotEmpty()) { do {
val execution = executionsQueue.peek() val execution = getNextExecution()
val benchmark = benchmarks[execution.benchmark] if (execution != null) {
val benchmark = getBenchmarks()
if (benchmark == null) { .firstOrNull { it.name == execution.benchmark }
logger.debug { "No benchmark found for execution ${execution.name}." } if (benchmark != null) {
sleep(1000)
} else {
runExecution(execution, benchmark) runExecution(execution, benchmark)
} }
} else {
logger.info { "Could not find executable execution." }
} }
} while (execution != null)
} }
/** /**
* Execute a benchmark with a defined KubernetesBenchmark and BenchmarkExecution * Execute a benchmark with a defined KubernetesBenchmark and BenchmarkExecution
* *
* @see KubernetesBenchmark
* @see BenchmarkExecution * @see BenchmarkExecution
*/ */
@Synchronized private fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { setAdditionalLabels(execution.name,
execution.executionId = executionID.getAndSet(executionID.get() + 1) "deployed-for-execution",
isUpdated.set(false) benchmark.appResource + benchmark.loadGenResource,
benchmark.path = path execution)
logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}." } setAdditionalLabels(benchmark.name,
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) "deployed-for-benchmark",
executor.run() benchmark.appResource + benchmark.loadGenResource,
execution)
setAdditionalLabels("theodolite",
"app.kubernetes.io/created-by",
benchmark.appResource + benchmark.loadGenResource,
execution)
executionStateHandler.setExecutionState(execution.name, STATES.Running)
executionStateHandler.startDurationStateTimer(execution.name)
try { try {
if (!isUpdated.get()) { executor = TheodoliteExecutor(execution, benchmark)
this.executionsQueue.removeIf { e -> e.name == execution.name } executor.run()
client.customResource(executionContext).delete(client.namespace, execution.metadata.name) when (executionStateHandler.getExecutionState(execution.name)) {
STATES.Restart -> runExecution(execution, benchmark)
STATES.Running -> {
executionStateHandler.setExecutionState(execution.name, STATES.Finished)
logger.info { "Execution of ${execution.name} is finally stopped." }
}
} }
} catch (e: Exception) { } catch (e: Exception) {
logger.warn { "Deletion skipped." } logger.error { "Failure while executing execution ${execution.name} with benchmark ${benchmark.name}." }
logger.error { "Problem is: $e" }
executionStateHandler.setExecutionState(execution.name, STATES.Failure)
}
executionStateHandler.stopDurationStateTimer()
} }
logger.info { "Execution of ${execution.name} is finally stopped." } @Synchronized
fun stop(restart: Boolean = false) {
if (!::executor.isInitialized) return
if (restart) {
executionStateHandler.setExecutionState(this.executor.getExecution().name, STATES.Restart)
} else {
executionStateHandler.setExecutionState(this.executor.getExecution().name, STATES.Interrupted)
logger.warn { "Execution ${executor.getExecution().name} unexpected interrupted" }
}
this.executor.executor.run.set(false)
} }
/** /**
* @return true if the TheodoliteExecutor of this controller is initialized. Else returns false. * @return all available [BenchmarkCRD]s
*/
private fun getBenchmarks(): List<KubernetesBenchmark> {
return this.benchmarkCRDClient
.inNamespace(namespace)
.list()
.items
.map { it.spec.name = it.metadata.name; it }
.map { it.spec.path = path; it }
.map { it.spec }
}
/**
* Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution]
* is selected for the next execution depends on three points:
*
* 1. Only executions are considered for which a matching benchmark is available on the cluster
* 2. The Status of the execution must be [STATES.Pending] or [STATES.Restart]
* 3. Of the remaining [BenchmarkCRD], those with status [STATES.Restart] are preferred,
* then, if there is more than one, the oldest execution is chosen.
* *
* @see TheodoliteExecutor * @return the next execution or null
*/ */
@Synchronized private fun getNextExecution(): BenchmarkExecution? {
fun isInitialized(): Boolean { val availableBenchmarkNames = getBenchmarks()
return ::executor.isInitialized .map { it.name }
return executionCRDClient
.inNamespace(namespace)
.list()
.items
.asSequence()
.map { it.spec.name = it.metadata.name; it }
.filter {
it.status.executionState == STATES.Pending.value ||
it.status.executionState == STATES.Restart.value
}
.filter { availableBenchmarkNames.contains(it.spec.benchmark) }
.sortedWith(stateComparator().thenBy { it.metadata.creationTimestamp })
.map { it.spec }
.firstOrNull()
}
/**
* Simple comparator which can be used to order a list of [ExecutionCRD] such that executions with
* status [STATES.Restart] are before all other executions.
*/
private fun stateComparator() = Comparator<ExecutionCRD> { a, b ->
when {
(a == null && b == null) -> 0
(a.status.executionState == STATES.Restart.value) -> -1
else -> 1
}
}
fun isExecutionRunning(executionName: String): Boolean {
return this.executor.getExecution().name == executionName
}
private fun setAdditionalLabels(
labelValue: String,
labelName: String,
resources: List<String>,
execution: BenchmarkExecution
) {
val additionalConfigOverrides = mutableListOf<ConfigurationOverride>()
resources.forEach {
run {
val configurationOverride = ConfigurationOverride()
configurationOverride.patcher = PatcherDefinition()
configurationOverride.patcher.type = "LabelPatcher"
configurationOverride.patcher.variableName = labelName
configurationOverride.patcher.resource = it
configurationOverride.value = labelValue
additionalConfigOverrides.add(configurationOverride)
}
}
execution.configOverrides.addAll(additionalConfigOverrides)
} }
} }
\ No newline at end of file
package theodolite.execution.operator package theodolite.execution.operator
import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import io.fabric8.kubernetes.internal.KubernetesDeserializer import io.fabric8.kubernetes.internal.KubernetesDeserializer
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.BenchmarkExecutionList
import theodolite.benchmark.KubernetesBenchmark
import theodolite.benchmark.KubernetesBenchmarkList
import theodolite.k8s.K8sContextFactory import theodolite.k8s.K8sContextFactory
import theodolite.model.crd.*
private const val DEFAULT_NAMESPACE = "default" private const val DEFAULT_NAMESPACE = "default"
...@@ -16,7 +15,7 @@ private const val EXECUTION_SINGULAR = "execution" ...@@ -16,7 +15,7 @@ private const val EXECUTION_SINGULAR = "execution"
private const val EXECUTION_PLURAL = "executions" private const val EXECUTION_PLURAL = "executions"
private const val BENCHMARK_SINGULAR = "benchmark" private const val BENCHMARK_SINGULAR = "benchmark"
private const val BENCHMARK_PLURAL = "benchmarks" private const val BENCHMARK_PLURAL = "benchmarks"
private const val API_VERSION = "v1alpha1" private const val API_VERSION = "v1"
private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong() private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong()
private const val GROUP = "theodolite.com" private const val GROUP = "theodolite.com"
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -33,42 +32,82 @@ class TheodoliteOperator { ...@@ -33,42 +32,82 @@ class TheodoliteOperator {
* Start the operator. * Start the operator.
*/ */
fun start() { fun start() {
// FIXME("Remove all benchmark state handling")
logger.info { "Using $namespace as namespace." } logger.info { "Using $namespace as namespace." }
val client = DefaultKubernetesClient().inNamespace(namespace) val client = DefaultKubernetesClient().inNamespace(namespace)
client.use {
KubernetesDeserializer.registerCustomKind( KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION", "$GROUP/$API_VERSION",
EXECUTION_SINGULAR, EXECUTION_SINGULAR,
BenchmarkExecution::class.java ExecutionCRD::class.java
) )
KubernetesDeserializer.registerCustomKind( KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION", "$GROUP/$API_VERSION",
BENCHMARK_SINGULAR, BENCHMARK_SINGULAR,
KubernetesBenchmark::class.java BenchmarkCRD::class.java
) )
val contextFactory = K8sContextFactory() val contextFactory = K8sContextFactory()
val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL) val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL)
val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL) val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL)
val executionCRDClient: MixedOperation<
ExecutionCRD,
BenchmarkExecutionList,
DoneableExecution,
Resource<ExecutionCRD, DoneableExecution>>
= client.customResources(
executionContext,
ExecutionCRD::class.java,
BenchmarkExecutionList::class.java,
DoneableExecution::class.java)
val benchmarkCRDClient: MixedOperation<
BenchmarkCRD,
KubernetesBenchmarkList,
DoneableBenchmark,
Resource<BenchmarkCRD, DoneableBenchmark>>
= client.customResources(
benchmarkContext,
BenchmarkCRD::class.java,
KubernetesBenchmarkList::class.java,
DoneableBenchmark::class.java)
val executionStateHandler = ExecutionStateHandler(
context = executionContext,
client = client)
val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config" val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
val controller = TheodoliteController(client = client, executionContext = executionContext, path = appResource) val controller =
TheodoliteController(
namespace = client.namespace,
path = appResource,
benchmarkCRDClient = benchmarkCRDClient,
executionCRDClient = executionCRDClient,
executionStateHandler = executionStateHandler)
val informerFactory = client.informers() val informerFactory = client.informers()
val informerExecution = informerFactory.sharedIndexInformerForCustomResource( val informerExecution = informerFactory.sharedIndexInformerForCustomResource(
executionContext, BenchmarkExecution::class.java, executionContext,
BenchmarkExecutionList::class.java, RESYNC_PERIOD ExecutionCRD::class.java,
) BenchmarkExecutionList::class.java,
val informerBenchmark = informerFactory.sharedIndexInformerForCustomResource( RESYNC_PERIOD
benchmarkContext, KubernetesBenchmark::class.java,
KubernetesBenchmarkList::class.java, RESYNC_PERIOD
) )
informerExecution.addEventHandler(ExecutionHandler(controller)) informerExecution.addEventHandler(ExecutionHandler(
informerBenchmark.addEventHandler(BenchmarkEventHandler(controller)) controller = controller,
informerFactory.startAllRegisteredInformers() stateHandler = executionStateHandler))
ClusterSetup(
executionCRDClient = executionCRDClient,
benchmarkCRDClient = benchmarkCRDClient,
client = client
).clearClusterState()
informerFactory.startAllRegisteredInformers()
controller.run() controller.run()
}
} }
} }
...@@ -63,6 +63,12 @@ class K8sManager(private val client: NamespacedKubernetesClient) { ...@@ -63,6 +63,12 @@ class K8sManager(private val client: NamespacedKubernetesClient) {
} }
} }
fun setLabel(resource: KubernetesResource) {
when(resource) {
is Deployment -> this.client
}
}
private fun blockUntilPodsDeleted(podLabel: String) { private fun blockUntilPodsDeleted(podLabel: String) {
while (!this.client.pods().withLabel(podLabel).list().items.isNullOrEmpty()) { while (!this.client.pods().withLabel(podLabel).list().items.isNullOrEmpty()) {
logger.info { "Wait for pods with label '$podLabel' to be deleted." } logger.info { "Wait for pods with label '$podLabel' to be deleted." }
......
...@@ -53,4 +53,9 @@ class ServiceMonitorWrapper(private val serviceMonitor: Map<String, String>) : C ...@@ -53,4 +53,9 @@ class ServiceMonitorWrapper(private val serviceMonitor: Map<String, String>) : C
val smAsMap = this.serviceMonitor["metadata"]!! as Map<String, String> val smAsMap = this.serviceMonitor["metadata"]!! as Map<String, String>
return smAsMap["name"]!! return smAsMap["name"]!!
} }
fun getLabels(): Map<String, String>{
val smAsMap = this.serviceMonitor["metadata"]!! as Map<String, String>
return smAsMap["labels"]!! as Map<String, String>
}
} }
package theodolite.model.crd
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import theodolite.benchmark.KubernetesBenchmark
@JsonDeserialize
class BenchmarkCRD(
var spec: KubernetesBenchmark = KubernetesBenchmark()
) : CustomResource(), Namespaced
\ No newline at end of file
package theodolite.benchmark package theodolite.model.crd
import io.fabric8.kubernetes.client.CustomResourceList import io.fabric8.kubernetes.client.CustomResourceList
class BenchmarkExecutionList : CustomResourceList<BenchmarkExecution>() class BenchmarkExecutionList : CustomResourceList<ExecutionCRD>()
\ No newline at end of file
package theodolite.model.crd
import io.fabric8.kubernetes.api.builder.Function
import io.fabric8.kubernetes.client.CustomResourceDoneable
class DoneableBenchmark(resource: BenchmarkCRD, function: Function<BenchmarkCRD, BenchmarkCRD>) :
CustomResourceDoneable<BenchmarkCRD>(resource, function)
\ No newline at end of file
package theodolite.execution.operator
import io.fabric8.kubernetes.client.CustomResourceDoneable
import io.fabric8.kubernetes.api.builder.Function
import theodolite.model.crd.ExecutionCRD
class DoneableExecution(resource: ExecutionCRD, function: Function<ExecutionCRD, ExecutionCRD>) :
CustomResourceDoneable<ExecutionCRD>(resource, function)
\ No newline at end of file
package theodolite.model.crd
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import theodolite.benchmark.BenchmarkExecution
@JsonDeserialize
class ExecutionCRD(
var spec: BenchmarkExecution = BenchmarkExecution(),
var status: ExecutionStatus = ExecutionStatus()
) : CustomResource(), Namespaced
\ No newline at end of file
package theodolite.model.crd
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
@JsonDeserialize
class ExecutionStatus(): KubernetesResource, CustomResource(), Namespaced {
var executionState: String = ""
var executionDuration: String = "-"
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment