Skip to content
Snippets Groups Projects
Commit 2706779f authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'hotfix-topic-manager' into 'theodolite-kotlin'

Hotfix TopicManager again

See merge request !128
parents d6e116ec a0f00c91
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!128Hotfix TopicManager again,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2807 canceled
...@@ -30,7 +30,9 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -30,7 +30,9 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
result.all().get()// wait for the future object result.all().get()// wait for the future object
} catch (e: Exception) { } catch (e: Exception) {
delete(newTopics.map { topic -> topic.name() }, kafkaAdmin)
logger.warn { "Error during topic creation." } logger.warn { "Error during topic creation." }
logger.debug { e }
logger.warn { "Will retry the topic creation after 2 seconds" } logger.warn { "Will retry the topic creation after 2 seconds" }
sleep(RETRY_TIME) sleep(RETRY_TIME)
retryCreation = true retryCreation = true
...@@ -52,8 +54,13 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -52,8 +54,13 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
delete(topics, kafkaAdmin)
kafkaAdmin.close()
}
private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
var deleted = false var deleted = false
while (!deleted) { while (!deleted) {
try { try {
val result = kafkaAdmin.deleteTopics(topics) val result = kafkaAdmin.deleteTopics(topics)
...@@ -80,6 +87,6 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -80,6 +87,6 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
sleep(RETRY_TIME) sleep(RETRY_TIME)
} }
} }
kafkaAdmin.close()
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment