Skip to content
Snippets Groups Projects
Commit e3add5b9 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Enhance printing of TopicManager.kt

parent fb0525c5
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!121Introduce hot fixes from the presentation branch,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
...@@ -2,7 +2,6 @@ package theodolite.k8s ...@@ -2,7 +2,6 @@ package theodolite.k8s
import mu.KotlinLogging import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import java.util.* import java.util.*
...@@ -21,7 +20,13 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -21,7 +20,13 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
fun createTopics(newTopics: Collection<NewTopic>) { fun createTopics(newTopics: Collection<NewTopic>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val result = kafkaAdmin.createTopics(newTopics) val result = kafkaAdmin.createTopics(newTopics)
logger.info { "Topics created finished with result: ${result.all().get()}" } result.all().get()// wait for the future object
logger.info {
"Topics created finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
kafkaAdmin.close() kafkaAdmin.close()
} }
...@@ -32,15 +37,19 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -32,15 +37,19 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
try { try {
val result = kafkaAdmin.deleteTopics(topics) val result = kafkaAdmin.deleteTopics(topics)
logger.info { "Topics deletion finished with result: ${result.all().get()}" } result.all().get() // wait for the future object
logger.info {
"\"Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
} catch (e: Exception) { } catch (e: Exception) {
logger.error { "Error while removing topics: $e" } logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." } logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
} }
kafkaAdmin.close() kafkaAdmin.close()
logger.info { "Topics removed" }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment