Skip to content
Snippets Groups Projects
Commit fdf759d5 authored by Sören Henning's avatar Sören Henning
Browse files

Apply minor style fixes

parent ce33558e
No related branches found
No related tags found
3 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
......@@ -4,6 +4,7 @@ import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsResult
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
import java.lang.Thread.sleep
private val logger = KotlinLogging.logger {}
......@@ -29,13 +30,12 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
try {
result = kafkaAdmin.createTopics(newTopics)
result.all().get() // wait for the future to be completed
} catch (e: Exception) {
} catch (e: Exception) { // TopicExistsException
logger.warn(e) { "Error during topic creation." }
logger.debug { e } // TODO remove?
logger.debug { e } // TODO remove due to attached exception to warn log?
logger.info { "Remove existing topics." }
delete(newTopics.map { topic -> topic.name() }, kafkaAdmin)
logger.info { "Will retry the topic creation in $RETRY_TIME seconds." }
logger.info { "Will retry the topic creation in ${RETRY_TIME/1000} seconds." }
sleep(RETRY_TIME)
retryCreation = true
}
......@@ -43,7 +43,9 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
logger.info {
"Topics creation finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
result
.values()
.map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
......@@ -57,7 +59,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin)
delete(currentTopics.filter{ matchRegex(it, topics) }, kafkaAdmin)
kafkaAdmin.close()
}
......@@ -70,7 +72,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
val regex = t.toRegex()
val regex = t.toRegex()
if (regex.matches(existingTopic)) {
return true
}
......@@ -93,17 +95,15 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
}
} catch (e: Exception) {
logger.error(e) { "Error while removing topics: $e" }
logger.info { "Existing topics are: ${kafkaAdmin.listTopics()}." }
logger.info { "Existing topics are: ${kafkaAdmin.listTopics().names().get()}." }
}
val toDelete = topics.filter { topic ->
kafkaAdmin.listTopics().names().get().contains(topic)
}
val toDelete = topics.filter { kafkaAdmin.listTopics().names().get().contains(it) }
if (toDelete.isNullOrEmpty()) {
deleted = true
} else {
logger.info { "Deletion of kafka topics failed, will retry in $RETRY_TIME seconds." }
logger.info { "Deletion of Kafka topics failed, will retry in ${RETRY_TIME/1000} seconds." }
sleep(RETRY_TIME)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment