diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt index 1752ac112ea84ea179e238f7ab8d808779014d1b..0152cd7bef808d4652cd893fb282e0cb8b18dd5a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/ExecutionEventHandler.kt @@ -3,6 +3,7 @@ package theodolite.execution.operator import io.fabric8.kubernetes.client.informers.ResourceEventHandler import mu.KotlinLogging import theodolite.benchmark.BenchmarkExecution +import java.lang.NullPointerException private val logger = KotlinLogging.logger {} @@ -16,17 +17,25 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { logger.info { "Add updated execution to queue." } newExecution.name = newExecution.metadata.name - this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } + try { + this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } + } catch(e: NullPointerException) { + logger.warn { "No execution found for deletion" } + } this.controller.executionsQueue.addFirst(newExecution) - if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) { + if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) { this.controller.isUpdated.set(true) this.controller.executor.executor.run.compareAndSet(true, false) } } override fun onDelete(execution: BenchmarkExecution, b: Boolean) { - logger.info { "Delete execution ${execution.metadata.name} from queue." } - this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name } + try { + this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name } + logger.info { "Delete execution ${execution.metadata.name} from queue." } + } catch(e: NullPointerException) { + logger.warn { "No execution found for deletion" } + } if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) { this.controller.isUpdated.set(true) this.controller.executor.executor.run.compareAndSet(true, false) diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt index 9e6280cf3c8160ef686fd6dcee45276de7e67fa7..9f6cd64528874a1dc5f20c6d6c1563b1aa9f003d 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/operator/TheodoliteController.kt @@ -70,9 +70,15 @@ class TheodoliteController( executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) executor.run() - if (!isUpdated.get()) { - client.customResource(executionContext).delete(client.namespace, execution.metadata.name) + try { + if (!isUpdated.get()) { + this.executionsQueue.removeIf { e -> e.name == execution.name } + client.customResource(executionContext).delete(client.namespace, execution.metadata.name) + } + } catch (e: Exception) { + logger.warn { "Deletion skipped." } } + logger.info { "Execution of ${execution.name} is finally stopped." } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 390974cd247645197ebe6044bf785710164155aa..7529a45310f219fc0c5248b4031a020227a86049 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -2,7 +2,6 @@ package theodolite.k8s import mu.KotlinLogging import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.admin.ListTopicsResult import org.apache.kafka.clients.admin.NewTopic import java.util.* @@ -20,9 +19,15 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { */ fun createTopics(newTopics: Collection<NewTopic>) { var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) - kafkaAdmin.createTopics(newTopics) + val result = kafkaAdmin.createTopics(newTopics) + result.all().get()// wait for the future object + logger.info { + "Topics created finished with result: ${ + result.values().map { it -> it.key + ": " + it.value.isDone } + .joinToString(separator = ",") + } " + } kafkaAdmin.close() - logger.info { "Topics created" } } @@ -32,15 +37,19 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { */ fun removeTopics(topics: List<String>) { var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) - val result = kafkaAdmin.deleteTopics(topics) - try { - result.all().get() + val result = kafkaAdmin.deleteTopics(topics) + result.all().get() // wait for the future object + logger.info { + "\"Topics deletion finished with result: ${ + result.values().map { it -> it.key + ": " + it.value.isDone } + .joinToString(separator = ",") + } " + } } catch (e: Exception) { - logger.error { "Error while removing topics: $e" } - logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." } + logger.error { "Error while removing topics: $e" } + logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." } } kafkaAdmin.close() - logger.info { "Topics removed" } } }