Skip to content
Snippets Groups Projects

Hotfix TopicManager again

@@ -30,7 +30,9 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
result.all().get()// wait for the future object
} catch (e: Exception) {
delete(newTopics.map { topic -> topic.name() }, kafkaAdmin)
logger.warn { "Error during topic creation." }
logger.debug { e }
logger.warn { "Will retry the topic creation after 2 seconds" }
sleep(RETRY_TIME)
retryCreation = true
@@ -52,8 +54,13 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/
fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
delete(topics, kafkaAdmin)
kafkaAdmin.close()
}
private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
var deleted = false
while (!deleted) {
try {
val result = kafkaAdmin.deleteTopics(topics)
@@ -80,6 +87,6 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
sleep(RETRY_TIME)
}
}
kafkaAdmin.close()
}
}
Loading