From a0f00c91d118527a1f9faf218d05489f68d49868 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Fri, 16 Apr 2021 14:09:37 +0200 Subject: [PATCH] Rewrite TopicManager.kt Enhance log by reusing same kafkaAdmin client --- .../src/main/kotlin/theodolite/k8s/TopicManager.kt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index a956f8218..e82a133b3 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -30,7 +30,7 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { result.all().get()// wait for the future object } catch (e: Exception) { - removeTopics(newTopics.map { topic -> topic.name() }) + delete(newTopics.map { topic -> topic.name() }, kafkaAdmin) logger.warn { "Error during topic creation." } logger.debug { e } logger.warn { "Will retry the topic creation after 2 seconds" } @@ -54,6 +54,11 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { */ fun removeTopics(topics: List<String>) { val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) + delete(topics, kafkaAdmin) + kafkaAdmin.close() + } + + private fun delete(topics: List<String>, kafkaAdmin: AdminClient) { var deleted = false while (!deleted) { @@ -82,6 +87,6 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { sleep(RETRY_TIME) } } - kafkaAdmin.close() } + } -- GitLab