diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index 28a7b12e8b2203496e870fbd67641cd854bbbe0d..a6bf881d6ded7b0936b400a37b572c77c95bb241 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -30,7 +30,7 @@ class KubernetesBenchmarkDeployment( private val kafkaController = TopicManager(this.kafkaConfig) private val kubernetesManager = K8sManager(client) private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter" - private val SLEEP_AFTER_K8S_DELETION_MS = 2000L + private val SLEEP_AFTER_TEARDOWN = 5000L /** * Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]: @@ -41,9 +41,7 @@ class KubernetesBenchmarkDeployment( val kafkaTopics = this.topics.filter { !it.removeOnly } .map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) } kafkaController.createTopics(kafkaTopics) - resources.forEach { - kubernetesManager.deploy(it) - } + resources.forEach { kubernetesManager.deploy(it) } } /** @@ -56,9 +54,9 @@ class KubernetesBenchmarkDeployment( resources.forEach { kubernetesManager.remove(it) } - logger.info { "Kubernetes resources deleted. Allow for short pause before continuing." } - Thread.sleep(SLEEP_AFTER_K8S_DELETION_MS) kafkaController.removeTopics(this.topics.map { topic -> topic.name }) KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL) + logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." } + Thread.sleep(SLEEP_AFTER_TEARDOWN) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 1367f625808059f9e1a56f8be40c14e6f70e356d..ef5715e248ae6c64df0035a94d57fea12202787e 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.CreateTopicsResult import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.common.errors.TopicExistsException import java.lang.Thread.sleep private val logger = KotlinLogging.logger {} @@ -29,13 +30,12 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { try { result = kafkaAdmin.createTopics(newTopics) result.all().get() // wait for the future to be completed - - } catch (e: Exception) { + } catch (e: Exception) { // TopicExistsException logger.warn(e) { "Error during topic creation." } - logger.debug { e } // TODO remove? + logger.debug { e } // TODO remove due to attached exception to warn log? logger.info { "Remove existing topics." } delete(newTopics.map { topic -> topic.name() }, kafkaAdmin) - logger.info { "Will retry the topic creation in $RETRY_TIME seconds." } + logger.info { "Will retry the topic creation in ${RETRY_TIME/1000} seconds." } sleep(RETRY_TIME) retryCreation = true } @@ -43,7 +43,9 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { logger.info { "Topics creation finished with result: ${ - result.values().map { it -> it.key + ": " + it.value.isDone } + result + .values() + .map { it -> it.key + ": " + it.value.isDone } .joinToString(separator = ",") } " } @@ -57,7 +59,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { fun removeTopics(topics: List<String>) { val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val currentTopics = kafkaAdmin.listTopics().names().get() - delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin) + delete(currentTopics.filter{ matchRegex(it, topics) }, kafkaAdmin) kafkaAdmin.close() } @@ -70,7 +72,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { */ private fun matchRegex(existingTopic: String, topics: List<String>): Boolean { for (t in topics) { - val regex = t.toRegex() + val regex = t.toRegex() if (regex.matches(existingTopic)) { return true } @@ -93,17 +95,15 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { } } catch (e: Exception) { logger.error(e) { "Error while removing topics: $e" } - logger.info { "Existing topics are: ${kafkaAdmin.listTopics()}." } + logger.info { "Existing topics are: ${kafkaAdmin.listTopics().names().get()}." } } - val toDelete = topics.filter { topic -> - kafkaAdmin.listTopics().names().get().contains(topic) - } + val toDelete = topics.filter { kafkaAdmin.listTopics().names().get().contains(it) } if (toDelete.isNullOrEmpty()) { deleted = true } else { - logger.info { "Deletion of kafka topics failed, will retry in $RETRY_TIME seconds." } + logger.info { "Deletion of Kafka topics failed, will retry in ${RETRY_TIME/1000} seconds." } sleep(RETRY_TIME) } }