Skip to content
Snippets Groups Projects
Commit 82f7e2d6 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'theodolite-kotlin' of...

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/theodolite into theodolite-kotlin
parents 81b74b72 2706779f
No related branches found
No related tags found
3 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2809 canceled
...@@ -30,7 +30,9 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -30,7 +30,9 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
result.all().get()// wait for the future object result.all().get()// wait for the future object
} catch (e: Exception) { } catch (e: Exception) {
delete(newTopics.map { topic -> topic.name() }, kafkaAdmin)
logger.warn { "Error during topic creation." } logger.warn { "Error during topic creation." }
logger.debug { e }
logger.warn { "Will retry the topic creation after 2 seconds" } logger.warn { "Will retry the topic creation after 2 seconds" }
sleep(RETRY_TIME) sleep(RETRY_TIME)
retryCreation = true retryCreation = true
...@@ -52,8 +54,13 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -52,8 +54,13 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
delete(topics, kafkaAdmin)
kafkaAdmin.close()
}
private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
var deleted = false var deleted = false
while (!deleted) { while (!deleted) {
try { try {
val result = kafkaAdmin.deleteTopics(topics) val result = kafkaAdmin.deleteTopics(topics)
...@@ -80,6 +87,6 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -80,6 +87,6 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
sleep(RETRY_TIME) sleep(RETRY_TIME)
} }
} }
kafkaAdmin.close()
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment