diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 1367f625808059f9e1a56f8be40c14e6f70e356d..ef5715e248ae6c64df0035a94d57fea12202787e 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.CreateTopicsResult import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.common.errors.TopicExistsException import java.lang.Thread.sleep private val logger = KotlinLogging.logger {} @@ -29,13 +30,12 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { try { result = kafkaAdmin.createTopics(newTopics) result.all().get() // wait for the future to be completed - - } catch (e: Exception) { + } catch (e: Exception) { // TopicExistsException logger.warn(e) { "Error during topic creation." } - logger.debug { e } // TODO remove? + logger.debug { e } // TODO remove due to attached exception to warn log? logger.info { "Remove existing topics." } delete(newTopics.map { topic -> topic.name() }, kafkaAdmin) - logger.info { "Will retry the topic creation in $RETRY_TIME seconds." } + logger.info { "Will retry the topic creation in ${RETRY_TIME/1000} seconds." } sleep(RETRY_TIME) retryCreation = true } @@ -43,7 +43,9 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { logger.info { "Topics creation finished with result: ${ - result.values().map { it -> it.key + ": " + it.value.isDone } + result + .values() + .map { it -> it.key + ": " + it.value.isDone } .joinToString(separator = ",") } " } @@ -57,7 +59,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { fun removeTopics(topics: List<String>) { val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val currentTopics = kafkaAdmin.listTopics().names().get() - delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin) + delete(currentTopics.filter{ matchRegex(it, topics) }, kafkaAdmin) kafkaAdmin.close() } @@ -70,7 +72,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { */ private fun matchRegex(existingTopic: String, topics: List<String>): Boolean { for (t in topics) { - val regex = t.toRegex() + val regex = t.toRegex() if (regex.matches(existingTopic)) { return true } @@ -93,17 +95,15 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { } } catch (e: Exception) { logger.error(e) { "Error while removing topics: $e" } - logger.info { "Existing topics are: ${kafkaAdmin.listTopics()}." } + logger.info { "Existing topics are: ${kafkaAdmin.listTopics().names().get()}." } } - val toDelete = topics.filter { topic -> - kafkaAdmin.listTopics().names().get().contains(topic) - } + val toDelete = topics.filter { kafkaAdmin.listTopics().names().get().contains(it) } if (toDelete.isNullOrEmpty()) { deleted = true } else { - logger.info { "Deletion of kafka topics failed, will retry in $RETRY_TIME seconds." } + logger.info { "Deletion of Kafka topics failed, will retry in ${RETRY_TIME/1000} seconds." } sleep(RETRY_TIME) } }