From 93cd7ab33d4003e58bcc9e28d3a03f0e18375e07 Mon Sep 17 00:00:00 2001
From: "stu126940@mail.uni-kiel.de" <stu126940@mail.uni-kiel.de>
Date: Wed, 24 Mar 2021 19:45:43 +0100
Subject: [PATCH] Enhance logic of Theodolite operator

Fix confusing naming
Make CR namespaced
Remove execution after deletion

There is currently a small problem, since we can stop the a running benchmark (therefore we have to kill the corresponding thread)
---
 .../benchmark/BenchmarkExecution.kt           |  3 +-
 .../benchmark/KubernetesBenchmark.kt          |  3 +-
 .../execution/TheodoliteController.kt         | 70 ++++++++++++-------
 .../execution/TheodoliteExecutor.kt           |  8 +--
 .../execution/TheodoliteOperator.kt           | 32 ++++-----
 .../resources/operator/example-benchmark.yaml |  4 +-
 .../resources/operator/example-execution.yaml |  3 +-
 .../resources/yaml/BenchmarkExecution.yaml    |  3 +-
 8 files changed, 68 insertions(+), 58 deletions(-)

diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt
index 417d5cea2..fe5b723a5 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt
@@ -2,12 +2,13 @@ package theodolite.benchmark
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import io.fabric8.kubernetes.api.model.KubernetesResource
+import io.fabric8.kubernetes.api.model.Namespaced
 import io.fabric8.kubernetes.client.CustomResource
 import theodolite.util.ConfigurationOverride
 import kotlin.properties.Delegates
 
 @JsonDeserialize
-class BenchmarkExecution : CustomResource(){
+class BenchmarkExecution : CustomResource(), Namespaced {
     lateinit var name: String
     lateinit var benchmark: String
     lateinit var load: LoadDefinition
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
index 61224e61f..7da65427f 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
@@ -1,6 +1,7 @@
 package theodolite.benchmark
 
 import io.fabric8.kubernetes.api.model.KubernetesResource
+import io.fabric8.kubernetes.api.model.Namespaced
 import io.fabric8.kubernetes.client.CustomResource
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import mu.KotlinLogging
@@ -13,7 +14,7 @@ private val logger = KotlinLogging.logger {}
 private var DEFAULT_NAMESPACE = "default"
 
 
-class KubernetesBenchmark : Benchmark , CustomResource() {
+class KubernetesBenchmark : Benchmark , CustomResource(), Namespaced {
     lateinit var name: String
     lateinit var appResource: List<String>
     lateinit var loadGenResource: List<String>
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt
index bb85e92bd..a4acec6c5 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt
@@ -1,11 +1,13 @@
 package theodolite.execution
 
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
 import io.fabric8.kubernetes.client.informers.ResourceEventHandler
 import io.fabric8.kubernetes.client.informers.SharedInformer
 import mu.KotlinLogging
 import theodolite.benchmark.BenchmarkExecution
 import theodolite.benchmark.KubernetesBenchmark
+import java.lang.Thread.sleep
 import java.util.Queue
 import java.util.LinkedList
 
@@ -15,7 +17,8 @@ private val logger = KotlinLogging.logger {}
 class TheodoliteController(
     val client: NamespacedKubernetesClient,
     val informerBenchmarkExecution: SharedInformer<BenchmarkExecution>,
-    val informerBenchmarkType: SharedInformer<KubernetesBenchmark>
+    val informerBenchmarkType: SharedInformer<KubernetesBenchmark>,
+    val executionContext: CustomResourceDefinitionContext
 ) {
     lateinit var executor: TheodoliteExecutor
     val self = this
@@ -26,50 +29,62 @@ class TheodoliteController(
      * Adds the EventHandler to kubernetes
      */
     fun create() {
-
         informerBenchmarkExecution.addEventHandler(object : ResourceEventHandler<BenchmarkExecution> {
-            override fun onAdd(benchmarkExecution: BenchmarkExecution) {
-                executionsQueue.add(benchmarkExecution)
+            override fun onAdd(execution: BenchmarkExecution) {
+                logger.info { "Add new execution ${execution.metadata.name} to queue" }
+                execution.name = execution.metadata.name
+                executionsQueue.add(execution)
+
             }
 
             override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
-                if (executor.getExecution().name == newExecution.name) {
+                logger.info { "Update execution ${oldExecution.metadata.name}" }
+                if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) {
+                    logger.info { "restart current benchmark with new version" }
                     executor.stop()
                     executor = TheodoliteExecutor(config = newExecution, kubernetesBenchmark = executor.getBenchmark())
                     executor.run()
                 } else {
-                    executionsQueue.remove(oldExecution)
+                    onDelete(oldExecution, false)
                     onAdd(newExecution)
                 }
             }
 
             override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
-                if (executor.getExecution().name == execution.name) {
+                logger.info { "Delete execution ${execution.metadata.name} from queue" }
+                executionsQueue.removeIf{e -> e.name == execution.metadata.name}
+                if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) {
                     executor.stop()
-                } else {
-                    executionsQueue.remove(execution)
+                    logger.info { "Current benchmark stopped" }
                 }
             }
         })
 
         informerBenchmarkType.addEventHandler(object : ResourceEventHandler<KubernetesBenchmark> {
-            override fun onAdd(kubernetesBenchmark: KubernetesBenchmark) {
-                benchmarks[kubernetesBenchmark.name] = kubernetesBenchmark
+            override fun onAdd(benchmark: KubernetesBenchmark) {
+                logger.info { "Add new benchmark ${benchmark.name}" }
+                benchmark.name = benchmark.metadata.name
+                benchmarks[benchmark.name] = benchmark
             }
 
             override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
-                onAdd(newBenchmark)
-                if (executor.getBenchmark().name == oldBenchmark.name) {
+                logger.info { "Update benchmark ${newBenchmark.metadata.name}" }
+                if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) {
+                    logger.info { "restart current benchmark with new version" }
                     executor.stop()
                     executor = TheodoliteExecutor(config = executor.getExecution(), kubernetesBenchmark = newBenchmark)
                     executor.run()
+                } else {
+                    onAdd(newBenchmark)
                 }
             }
 
             override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
-                benchmarks.remove(benchmark.name)
-                if(executor.getBenchmark().name == benchmark.name) {
+                logger.info { "Delete benchmark ${benchmark.metadata.name}" }
+                benchmarks.remove(benchmark.metadata.name)
+                if(::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) {
                     executor.stop()
+                    logger.info { "Current benchmark stopped" }
                 }
             }
         })
@@ -80,26 +95,29 @@ class TheodoliteController(
             try {
                 reconcile()
             } catch (e: InterruptedException) {
-                logger.error { "$e" }
+                logger.error { "Execution interrupted with error: $e" }
             }
         }
     }
-
     @Synchronized
     private fun reconcile() {
-        while(executionsQueue.isNotEmpty()) {
-            val execution = executionsQueue.poll()
-            val benchmark = benchmarks[execution.name]
+        while(executionsQueue.isNotEmpty()
+            && ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized)) {
+            val execution = executionsQueue.peek()
+            val benchmark = benchmarks[execution.benchmark]
+
             if (benchmark == null) {
-                logger.error { "No benchmark found for execution ${execution.name}" }
-                executionsQueue.add(execution)
+                logger.debug { "No benchmark found for execution ${execution.benchmark}" }
             } else {
-                if ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized) {
-                    executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
-                    executor.run()
+                logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" }
+                executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
+                executor.run()
+                // wait until executions is deleted
+                client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
+                while (executionsQueue.contains(execution)) {
+                    sleep(1000)
                 }
             }
         }
     }
-
 }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt
index 6fe54d383..1ad396493 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt
@@ -11,17 +11,14 @@ import theodolite.util.LoadDimension
 import theodolite.util.Resource
 import theodolite.util.Results
 import java.time.Duration
-import kotlin.system.exitProcess
-
 
 private val logger = KotlinLogging.logger {}
 
-
 class TheodoliteExecutor(
     private var config: BenchmarkExecution,
     private var kubernetesBenchmark: KubernetesBenchmark
 ) {
-    private val executionThread = Thread {
+    private val executionThread = Thread() {
         if(config == null || kubernetesBenchmark == null) {
             logger.error { "Execution or Benchmark not found" }
         } else {
@@ -93,11 +90,12 @@ class TheodoliteExecutor(
     fun run() {
         isRunning = true
         logger.info { "Start thread" }
-        executionThread.run()
+        executionThread.start()
         logger.info { "Stop Thread" }
     }
 
     fun stop() {
+        // TODO call shutdown hook
         isRunning = false
         executionThread.interrupt()
     }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteOperator.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteOperator.kt
index acbd34c04..5b621c3c4 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteOperator.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteOperator.kt
@@ -12,15 +12,14 @@ private var DEFAULT_NAMESPACE = "default"
 private val logger = KotlinLogging.logger {}
 
 @QuarkusMain(name = "TheodoliteOperator")
-object TheodoliteCRDExecutor {
+object TheodoliteOperator {
     @JvmStatic
     fun main(args: Array<String>) {
 
         val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
         logger.info { "Using $namespace as namespace." }
 
-        val client = DefaultKubernetesClient().inNamespace("default")
-
+        val client = DefaultKubernetesClient().inNamespace(namespace)
 
         KubernetesDeserializer.registerCustomKind(
             "theodolite.com/v1alpha1",
@@ -34,14 +33,14 @@ object TheodoliteCRDExecutor {
             KubernetesBenchmark::class.java
         )
 
-        val ExececutionContext = CustomResourceDefinitionContext.Builder()
+        val executionContext = CustomResourceDefinitionContext.Builder()
             .withVersion("v1alpha1")
             .withScope("Namespaced")
             .withGroup("theodolite.com")
             .withPlural("executions")
             .build()
 
-        val TypeContext = CustomResourceDefinitionContext.Builder()
+        val typeContext = CustomResourceDefinitionContext.Builder()
             .withVersion("v1alpha1")
             .withScope("Namespaced")
             .withGroup("theodolite.com")
@@ -50,25 +49,22 @@ object TheodoliteCRDExecutor {
 
         val informerFactory = client.informers()
 
+        val  informerBenchmarkExecution = informerFactory.sharedIndexInformerForCustomResource(
+            executionContext, BenchmarkExecution::class.java,
+            BenchmarkExecutionList::class.java, 10 * 60 * 1000.toLong()
+        )
 
-        val informerBenchmarkExecution = informerFactory.sharedIndexInformerForCustomResource(ExececutionContext, BenchmarkExecution::class.java,
-            BenchmarkExecutionList::class.java,10 * 60 * 1000.toLong())
-
-        val informerBenchmarkType = informerFactory.sharedIndexInformerForCustomResource(TypeContext, KubernetesBenchmark::class.java,
-            KubernetesBenchmarkList::class.java,10 * 60 * 1000.toLong())
-
-
-
+       val informerBenchmarkType = informerFactory.sharedIndexInformerForCustomResource(
+            typeContext, KubernetesBenchmark::class.java,
+            KubernetesBenchmarkList::class.java, 10 * 60 * 1000.toLong()
+        )
         val controller = TheodoliteController(client = client,
             informerBenchmarkExecution = informerBenchmarkExecution,
-            informerBenchmarkType = informerBenchmarkType)
+            informerBenchmarkType = informerBenchmarkType,
+            executionContext = executionContext)
 
         controller.create()
-
         informerFactory.startAllRegisteredInformers()
-
         controller.run()
-
-        //exitProcess(0)
     }
 }
diff --git a/theodolite-quarkus/src/main/resources/operator/example-benchmark.yaml b/theodolite-quarkus/src/main/resources/operator/example-benchmark.yaml
index ca69be31c..9fc16f92e 100644
--- a/theodolite-quarkus/src/main/resources/operator/example-benchmark.yaml
+++ b/theodolite-quarkus/src/main/resources/operator/example-benchmark.yaml
@@ -1,13 +1,11 @@
 apiVersion: theodolite.com/v1alpha1
 kind: benchmark
 metadata:
-  name: theodolite-example-benchmark
-name: "theodolite ist cool"
+  name: uc1-kstreams
 appResource:
   - "uc1-kstreams-deployment.yaml"
   - "aggregation-service.yaml"
   - "jmx-configmap.yaml"
-  - "uc1-service-monitor.yaml"
 loadGenResource:
   - "uc1-load-generator-deployment.yaml"
   - "uc1-load-generator-service.yaml"
diff --git a/theodolite-quarkus/src/main/resources/operator/example-execution.yaml b/theodolite-quarkus/src/main/resources/operator/example-execution.yaml
index 1eb49cc9a..063711c1a 100644
--- a/theodolite-quarkus/src/main/resources/operator/example-execution.yaml
+++ b/theodolite-quarkus/src/main/resources/operator/example-execution.yaml
@@ -2,8 +2,7 @@ apiVersion: theodolite.com/v1alpha1
 kind: execution
 metadata:
   name: theodolite-example-execution
-name: "Theodolite Test Context"
-benchmark: "benchmarkType"
+benchmark: "uc1-kstreams"
 load:
   loadType: "NumSensors"
   loadValues:
diff --git a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml
index 99c4ea236..5a723bbb1 100644
--- a/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml
+++ b/theodolite-quarkus/src/main/resources/yaml/BenchmarkExecution.yaml
@@ -1,8 +1,7 @@
 apiVersion: demo.k8s.io/v1alpha1
 kind: Benchmarkexecutions
 metadata:
-  name: example-webserver
-name: "Theodolite Test Context"
+  name: example-execution
 benchmark: "uc1-kstreams"
 load:
   loadType: "NumSensors"
-- 
GitLab