diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt index 0e94dc8c5c743c341541b5925924ed3910d3870b..72895a42c7e52b6e15cdff01b2805af130e441aa 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/BenchmarkExecution.kt @@ -20,10 +20,6 @@ class BenchmarkExecution : CustomResource(), Namespaced { lateinit var execution: Execution lateinit var configOverrides: List<ConfigurationOverride?> - fun stop() { - throw InterruptedException() - } - @JsonDeserialize class Execution : KubernetesResource { lateinit var strategy: String diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutor.kt index 86b6e9416ed34c93218cc40a7d8fdc0867c871d1..32124ab355d9103143c15294b29d78a2e51d9f4c 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutor.kt @@ -40,7 +40,6 @@ abstract class BenchmarkExecutor( fun stop() { run = false - //throw InterruptedException() } /** diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index 57dd1c77b56641a69c31f5ac6b5008e52fc04ffa..fa435df36d01d37131b3d233bd40a921d34a6f6d 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -28,11 +28,9 @@ class BenchmarkExecutorImpl( if (this.run) { result = AnalysisExecutor(slo = slo).analyse(load = load, res = res, executionDuration = executionDuration) - - benchmarkDeployment.teardown() - this.results.setResult(Pair(load, res), result) } + benchmarkDeployment.teardown() return result } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/Shutdown.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/Shutdown.kt index fe4b983bca71dd5801f5a421cf44583f085cec90..589dee1ac2e39d107794df1e3aa85ea56d6db92b 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/Shutdown.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/Shutdown.kt @@ -8,10 +8,9 @@ import theodolite.util.Resource private val logger = KotlinLogging.logger {} -class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) : - Thread() { +class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) { - override fun run() { + fun run() { // Build Configuration to teardown logger.info { "Received shutdown signal -> Shutting down" } val deployment = @@ -24,4 +23,4 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b deployment.teardown() logger.info { "Teardown completed" } } -} +} \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt index ec5f7763788f2fd37f2430be6f526f0cc2f3fa5e..66ee841899e8c62679b0c35aed714652fce5e051 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteController.kt @@ -22,7 +22,7 @@ class TheodoliteController( val executionContext: CustomResourceDefinitionContext ) { lateinit var executor: TheodoliteExecutor - val executionsQueue: Queue<BenchmarkExecution> = LinkedList<BenchmarkExecution>() + var executionsQueue: Queue<BenchmarkExecution> = LinkedList<BenchmarkExecution>() val benchmarks: MutableMap<String, KubernetesBenchmark> = HashMap() /** @@ -38,23 +38,20 @@ class TheodoliteController( } override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { - logger.info { "Update execution ${oldExecution.metadata.name}" } + logger.info { "Add updated execution to queue" } if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) { - logger.info { "restart current benchmark with new version" } - executor.stop() - sleep(2000) - startBenchmark(execution = newExecution, benchmark = executor.getBenchmark()) - } else { - onDelete(oldExecution, false) - onAdd(newExecution) + executor.executor.run = false } + newExecution.name = newExecution.metadata.name + executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } + executionsQueue.add(newExecution) } override fun onDelete(execution: BenchmarkExecution, b: Boolean) { logger.info { "Delete execution ${execution.metadata.name} from queue" } executionsQueue.removeIf { e -> e.name == execution.metadata.name } if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) { - executor.stop() + executor.executor.run = false logger.info { "Current benchmark stopped" } } } @@ -69,12 +66,12 @@ class TheodoliteController( override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) { logger.info { "Update benchmark ${newBenchmark.metadata.name}" } + newBenchmark.name = newBenchmark.metadata.name if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) { - - logger.info { "restart current benchmark with new version" } - executor.stop() - sleep(2000) - startBenchmark(execution = executor.getExecution(), benchmark = newBenchmark) + executor.executor.run = false + val execution = executor.getExecution() + execution.name = execution.name + System.currentTimeMillis() + executionsQueue.add(execution) } else { onAdd(newBenchmark) } @@ -84,7 +81,7 @@ class TheodoliteController( logger.info { "Delete benchmark ${benchmark.metadata.name}" } benchmarks.remove(benchmark.metadata.name) if (::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) { - executor.stop() + executor.executor.run = false logger.info { "Current benchmark stopped" } } } @@ -95,9 +92,9 @@ class TheodoliteController( while (true) { try { reconcile() + logger.info { "Theodolite is waiting for new jobs" } + sleep(2000) if (this::executor.isInitialized && !executor.isRunning) { - logger.info { "Theodolite is waiting for new jobs" } - sleep(1000) } } catch (e: InterruptedException) { logger.error { "Execution interrupted with error: $e" } @@ -108,8 +105,7 @@ class TheodoliteController( @Synchronized private fun reconcile() { while (executionsQueue.isNotEmpty() - && ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized) - ) { + && ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized)) { val execution = executionsQueue.peek() val benchmark = benchmarks[execution.benchmark] @@ -117,28 +113,19 @@ class TheodoliteController( logger.debug { "No benchmark found for execution ${execution.benchmark}" } sleep(1000) } else { - startBenchmark(execution, benchmark) + runExecution(execution, benchmark) } } } - - fun startBenchmark(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { - + @Synchronized + fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" } executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) executor.run() - // wait until execution is deleted - try { + + if (executionsQueue.contains(execution)) { client.customResource(executionContext).delete(client.namespace, execution.metadata.name) - sleep(1000) - while (executionsQueue.contains(execution)) { - sleep(2000) - logger.info { "Delete of execution: ${execution.name} failed. Retrying in 2 second." } - client.customResource(executionContext).delete(client.namespace, execution.metadata.name) - } - } catch (e: Exception) { - logger.error { "Error while delete current execution: $e" } } logger.info { "Execution of ${execution.name} is finally stopped" } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt index 2ed356a797d4f90d09168bbc5cee4ad1b928464a..b840bcdea4499f38de91b810928e33e09419ef3f 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteExecutor.kt @@ -18,19 +18,6 @@ class TheodoliteExecutor( private var config: BenchmarkExecution, private var kubernetesBenchmark: KubernetesBenchmark ) { - private val executionThread = Thread() { - if (config == null || kubernetesBenchmark == null) { - logger.error { "Execution or Benchmark not found" } - } else { - val config = buildConfig() - // execute benchmarks for each load - for (load in config.loads) { - config.compositeStrategy.findSuitableResource(load, config.resources) - } - } - isRunning = false - } - var isRunning = false lateinit var executor: BenchmarkExecutor @@ -91,29 +78,16 @@ class TheodoliteExecutor( fun run() { isRunning = true logger.info { "Start thread" } - // executionThread.start() - if (config == null || kubernetesBenchmark == null) { - logger.error { "Execution or Benchmark not found" } - } else { - val config = buildConfig() - // execute benchmarks for each load - for (load in config.loads) { - if (isRunning) { - config.compositeStrategy.findSuitableResource(load, config.resources) - } + + val config = buildConfig() + // execute benchmarks for each load + for (load in config.loads) { + if (isRunning) { + config.compositeStrategy.findSuitableResource(load, config.resources) } - logger.info { "Stop Thread" } - isRunning = false } - } - - fun stop() { + logger.info { "Stop Thread" } isRunning = false - try { - executor.stop() - Shutdown(config, kubernetesBenchmark).run() - } catch (e: InterruptedException) { - logger.warn { "Execution stopped" } - } + } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt index 490d37c3297908bc657fcfb96f98fe8a036a6ea7..6293868a5c5a2a29b8e13bdad7fbbe19c10402d4 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/TheodoliteYamlExecutor.kt @@ -5,6 +5,7 @@ import mu.KotlinLogging import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.KubernetesBenchmark import theodolite.util.YamlParser +import kotlin.concurrent.thread import kotlin.system.exitProcess private val logger = KotlinLogging.logger {} @@ -23,12 +24,12 @@ object TheodoliteYamlExecutor { parser.parse("./../../../resources/main/yaml/BenchmarkType.yaml", KubernetesBenchmark::class.java)!! val shutdown = Shutdown(benchmarkExecution, benchmark) - Runtime.getRuntime().addShutdownHook(shutdown) + Runtime.getRuntime().addShutdownHook(thread { shutdown.run()}) val executor = TheodoliteExecutor(benchmarkExecution, benchmark) executor.run() logger.info { "Theodolite finished" } - Runtime.getRuntime().removeShutdownHook(shutdown) + Runtime.getRuntime().removeShutdownHook(thread { shutdown.run()}) exitProcess(0) } } diff --git a/theodolite-quarkus/src/main/resources/operator/example-execution.yaml b/theodolite-quarkus/src/main/resources/operator/example-execution.yaml index 063711c1a326f24bdc4e86fb97d99a07ece959c1..ef625dfe6ec78c2cc0ed099dfee0f767d57263bb 100644 --- a/theodolite-quarkus/src/main/resources/operator/example-execution.yaml +++ b/theodolite-quarkus/src/main/resources/operator/example-execution.yaml @@ -25,4 +25,13 @@ execution: restrictions: - "LowerBound" configOverrides: - - \ No newline at end of file + - patcher: + type: "NodeSelectorPatcher" + resource: "uc1-load-generator-deployment.yaml" + variableName: "env" + value: "prod" + - patcher: + type: "NodeSelectorPatcher" + resource: "uc1-kstreams-deployment.yaml" + variableName: "env" + value: "prod" \ No newline at end of file