Skip to content
Snippets Groups Projects
Commit 670cf574 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Fix Operator logic

parent 100d41b6
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!106Introduce a Theodolite operator,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
...@@ -22,8 +22,9 @@ class TheodoliteController( ...@@ -22,8 +22,9 @@ class TheodoliteController(
val executionContext: CustomResourceDefinitionContext val executionContext: CustomResourceDefinitionContext
) { ) {
lateinit var executor: TheodoliteExecutor lateinit var executor: TheodoliteExecutor
var executionsQueue: Queue<BenchmarkExecution> = LinkedList<BenchmarkExecution>() val executionsQueue: Deque<BenchmarkExecution> = LinkedList<BenchmarkExecution>()
val benchmarks: MutableMap<String, KubernetesBenchmark> = HashMap() val benchmarks: MutableMap<String, KubernetesBenchmark> = HashMap()
var isUpdated = false
/** /**
* Adds the EventHandler to kubernetes * Adds the EventHandler to kubernetes
...@@ -39,18 +40,22 @@ class TheodoliteController( ...@@ -39,18 +40,22 @@ class TheodoliteController(
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue" } logger.info { "Add updated execution to queue" }
newExecution.name = newExecution.metadata.name
executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
executionsQueue.addFirst(newExecution)
if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) { if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) {
isUpdated = true
executor.executor.run = false executor.executor.run = false
} }
newExecution.name = newExecution.metadata.name
executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
executionsQueue.add(newExecution)
} }
override fun onDelete(execution: BenchmarkExecution, b: Boolean) { override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name} from queue" } logger.info { "Delete execution ${execution.metadata.name} from queue" }
executionsQueue.removeIf { e -> e.name == execution.metadata.name } executionsQueue.removeIf { e -> e.name == execution.metadata.name }
if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) { if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) {
isUpdated = true
executor.executor.run = false executor.executor.run = false
logger.info { "Current benchmark stopped" } logger.info { "Current benchmark stopped" }
} }
...@@ -68,10 +73,8 @@ class TheodoliteController( ...@@ -68,10 +73,8 @@ class TheodoliteController(
logger.info { "Update benchmark ${newBenchmark.metadata.name}" } logger.info { "Update benchmark ${newBenchmark.metadata.name}" }
newBenchmark.name = newBenchmark.metadata.name newBenchmark.name = newBenchmark.metadata.name
if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) { if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) {
isUpdated = true
executor.executor.run = false executor.executor.run = false
val execution = executor.getExecution()
execution.name = execution.name + System.currentTimeMillis()
executionsQueue.add(execution)
} else { } else {
onAdd(newBenchmark) onAdd(newBenchmark)
} }
...@@ -81,6 +84,7 @@ class TheodoliteController( ...@@ -81,6 +84,7 @@ class TheodoliteController(
logger.info { "Delete benchmark ${benchmark.metadata.name}" } logger.info { "Delete benchmark ${benchmark.metadata.name}" }
benchmarks.remove(benchmark.metadata.name) benchmarks.remove(benchmark.metadata.name)
if (::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) { if (::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) {
isUpdated = true
executor.executor.run = false executor.executor.run = false
logger.info { "Current benchmark stopped" } logger.info { "Current benchmark stopped" }
} }
...@@ -94,8 +98,6 @@ class TheodoliteController( ...@@ -94,8 +98,6 @@ class TheodoliteController(
reconcile() reconcile()
logger.info { "Theodolite is waiting for new jobs" } logger.info { "Theodolite is waiting for new jobs" }
sleep(2000) sleep(2000)
if (this::executor.isInitialized && !executor.isRunning) {
}
} catch (e: InterruptedException) { } catch (e: InterruptedException) {
logger.error { "Execution interrupted with error: $e" } logger.error { "Execution interrupted with error: $e" }
} }
...@@ -104,13 +106,14 @@ class TheodoliteController( ...@@ -104,13 +106,14 @@ class TheodoliteController(
@Synchronized @Synchronized
private fun reconcile() { private fun reconcile() {
while (executionsQueue.isNotEmpty() while (executionsQueue.isNotEmpty()) {
&& ((this::executor.isInitialized && !executor.isRunning) || !this::executor.isInitialized)) {
val execution = executionsQueue.peek() val execution = executionsQueue.peek()
val benchmark = benchmarks[execution.benchmark] val benchmark = benchmarks[execution.benchmark]
if (benchmark == null) { if (benchmark == null) {
logger.debug { "No benchmark found for execution ${execution.benchmark}" } logger.debug { "No benchmark found for execution ${execution.name}" }
sleep(1000) sleep(1000)
} else { } else {
runExecution(execution, benchmark) runExecution(execution, benchmark)
...@@ -120,13 +123,15 @@ class TheodoliteController( ...@@ -120,13 +123,15 @@ class TheodoliteController(
@Synchronized @Synchronized
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
isUpdated = false
logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" } logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" }
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
executor.run() executor.run()
if (executionsQueue.contains(execution)) { if (!isUpdated) {
client.customResource(executionContext).delete(client.namespace, execution.metadata.name) client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
} }
logger.info { "Execution of ${execution.name} is finally stopped" } logger.info { "Execution of ${execution.name} is finally stopped" }
} }
} }
package theodolite.execution package theodolite.execution
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.patcher.PatcherDefinitionFactory import theodolite.patcher.PatcherDefinitionFactory
...@@ -12,13 +11,10 @@ import theodolite.util.Resource ...@@ -12,13 +11,10 @@ import theodolite.util.Resource
import theodolite.util.Results import theodolite.util.Results
import java.time.Duration import java.time.Duration
private val logger = KotlinLogging.logger {}
class TheodoliteExecutor( class TheodoliteExecutor(
private var config: BenchmarkExecution, private var config: BenchmarkExecution,
private var kubernetesBenchmark: KubernetesBenchmark private var kubernetesBenchmark: KubernetesBenchmark
) { ) {
var isRunning = false
lateinit var executor: BenchmarkExecutor lateinit var executor: BenchmarkExecutor
private fun buildConfig(): Config { private fun buildConfig(): Config {
...@@ -76,18 +72,12 @@ class TheodoliteExecutor( ...@@ -76,18 +72,12 @@ class TheodoliteExecutor(
} }
fun run() { fun run() {
isRunning = true
logger.info { "Start thread" }
val config = buildConfig() val config = buildConfig()
// execute benchmarks for each load // execute benchmarks for each load
for (load in config.loads) { for (load in config.loads) {
if (isRunning) { if (executor.run) {
config.compositeStrategy.findSuitableResource(load, config.resources) config.compositeStrategy.findSuitableResource(load, config.resources)
} }
} }
logger.info { "Stop Thread" }
isRunning = false
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment