Skip to content
Snippets Groups Projects
Commit f62a1029 authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

merge upstream theodolite-kotlin

parents 951c4ef6 4ff527a7
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!138Load execution ID from file,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 847 additions and 304 deletions
{{- if .Values.benchmarkCRD.create -}} {{- if .Values.benchmarkCRD.create -}}
apiVersion: apiextensions.k8s.io/v1beta1 apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition kind: CustomResourceDefinition
metadata: metadata:
name: benchmarks.theodolite.com name: benchmarks.theodolite.com
spec: spec:
group: theodolite.com group: theodolite.com
version: v1alpha1
names: names:
kind: benchmark kind: benchmark
plural: benchmarks plural: benchmarks
scope: Namespaced shortNames:
- bench
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
required: ["spec"]
properties:
spec:
type: object
required: []
properties:
name:
type: string
appResource:
type: array
minItems: 1
items:
type: string
loadGenResource:
type: array
minItems: 1
items:
type: string
resourceTypes:
type: array
minItems: 1
items:
type: object
properties:
typeName:
type: string
patchers:
type: array
minItems: 1
items:
type: object
properties:
type:
type: string
default: ""
resource:
type: string
default: ""
container:
type: string
default: ""
variableName:
type: string
default: ""
loadTypes:
type: array
minItems: 1
items:
type: object
properties:
typeName:
type: string
patchers:
type: array
minItems: 1
items:
type: object
properties:
type:
type: string
default: ""
resource:
type: string
default: ""
container:
type: string
default: ""
variableName:
type: string
default: ""
kafkaConfig:
type: object
properties:
bootstrapServer:
type: string
topics:
type: array
minItems: 1
items:
type: object
required: []
properties:
name:
type: string
default: ""
numPartitions:
type: integer
default: 0
replicationFactor:
type: integer
default: 0
removeOnly:
type: boolean
default: false
additionalPrinterColumns:
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
subresources: subresources:
status: {} status: {}
scope: Namespaced
{{- end }} {{- end }}
\ No newline at end of file
{{- if .Values.executionCRD.create -}} {{- if .Values.executionCRD.create -}}
apiVersion: apiextensions.k8s.io/v1beta1 apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition kind: CustomResourceDefinition
metadata: metadata:
name: executions.theodolite.com name: executions.theodolite.com
spec: spec:
group: theodolite.com group: theodolite.com
version: v1alpha1
names: names:
kind: execution kind: execution
plural: executions plural: executions
scope: Namespaced shortNames:
- exec
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
required: ["spec"]
properties:
spec:
type: object
required: ["benchmark", "load", "resources", "slos", "execution", "configOverrides"]
properties:
name:
type: string
default: ""
benchmark:
type: string
load: # definition of the load dimension
type: object
required: ["loadType", "loadValues"]
properties:
loadType:
type: string
loadValues:
type: array
items:
type: integer
resources: # definition of the resource dimension
type: object
required: ["resourceType", "resourceValues"]
properties:
resourceType:
type: string
resourceValues:
type: array
items:
type: integer
slos: # def of service level objectives
type: array
items:
type: object
required: ["sloType", "threshold", "prometheusUrl", "externalSloUrl", "offset", "warmup"]
properties:
sloType:
type: string
threshold:
type: integer
prometheusUrl:
type: string
externalSloUrl:
type: string
offset:
type: integer
warmup:
type: integer
execution: # def execution config
type: object
required: ["strategy", "duration", "repetitions", "restrictions"]
properties:
strategy:
type: string
duration:
type: integer
repetitions:
type: integer
loadGenerationDelay:
type: integer
restrictions:
type: array
items:
type: string
configOverrides:
type: array
items:
type: object
properties:
patcher:
type: object
properties:
type:
type: string
default: ""
resource:
type: string
default: ""
container:
type: string
default: ""
variableName:
type: string
default: ""
value:
type: string
status:
type: object
properties:
executionState:
description: ""
type: string
executionDuration:
description: "Duration of the execution in seconds"
type: string
additionalPrinterColumns:
- name: STATUS
type: string
description: State of the execution
jsonPath: .status.executionState
- name: Duration
type: string
description: Duration of the execution
jsonPath: .status.executionDuration
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
subresources: subresources:
status: {} status: {}
scope: Namespaced
status: {}
{{- end }} {{- end }}
\ No newline at end of file
apiVersion: theodolite.com/v1alpha1 apiVersion: theodolite.com/v1
kind: benchmark kind: benchmark
metadata: metadata:
name: uc1-kstreams name: uc1-kstreams
#name: "uc1-kstreams" spec:
appResource: appResource:
- "uc1-kstreams-deployment.yaml" - "uc1-kstreams-deployment.yaml"
- "aggregation-service.yaml" - "aggregation-service.yaml"
......
apiVersion: theodolite.com/v1alpha1 apiVersion: theodolite.com/v1
kind: execution kind: execution
metadata: metadata:
name: example-execution name: example-execution
#name: example-execution spec:
benchmark: "uc1-kstreams" benchmark: "uc1-kstreams"
load: load:
loadType: "NumSensors" loadType: "NumSensors"
......
package theodolite.benchmark package theodolite.benchmark
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import theodolite.util.LoadDimension import theodolite.util.LoadDimension
......
...@@ -2,8 +2,6 @@ package theodolite.benchmark ...@@ -2,8 +2,6 @@ package theodolite.benchmark
import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import kotlin.properties.Delegates import kotlin.properties.Delegates
...@@ -26,7 +24,7 @@ import kotlin.properties.Delegates ...@@ -26,7 +24,7 @@ import kotlin.properties.Delegates
*/ */
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class BenchmarkExecution : CustomResource(), Namespaced { class BenchmarkExecution : KubernetesResource {
var executionId: Int = 0 var executionId: Int = 0
lateinit var name: String lateinit var name: String
lateinit var benchmark: String lateinit var benchmark: String
...@@ -34,7 +32,7 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -34,7 +32,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var resources: ResourceDefinition lateinit var resources: ResourceDefinition
lateinit var slos: List<Slo> lateinit var slos: List<Slo>
lateinit var execution: Execution lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride?> lateinit var configOverrides: MutableList<ConfigurationOverride?>
/** /**
* This execution encapsulates the [strategy], the [duration], the [repetitions], and the [restrictions] * This execution encapsulates the [strategy], the [duration], the [repetitions], and the [restrictions]
......
package theodolite.benchmark package theodolite.benchmark
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource import io.fabric8.kubernetes.client.CustomResource
...@@ -30,17 +31,19 @@ private var DEFAULT_NAMESPACE = "default" ...@@ -30,17 +31,19 @@ private var DEFAULT_NAMESPACE = "default"
* for the deserializing in the [theodolite.execution.operator.TheodoliteOperator]. * for the deserializing in the [theodolite.execution.operator.TheodoliteOperator].
* @constructor construct an empty Benchmark. * @constructor construct an empty Benchmark.
*/ */
@JsonDeserialize
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { class KubernetesBenchmark: KubernetesResource, Benchmark{
lateinit var name: String lateinit var name: String
lateinit var appResource: List<String> lateinit var appResource: List<String>
lateinit var loadGenResource: List<String> lateinit var loadGenResource: List<String>
lateinit var resourceTypes: List<TypeName> lateinit var resourceTypes: List<TypeName>
lateinit var loadTypes: List<TypeName> lateinit var loadTypes: List<TypeName>
lateinit var kafkaConfig: KafkaConfig lateinit var kafkaConfig: KafkaConfig
private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE var namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config" var path = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
/** /**
* Loads [KubernetesResource]s. * Loads [KubernetesResource]s.
* It first loads them via the [YamlParser] to check for their concrete type and afterwards initializes them using * It first loads them via the [YamlParser] to check for their concrete type and afterwards initializes them using
......
...@@ -95,7 +95,6 @@ class TheodoliteExecutor( ...@@ -95,7 +95,6 @@ class TheodoliteExecutor(
return this.kubernetesBenchmark return this.kubernetesBenchmark
} }
/** /**
* Run all experiments which are specified in the corresponding * Run all experiments which are specified in the corresponding
* execution and benchmark objects. * execution and benchmark objects.
......
package theodolite.execution.operator
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.CustomResourceDoneable
import io.fabric8.kubernetes.client.CustomResourceList
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import java.lang.Thread.sleep
abstract class AbstractStateHandler<T,L,D>(
private val context: CustomResourceDefinitionContext,
private val client: KubernetesClient,
private val crd: Class<T>,
private val crdList: Class<L>,
private val donableCRD: Class<D>
): StateHandler where T : CustomResource, T: Namespaced, L: CustomResourceList<T>, D: CustomResourceDoneable<T> {
private val crdClient: MixedOperation<T, L, D, Resource<T, D>> =
this.client.customResources(this.context, this.crd, this.crdList, this.donableCRD)
@Synchronized
override fun setState(resourceName: String, f: (CustomResource) -> CustomResource?) {
this.crdClient
.inNamespace(this.client.namespace)
.list().items
.filter { item -> item.metadata.name == resourceName }
.map { customResource -> f(customResource) }
.forEach { this.crdClient.updateStatus(it as T) }
}
@Synchronized
override fun getState(resourceName: String, f: (CustomResource) -> String?): String? {
return this.crdClient
.inNamespace(this.client.namespace)
.list().items
.filter { item -> item.metadata.name == resourceName }
.map { customResource -> f(customResource) }
.firstOrNull()
}
@Synchronized
override fun blockUntilStateIsSet(resourceName: String, desiredStatusString: String, f: (CustomResource) -> String?, maxTries: Int): Boolean {
for (i in 0.rangeTo(maxTries)) {
val currentStatus = getState(resourceName, f)
if(currentStatus == desiredStatusString) {
return true
}
sleep(50)
}
return false
}
}
\ No newline at end of file
package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
/**
* Handles adding, updating and deleting KubernetesBenchmarks.
*
* @param controller The TheodoliteController that handles the application state
*
* @see TheodoliteController
* @see KubernetesBenchmark
*/
class BenchmarkEventHandler(private val controller: TheodoliteController) : ResourceEventHandler<KubernetesBenchmark> {
/**
* Add a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to add
*
* @see KubernetesBenchmark
*/
override fun onAdd(benchmark: KubernetesBenchmark) {
benchmark.name = benchmark.metadata.name
logger.info { "Add new benchmark ${benchmark.name}." }
this.controller.benchmarks[benchmark.name] = benchmark
}
/**
* Update a KubernetesBenchmark.
*
* @param oldBenchmark the KubernetesBenchmark to update
* @param newBenchmark the updated KubernetesBenchmark
*
* @see KubernetesBenchmark
*/
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} else {
onAdd(newBenchmark)
}
}
/**
* Delete a KubernetesBenchmark.
*
* @param benchmark the KubernetesBenchmark to delete
*
* @see KubernetesBenchmark
*/
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name)
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
}
}
}
package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import mu.KotlinLogging
import org.json.JSONObject
import theodolite.execution.Shutdown
import theodolite.k8s.K8sContextFactory
import theodolite.model.crd.*
private val logger = KotlinLogging.logger {}
class ClusterSetup(
private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, DoneableExecution, Resource<ExecutionCRD, DoneableExecution>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, DoneableBenchmark, Resource<BenchmarkCRD, DoneableBenchmark>>,
private val client: NamespacedKubernetesClient
) {
private val serviceMonitorContext = K8sContextFactory().create(
api = "v1",
scope = "Namespaced",
group = "monitoring.coreos.com",
plural = "servicemonitors"
)
fun clearClusterState(){
stopRunningExecution()
clearByLabel()
}
private fun stopRunningExecution() {
executionCRDClient
.inNamespace(client.namespace)
.list()
.items
.asSequence()
.filter { it.status.executionState == States.RUNNING.value }
.forEach { execution ->
val benchmark = benchmarkCRDClient
.inNamespace(client.namespace)
.list()
.items
.firstOrNull { it.metadata.name == execution.spec.benchmark }
if (benchmark != null) {
execution.spec.name = execution.metadata.name
benchmark.spec.name = benchmark.metadata.name
Shutdown(execution.spec, benchmark.spec).start()
} else {
logger.error {
"Execution with state ${States.RUNNING.value} was found, but no corresponding benchmark. " +
"Could not initialize cluster." }
}
}
}
private fun clearByLabel() {
this.client.services().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.apps().deployments().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.apps().statefulSets().withLabel("app.kubernetes.io/created-by=theodolite").delete()
this.client.configMaps().withLabel("app.kubernetes.io/created-by=theodolite").delete()
val serviceMonitors = JSONObject(
this.client.customResource(serviceMonitorContext)
.list(client.namespace, mapOf(Pair("app.kubernetes.io/created-by", "theodolite")))
)
.getJSONArray("items")
(0 until serviceMonitors.length())
.map { serviceMonitors.getJSONObject(it).getJSONObject("metadata").getString("name") }
.forEach { this.client.customResource(serviceMonitorContext).delete(client.namespace, it) }
}
}
\ No newline at end of file
package theodolite.execution.operator package theodolite.execution.operator
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import io.fabric8.kubernetes.client.informers.ResourceEventHandler import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.model.crd.*
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -14,17 +17,30 @@ private val logger = KotlinLogging.logger {} ...@@ -14,17 +17,30 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController * @see TheodoliteController
* @see BenchmarkExecution * @see BenchmarkExecution
*/ */
class ExecutionHandler(private val controller: TheodoliteController) : ResourceEventHandler<BenchmarkExecution> { class ExecutionHandler(
private val controller: TheodoliteController,
private val stateHandler: ExecutionStateHandler
) : ResourceEventHandler<ExecutionCRD> {
private val gson: Gson = GsonBuilder().enableComplexMapKeySerialization().create()
/** /**
* Add an execution to the end of the queue of the TheodoliteController. * Add an execution to the end of the queue of the TheodoliteController.
* *
* @param execution the execution to add * @param ExecutionCRD the execution to add
*/ */
override fun onAdd(execution: BenchmarkExecution) { @Synchronized
execution.name = execution.metadata.name override fun onAdd(execution: ExecutionCRD) {
logger.info { "Add new execution ${execution.metadata.name} to queue." } logger.info { "Add execution ${execution.metadata.name}" }
this.controller.executionsQueue.add(execution) execution.spec.name = execution.metadata.name
when (this.stateHandler.getExecutionState(execution.metadata.name)) {
null -> this.stateHandler.setExecutionState(execution.spec.name, States.PENDING)
States.RUNNING -> {
this.stateHandler.setExecutionState(execution.spec.name, States.RESTART)
if(this.controller.isExecutionRunning(execution.spec.name)){
this.controller.stop(restart=true)
}
}
}
} }
/** /**
...@@ -32,40 +48,39 @@ class ExecutionHandler(private val controller: TheodoliteController) : ResourceE ...@@ -32,40 +48,39 @@ class ExecutionHandler(private val controller: TheodoliteController) : ResourceE
* added to the beginning of the queue of the TheodoliteController. * added to the beginning of the queue of the TheodoliteController.
* Otherwise, it is just added to the beginning of the queue. * Otherwise, it is just added to the beginning of the queue.
* *
* @param oldExecution the old execution * @param oldExecutionCRD the old execution
* @param newExecution the new execution * @param newExecutionCRD the new execution
*/ */
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { @Synchronized
logger.info { "Add updated execution to queue." } override fun onUpdate(oldExecution: ExecutionCRD, newExecution: ExecutionCRD) {
newExecution.name = newExecution.metadata.name logger.info { "Receive update event for execution ${oldExecution.metadata.name}" }
try { newExecution.spec.name = newExecution.metadata.name
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } oldExecution.spec.name = oldExecution.metadata.name
} catch (e: NullPointerException) { if(gson.toJson(oldExecution.spec) != gson.toJson(newExecution.spec)) {
logger.warn { "No execution found for deletion" } when(this.stateHandler.getExecutionState(newExecution.metadata.name)) {
States.RUNNING -> {
this.stateHandler.setExecutionState(newExecution.spec.name, States.RESTART)
if (this.controller.isExecutionRunning(newExecution.spec.name)){
this.controller.stop(restart=true)
}
}
States.RESTART -> {} // should this set to pending?
else -> this.stateHandler.setExecutionState(newExecution.spec.name, States.PENDING)
} }
this.controller.executionsQueue.addFirst(newExecution)
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} }
} }
/** /**
* Delete an execution from the queue of the TheodoliteController. * Delete an execution from the queue of the TheodoliteController.
* *
* @param execution the execution to delete * @param ExecutionCRD the execution to delete
*/ */
override fun onDelete(execution: BenchmarkExecution, b: Boolean) { @Synchronized
try { override fun onDelete(execution: ExecutionCRD, b: Boolean) {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name } logger.info { "Delete execution ${execution.metadata.name}" }
logger.info { "Delete execution ${execution.metadata.name} from queue." } if(execution.status.executionState == States.RUNNING.value
} catch (e: NullPointerException) { && this.controller.isExecutionRunning(execution.spec.name)) {
logger.warn { "No execution found for deletion" } this.controller.stop()
}
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
} }
} }
} }
package theodolite.execution.operator
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import theodolite.model.crd.BenchmarkExecutionList
import theodolite.model.crd.ExecutionCRD
import theodolite.model.crd.States
import java.lang.Thread.sleep
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
class ExecutionStateHandler(context: CustomResourceDefinitionContext, val client: KubernetesClient):
AbstractStateHandler<ExecutionCRD, BenchmarkExecutionList, DoneableExecution>(
context = context,
client = client,
crd = ExecutionCRD::class.java,
crdList = BenchmarkExecutionList::class.java,
donableCRD = DoneableExecution::class.java) {
private var runExecutionDurationTimer: AtomicBoolean = AtomicBoolean(false)
private fun getExecutionLambda() = { cr: CustomResource ->
var execState = ""
if (cr is ExecutionCRD) { execState = cr.status.executionState }
execState
}
private fun getDurationLambda() = { cr: CustomResource ->
var execState = ""
if (cr is ExecutionCRD) { execState = cr.status.executionState }
execState
}
fun setExecutionState(resourceName: String, status: States): Boolean {
setState(resourceName) {cr -> if(cr is ExecutionCRD) cr.status.executionState = status.value; cr}
return blockUntilStateIsSet(resourceName, status.value, getExecutionLambda())
}
fun getExecutionState(resourceName: String) : States? {
val status = this.getState(resourceName, getExecutionLambda())
return States.values().firstOrNull { it.value == status }
}
fun setDurationState(resourceName: String, duration: Duration) {
setState(resourceName) { cr -> if (cr is ExecutionCRD) cr.status.executionDuration = durationToK8sString(duration); cr }
blockUntilStateIsSet(resourceName, durationToK8sString(duration), getDurationLambda())
}
fun getDurationState(resourceName: String): String? {
return this.getState(resourceName, getDurationLambda())
}
private fun durationToK8sString(duration: Duration): String {
val sec = duration.seconds
return when {
sec <= 120 -> "${sec}s" // max 120s
sec < 60 * 99 -> "${duration.toMinutes()}m" // max 99m
sec < 60 * 60 * 99 -> "${duration.toHours()}h" // max 99h
else -> "${duration.toDays()}d + ${duration.minusDays(duration.toDays()).toHours()}h"
}
}
fun startDurationStateTimer(resourceName: String) {
this.runExecutionDurationTimer.set(true)
val startTime = Instant.now().toEpochMilli()
Thread {
while (this.runExecutionDurationTimer.get()) {
val duration = Duration.ofMillis(Instant.now().minusMillis(startTime).toEpochMilli())
setDurationState(resourceName, duration)
sleep(100 * 1)
}
}.start()
}
@Synchronized
fun stopDurationStateTimer() {
this.runExecutionDurationTimer.set(false)
sleep(100 * 2)
}
}
\ No newline at end of file
package theodolite.execution.operator
import io.fabric8.kubernetes.client.CustomResource
private const val MAX_TRIES: Int = 5
interface StateHandler {
fun setState(resourceName: String, f: (CustomResource) -> CustomResource?)
fun getState(resourceName: String, f: (CustomResource) -> String?): String?
fun blockUntilStateIsSet(
resourceName: String,
desiredStatusString: String,
f: (CustomResource) -> String?,
maxTries: Int = MAX_TRIES): Boolean
}
\ No newline at end of file
package theodolite.execution.operator package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.execution.TheodoliteExecutor import theodolite.execution.TheodoliteExecutor
import theodolite.model.crd.*
import theodolite.util.ConfigurationOverride
import theodolite.util.PatcherDefinition
import java.lang.Thread.sleep import java.lang.Thread.sleep
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/** /**
* The controller implementation for Theodolite. * The controller implementation for Theodolite.
* *
* Maintains a Dequeue, based on ConcurrentLinkedDequeue, of executions to be executed for a benchmark.
*
* @param client The NamespacedKubernetesClient
* @param executionContext The CustomResourceDefinitionContext
*
* @see NamespacedKubernetesClient * @see NamespacedKubernetesClient
* @see CustomResourceDefinitionContext * @see CustomResourceDefinitionContext
* @see BenchmarkExecution * @see BenchmarkExecution
* @see KubernetesBenchmark * @see KubernetesBenchmark
* @see ConcurrentLinkedDeque * @see ConcurrentLinkedDeque
*/ */
class TheodoliteController( class TheodoliteController(
val client: NamespacedKubernetesClient, private val namespace: String,
val executionContext: CustomResourceDefinitionContext, val path: String,
val path: String private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, DoneableExecution, Resource<ExecutionCRD, DoneableExecution>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, DoneableBenchmark, Resource<BenchmarkCRD, DoneableBenchmark>>,
private val executionStateHandler: ExecutionStateHandler
) { ) {
lateinit var executor: TheodoliteExecutor lateinit var executor: TheodoliteExecutor
val executionsQueue: ConcurrentLinkedDeque<BenchmarkExecution> = ConcurrentLinkedDeque()
val benchmarks: ConcurrentHashMap<String, KubernetesBenchmark> = ConcurrentHashMap()
var isUpdated = AtomicBoolean(false)
/** /**
*
* Runs the TheodoliteController forever. * Runs the TheodoliteController forever.
*/ */
fun run() { fun run() {
sleep(5000) // wait until all states are correctly set
while (true) { while (true) {
try {
reconcile() reconcile()
logger.info { "Theodolite is waiting for new matching benchmark and execution." }
logger.info { "Currently available executions: " }
executionsQueue.forEach {
logger.info { "${it.name} : waiting for : ${it.benchmark}" }
}
logger.info { "Currently available benchmarks: " }
benchmarks.forEach {
logger.info { it.key }
}
sleep(2000) sleep(2000)
} catch (e: InterruptedException) {
logger.error { "Execution interrupted with error: $e." }
}
} }
} }
/**
* Ensures that the application state corresponds to the defined KubernetesBenchmarks and BenchmarkExecutions.
*
* @see KubernetesBenchmark
* @see BenchmarkExecution
*/
@Synchronized
private fun reconcile() { private fun reconcile() {
while (executionsQueue.isNotEmpty()) { do {
val execution = executionsQueue.peek() val execution = getNextExecution()
val benchmark = benchmarks[execution.benchmark] if (execution != null) {
val benchmark = getBenchmarks()
if (benchmark == null) { .firstOrNull { it.name == execution.benchmark }
logger.debug { "No benchmark found for execution ${execution.name}." } if (benchmark != null) {
sleep(1000)
} else {
runExecution(execution, benchmark) runExecution(execution, benchmark)
} }
} else {
logger.info { "Could not find executable execution." }
} }
} while (execution != null)
} }
/** /**
* Execute a benchmark with a defined KubernetesBenchmark and BenchmarkExecution * Execute a benchmark with a defined KubernetesBenchmark and BenchmarkExecution
* *
* @see KubernetesBenchmark
* @see BenchmarkExecution * @see BenchmarkExecution
*/ */
@Synchronized private fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { setAdditionalLabels(execution.name,
isUpdated.set(false) "deployed-for-execution",
benchmark.path = path benchmark.appResource + benchmark.loadGenResource,
logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}." } execution)
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) setAdditionalLabels(benchmark.name,
executor.run() "deployed-for-benchmark",
benchmark.appResource + benchmark.loadGenResource,
execution)
setAdditionalLabels("theodolite",
"app.kubernetes.io/created-by",
benchmark.appResource + benchmark.loadGenResource,
execution)
executionStateHandler.setExecutionState(execution.name, States.RUNNING)
executionStateHandler.startDurationStateTimer(execution.name)
try { try {
if (!isUpdated.get()) { executor = TheodoliteExecutor(execution, benchmark)
this.executionsQueue.removeIf { e -> e.name == execution.name } executor.run()
client.customResource(executionContext).delete(client.namespace, execution.metadata.name) when (executionStateHandler.getExecutionState(execution.name)) {
States.RESTART -> runExecution(execution, benchmark)
States.RUNNING -> {
executionStateHandler.setExecutionState(execution.name, States.FINISHED)
logger.info { "Execution of ${execution.name} is finally stopped." }
}
} }
} catch (e: Exception) { } catch (e: Exception) {
logger.warn { "Deletion skipped." } logger.error { "Failure while executing execution ${execution.name} with benchmark ${benchmark.name}." }
logger.error { "Problem is: $e" }
executionStateHandler.setExecutionState(execution.name, States.FAILURE)
}
executionStateHandler.stopDurationStateTimer()
} }
logger.info { "Execution of ${execution.name} is finally stopped." } @Synchronized
fun stop(restart: Boolean = false) {
if (!::executor.isInitialized) return
if (restart) {
executionStateHandler.setExecutionState(this.executor.getExecution().name, States.RESTART)
} else {
executionStateHandler.setExecutionState(this.executor.getExecution().name, States.INTERRUPTED)
logger.warn { "Execution ${executor.getExecution().name} unexpected interrupted" }
}
this.executor.executor.run.set(false)
} }
/** /**
* @return true if the TheodoliteExecutor of this controller is initialized. Else returns false. * @return all available [BenchmarkCRD]s
*/
private fun getBenchmarks(): List<KubernetesBenchmark> {
return this.benchmarkCRDClient
.inNamespace(namespace)
.list()
.items
.map { it.spec.name = it.metadata.name; it }
.map { it.spec.path = path; it }
.map { it.spec }
}
/**
* Get the [BenchmarkExecution] for the next run. Which [BenchmarkExecution]
* is selected for the next execution depends on three points:
*
* 1. Only executions are considered for which a matching benchmark is available on the cluster
* 2. The Status of the execution must be [States.PENDING] or [States.RESTART]
* 3. Of the remaining [BenchmarkCRD], those with status [States.RESTART] are preferred,
* then, if there is more than one, the oldest execution is chosen.
* *
* @see TheodoliteExecutor * @return the next execution or null
*/ */
@Synchronized private fun getNextExecution(): BenchmarkExecution? {
fun isInitialized(): Boolean { val availableBenchmarkNames = getBenchmarks()
return ::executor.isInitialized .map { it.name }
return executionCRDClient
.inNamespace(namespace)
.list()
.items
.asSequence()
.map { it.spec.name = it.metadata.name; it }
.filter {
it.status.executionState == States.PENDING.value ||
it.status.executionState == States.RESTART.value
}
.filter { availableBenchmarkNames.contains(it.spec.benchmark) }
.sortedWith(stateComparator().thenBy { it.metadata.creationTimestamp })
.map { it.spec }
.firstOrNull()
}
/**
* Simple comparator which can be used to order a list of [ExecutionCRD] such that executions with
* status [States.RESTART] are before all other executions.
*/
private fun stateComparator() = Comparator<ExecutionCRD> { a, b ->
when {
(a == null && b == null) -> 0
(a.status.executionState == States.RESTART.value) -> -1
else -> 1
}
}
fun isExecutionRunning(executionName: String): Boolean {
return this.executor.getExecution().name == executionName
}
private fun setAdditionalLabels(
labelValue: String,
labelName: String,
resources: List<String>,
execution: BenchmarkExecution
) {
val additionalConfigOverrides = mutableListOf<ConfigurationOverride>()
resources.forEach {
run {
val configurationOverride = ConfigurationOverride()
configurationOverride.patcher = PatcherDefinition()
configurationOverride.patcher.type = "LabelPatcher"
configurationOverride.patcher.variableName = labelName
configurationOverride.patcher.resource = it
configurationOverride.value = labelValue
additionalConfigOverrides.add(configurationOverride)
}
}
execution.configOverrides.addAll(additionalConfigOverrides)
} }
} }
\ No newline at end of file
package theodolite.execution.operator package theodolite.execution.operator
import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource
import io.fabric8.kubernetes.internal.KubernetesDeserializer import io.fabric8.kubernetes.internal.KubernetesDeserializer
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.BenchmarkExecutionList
import theodolite.benchmark.KubernetesBenchmark
import theodolite.benchmark.KubernetesBenchmarkList
import theodolite.k8s.K8sContextFactory import theodolite.k8s.K8sContextFactory
import theodolite.model.crd.*
private const val DEFAULT_NAMESPACE = "default" private const val DEFAULT_NAMESPACE = "default"
...@@ -16,7 +15,7 @@ private const val EXECUTION_SINGULAR = "execution" ...@@ -16,7 +15,7 @@ private const val EXECUTION_SINGULAR = "execution"
private const val EXECUTION_PLURAL = "executions" private const val EXECUTION_PLURAL = "executions"
private const val BENCHMARK_SINGULAR = "benchmark" private const val BENCHMARK_SINGULAR = "benchmark"
private const val BENCHMARK_PLURAL = "benchmarks" private const val BENCHMARK_PLURAL = "benchmarks"
private const val API_VERSION = "v1alpha1" private const val API_VERSION = "v1"
private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong() private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong()
private const val GROUP = "theodolite.com" private const val GROUP = "theodolite.com"
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -33,42 +32,82 @@ class TheodoliteOperator { ...@@ -33,42 +32,82 @@ class TheodoliteOperator {
* Start the operator. * Start the operator.
*/ */
fun start() { fun start() {
// FIXME("Remove all benchmark state handling")
logger.info { "Using $namespace as namespace." } logger.info { "Using $namespace as namespace." }
val client = DefaultKubernetesClient().inNamespace(namespace) val client = DefaultKubernetesClient().inNamespace(namespace)
client.use {
KubernetesDeserializer.registerCustomKind( KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION", "$GROUP/$API_VERSION",
EXECUTION_SINGULAR, EXECUTION_SINGULAR,
BenchmarkExecution::class.java ExecutionCRD::class.java
) )
KubernetesDeserializer.registerCustomKind( KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION", "$GROUP/$API_VERSION",
BENCHMARK_SINGULAR, BENCHMARK_SINGULAR,
KubernetesBenchmark::class.java BenchmarkCRD::class.java
) )
val contextFactory = K8sContextFactory() val contextFactory = K8sContextFactory()
val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL) val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL)
val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL) val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL)
val executionCRDClient: MixedOperation<
ExecutionCRD,
BenchmarkExecutionList,
DoneableExecution,
Resource<ExecutionCRD, DoneableExecution>>
= client.customResources(
executionContext,
ExecutionCRD::class.java,
BenchmarkExecutionList::class.java,
DoneableExecution::class.java)
val benchmarkCRDClient: MixedOperation<
BenchmarkCRD,
KubernetesBenchmarkList,
DoneableBenchmark,
Resource<BenchmarkCRD, DoneableBenchmark>>
= client.customResources(
benchmarkContext,
BenchmarkCRD::class.java,
KubernetesBenchmarkList::class.java,
DoneableBenchmark::class.java)
val executionStateHandler = ExecutionStateHandler(
context = executionContext,
client = client)
val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config" val appResource = System.getenv("THEODOLITE_APP_RESOURCES") ?: "./config"
val controller = TheodoliteController(client = client, executionContext = executionContext, path = appResource) val controller =
TheodoliteController(
namespace = client.namespace,
path = appResource,
benchmarkCRDClient = benchmarkCRDClient,
executionCRDClient = executionCRDClient,
executionStateHandler = executionStateHandler)
val informerFactory = client.informers() val informerFactory = client.informers()
val informerExecution = informerFactory.sharedIndexInformerForCustomResource( val informerExecution = informerFactory.sharedIndexInformerForCustomResource(
executionContext, BenchmarkExecution::class.java, executionContext,
BenchmarkExecutionList::class.java, RESYNC_PERIOD ExecutionCRD::class.java,
) BenchmarkExecutionList::class.java,
val informerBenchmark = informerFactory.sharedIndexInformerForCustomResource( RESYNC_PERIOD
benchmarkContext, KubernetesBenchmark::class.java,
KubernetesBenchmarkList::class.java, RESYNC_PERIOD
) )
informerExecution.addEventHandler(ExecutionHandler(controller)) informerExecution.addEventHandler(ExecutionHandler(
informerBenchmark.addEventHandler(BenchmarkEventHandler(controller)) controller = controller,
informerFactory.startAllRegisteredInformers() stateHandler = executionStateHandler))
ClusterSetup(
executionCRDClient = executionCRDClient,
benchmarkCRDClient = benchmarkCRDClient,
client = client
).clearClusterState()
informerFactory.startAllRegisteredInformers()
controller.run() controller.run()
}
} }
} }
...@@ -53,4 +53,9 @@ class ServiceMonitorWrapper(private val serviceMonitor: Map<String, String>) : C ...@@ -53,4 +53,9 @@ class ServiceMonitorWrapper(private val serviceMonitor: Map<String, String>) : C
val smAsMap = this.serviceMonitor["metadata"]!! as Map<String, String> val smAsMap = this.serviceMonitor["metadata"]!! as Map<String, String>
return smAsMap["name"]!! return smAsMap["name"]!!
} }
fun getLabels(): Map<String, String>{
val smAsMap = this.serviceMonitor["metadata"]!! as Map<String, String>
return smAsMap["labels"]!! as Map<String, String>
}
} }
package theodolite.model.crd
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import theodolite.benchmark.KubernetesBenchmark
@JsonDeserialize
class BenchmarkCRD(
var spec: KubernetesBenchmark = KubernetesBenchmark()
) : CustomResource(), Namespaced
\ No newline at end of file
package theodolite.benchmark package theodolite.model.crd
import io.fabric8.kubernetes.client.CustomResourceList import io.fabric8.kubernetes.client.CustomResourceList
class BenchmarkExecutionList : CustomResourceList<BenchmarkExecution>() class BenchmarkExecutionList : CustomResourceList<ExecutionCRD>()
\ No newline at end of file
package theodolite.model.crd
import io.fabric8.kubernetes.api.builder.Function
import io.fabric8.kubernetes.client.CustomResourceDoneable
class DoneableBenchmark(resource: BenchmarkCRD, function: Function<BenchmarkCRD, BenchmarkCRD>) :
CustomResourceDoneable<BenchmarkCRD>(resource, function)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment