diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index 3d73dd67ddcb5363e752d9a0a65d5a8bff98b4e9..a9a7a4e385b73bde61014e75da3397a2bc9950e5 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -47,9 +47,9 @@ class KubernetesBenchmarkDeployment( */ override fun teardown() { KafkaLagExporterRemover(client).remove(LABEL) - kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) resources.forEach { kubernetesManager.remove(it) } + kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt index efbfe4df41d70b1b35ea91667c8e0c85d8b58953..1ab8e215216cd6f2d3640ca113d438e4ed821042 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt @@ -29,7 +29,7 @@ class BenchmarkExecutorImpl( try { benchmarkDeployment.setup() this.waitAndLog() - } catch(e: Exception) { + } catch (e: Exception) { logger.error { "Error while setup experiment." } logger.error { "Error is: $e" } this.run.set(false) @@ -40,10 +40,20 @@ class BenchmarkExecutorImpl( */ if (this.run.get()) { result = - AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration) + AnalysisExecutor(slo = slo, executionId = executionId).analyze( + load = load, + res = res, + executionDuration = executionDuration + ) this.results.setResult(Pair(load, res), result) } - benchmarkDeployment.teardown() + + try { + benchmarkDeployment.teardown() + } catch (e: Exception) { + logger.warn { "Error while teardown of deplyoment" } + logger.debug { "The Teardowm failed cause of: $e " } + } return result } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 43dc697255ec86dd39640df7d10b89676e5d1d67..321fa9a945fade012a81ea4dbacb2c403e783411 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -2,9 +2,12 @@ package theodolite.k8s import mu.KotlinLogging import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.CreateTopicsResult import org.apache.kafka.clients.admin.NewTopic +import java.lang.Thread.sleep private val logger = KotlinLogging.logger {} +private const val RETRY_TIME = 2000L /** * Manages the topics related tasks @@ -12,17 +15,30 @@ private val logger = KotlinLogging.logger {} * @constructor Creates a KafkaAdminClient */ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { - /** * Creates topics. * @param newTopics List of all Topic that should be created */ fun createTopics(newTopics: Collection<NewTopic>) { - var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) - val result = kafkaAdmin.createTopics(newTopics) - result.all().get()// wait for the future object + val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) + lateinit var result: CreateTopicsResult + + do { + var retryCreation = false + try { + result = kafkaAdmin.createTopics(newTopics) + result.all().get()// wait for the future object + + } catch (e: Exception) { + logger.warn { "Error during topic creation." } + logger.warn { "Will retry the topic creation after 2 seconds" } + sleep(RETRY_TIME) + retryCreation = true + } + } while (retryCreation) + logger.info { - "Topics created finished with result: ${ + "Topics creation finished with result: ${ result.values().map { it -> it.key + ": " + it.value.isDone } .joinToString(separator = ",") } " @@ -35,19 +51,34 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { * @param topics List of names with the topics to remove. */ fun removeTopics(topics: List<String>) { - var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) - try { - val result = kafkaAdmin.deleteTopics(topics) - result.all().get() // wait for the future object - logger.info { - "Topics deletion finished with result: ${ - result.values().map { it -> it.key + ": " + it.value.isDone } - .joinToString(separator = ",") - } " + val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) + var deleted = false + + while (!deleted) { + try { + val result = kafkaAdmin.deleteTopics(topics) + result.all().get() // wait for the future object + logger.info { + "Topics deletion finished with result: ${ + result.values().map { it -> it.key + ": " + it.value.isDone } + .joinToString(separator = ",") + }" + } + } catch (e: Exception) { + logger.error { "Error while removing topics: $e" } + logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." } + } + + val toDelete = topics.filter { topic -> + kafkaAdmin.listTopics().names().get().contains(topic) + } + + if (toDelete.isNullOrEmpty()) { + deleted = true + } else { + logger.info { "Deletion of kafka topics failed retrying in 2 seconds" } + sleep(RETRY_TIME) } - } catch (e: Exception) { - logger.error { "Error while removing topics: $e" } - logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." } } kafkaAdmin.close() }