diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt index 66ee841899e8c62679b0c35aed714652fce5e051..ef1b19e62cc90850b6abdd6cf25ea5da86175ded 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt @@ -22,8 +22,9 @@ class TheodoliteController( val executionContext: CustomResourceDefinitionContext ) { lateinit var executor: TheodoliteExecutor - var executionsQueue: Queue<BenchmarkExecution> = LinkedList<BenchmarkExecution>() + val executionsQueue: Deque<BenchmarkExecution> = LinkedList<BenchmarkExecution>() val benchmarks: MutableMap<String, KubernetesBenchmark> = HashMap() + var isUpdated = false /** * Adds the EventHandler to kubernetes @@ -39,18 +40,22 @@ class TheodoliteController( override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { logger.info { "Add updated execution to queue" } + + newExecution.name = newExecution.metadata.name + executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } + executionsQueue.addFirst(newExecution) + if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) { + isUpdated = true executor.executor.run = false } - newExecution.name = newExecution.metadata.name - executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } - executionsQueue.add(newExecution) } override fun onDelete(execution: BenchmarkExecution, b: Boolean) { logger.info { "Delete execution ${execution.metadata.name} from queue" } executionsQueue.removeIf { e -> e.name == execution.metadata.name } if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) { + isUpdated = true executor.executor.run = false logger.info { "Current benchmark stopped" } } @@ -68,10 +73,8 @@ class TheodoliteController( logger.info { "Update benchmark ${newBenchmark.metadata.name}" } newBenchmark.name = newBenchmark.metadata.name if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) { + isUpdated = true executor.executor.run = false - val execution = executor.getExecution() - execution.name = execution.name + System.currentTimeMillis() - executionsQueue.add(execution) } else { onAdd(newBenchmark) } @@ -81,6 +84,7 @@ class TheodoliteController( logger.info { "Delete benchmark ${benchmark.metadata.name}" } benchmarks.remove(benchmark.metadata.name) if (::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) { + isUpdated = true executor.executor.run = false logger.info { "Current benchmark stopped" } } @@ -94,8 +98,6 @@ class TheodoliteController( reconcile() logger.info { "Theodolite is waiting for new jobs" } sleep(2000) - if (this::executor.isInitialized && !executor.isRunning) { - } } catch (e: InterruptedException) { logger.error { "Execution interrupted with error: $e" } } @@ -104,13 +106,14 @@ class TheodoliteController( @Synchronized private fun reconcile() { - while (executionsQueue.isNotEmpty() - && ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized)) { + while (executionsQueue.isNotEmpty()) { + val execution = executionsQueue.peek() + val benchmark = benchmarks[execution.benchmark] if (benchmark == null) { - logger.debug { "No benchmark found for execution ${execution.benchmark}" } + logger.debug { "No benchmark found for execution ${execution.name}" } sleep(1000) } else { runExecution(execution, benchmark) @@ -120,13 +123,15 @@ class TheodoliteController( @Synchronized fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { + isUpdated = false logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" } executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) executor.run() - if (executionsQueue.contains(execution)) { + if (!isUpdated) { client.customResource(executionContext).delete(client.namespace, execution.metadata.name) } + logger.info { "Execution of ${execution.name} is finally stopped" } } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt index b840bcdea4499f38de91b810928e33e09419ef3f..fd78a18a889d833447f4e459336171c7d45b2436 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt @@ -1,6 +1,5 @@ package theodolite.execution -import mu.KotlinLogging import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.KubernetesBenchmark import theodolite.patcher.PatcherDefinitionFactory @@ -12,13 +11,10 @@ import theodolite.util.Resource import theodolite.util.Results import java.time.Duration -private val logger = KotlinLogging.logger {} - class TheodoliteExecutor( private var config: BenchmarkExecution, private var kubernetesBenchmark: KubernetesBenchmark ) { - var isRunning = false lateinit var executor: BenchmarkExecutor private fun buildConfig(): Config { @@ -76,18 +72,12 @@ class TheodoliteExecutor( } fun run() { - isRunning = true - logger.info { "Start thread" } - val config = buildConfig() // execute benchmarks for each load for (load in config.loads) { - if (isRunning) { + if (executor.run) { config.compositeStrategy.findSuitableResource(load, config.resources) } } - logger.info { "Stop Thread" } - isRunning = false - } }