From 41b4c8ff4035c6d3ea31da1765b693ffa5977b1b Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Thu, 15 Apr 2021 21:52:54 +0200
Subject: [PATCH] Fix TopicManager.kt

Add additional try catch to BenchmarkExecutorImpl.kt
---
 .../KubernetesBenchmarkDeployment.kt          |  2 +-
 .../execution/BenchmarkExecutorImpl.kt        | 16 ++++-
 .../kotlin/theodolite/k8s/TopicManager.kt     | 65 ++++++++++++++-----
 3 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
index 3d73dd67d..a9a7a4e38 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
@@ -47,9 +47,9 @@ class KubernetesBenchmarkDeployment(
      */
     override fun teardown() {
         KafkaLagExporterRemover(client).remove(LABEL)
-        kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
         resources.forEach {
             kubernetesManager.remove(it)
         }
+        kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
     }
 }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
index efbfe4df4..1ab8e2152 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/execution/BenchmarkExecutorImpl.kt
@@ -29,7 +29,7 @@ class BenchmarkExecutorImpl(
         try {
             benchmarkDeployment.setup()
             this.waitAndLog()
-        } catch(e: Exception) {
+        } catch (e: Exception) {
             logger.error { "Error while setup experiment." }
             logger.error { "Error is: $e" }
             this.run.set(false)
@@ -40,10 +40,20 @@ class BenchmarkExecutorImpl(
          */
         if (this.run.get()) {
             result =
-                AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration)
+                AnalysisExecutor(slo = slo, executionId = executionId).analyze(
+                    load = load,
+                    res = res,
+                    executionDuration = executionDuration
+                )
             this.results.setResult(Pair(load, res), result)
         }
-        benchmarkDeployment.teardown()
+
+        try {
+            benchmarkDeployment.teardown()
+        } catch (e: Exception) {
+            logger.warn { "Error while teardown of deplyoment" }
+            logger.debug { "The Teardowm failed cause of: $e " }
+        }
 
         return result
     }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
index 43dc69725..321fa9a94 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
@@ -2,9 +2,12 @@ package theodolite.k8s
 
 import mu.KotlinLogging
 import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.admin.CreateTopicsResult
 import org.apache.kafka.clients.admin.NewTopic
+import java.lang.Thread.sleep
 
 private val logger = KotlinLogging.logger {}
+private const val RETRY_TIME = 2000L
 
 /**
  * Manages the topics related tasks
@@ -12,17 +15,30 @@ private val logger = KotlinLogging.logger {}
  * @constructor Creates a KafkaAdminClient
  */
 class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
-
     /**
      * Creates topics.
      * @param newTopics List of all Topic that should be created
      */
     fun createTopics(newTopics: Collection<NewTopic>) {
-        var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
-        val result = kafkaAdmin.createTopics(newTopics)
-        result.all().get()// wait for the future object
+        val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
+        lateinit var result: CreateTopicsResult
+
+        do {
+            var retryCreation = false
+            try {
+                result = kafkaAdmin.createTopics(newTopics)
+                result.all().get()// wait for the future object
+
+            } catch (e: Exception) {
+                logger.warn { "Error during topic creation." }
+                logger.warn { "Will retry the topic creation after 2 seconds" }
+                sleep(RETRY_TIME)
+                retryCreation = true
+            }
+        } while (retryCreation)
+
         logger.info {
-            "Topics created finished with result: ${
+            "Topics creation finished with result: ${
                 result.values().map { it -> it.key + ": " + it.value.isDone }
                     .joinToString(separator = ",")
             } "
@@ -35,19 +51,34 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
      * @param topics List of names with the topics to remove.
      */
     fun removeTopics(topics: List<String>) {
-        var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
-        try {
-            val result = kafkaAdmin.deleteTopics(topics)
-            result.all().get() // wait for the future object
-            logger.info {
-                "Topics deletion finished with result: ${
-                    result.values().map { it -> it.key + ": " + it.value.isDone }
-                        .joinToString(separator = ",")
-                } "
+        val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
+        var deleted = false
+        
+        while (!deleted) {
+            try {
+                val result = kafkaAdmin.deleteTopics(topics)
+                result.all().get() // wait for the future object
+                logger.info {
+                    "Topics deletion finished with result: ${
+                        result.values().map { it -> it.key + ": " + it.value.isDone }
+                            .joinToString(separator = ",")
+                    }"
+                }
+            } catch (e: Exception) {
+                logger.error { "Error while removing topics: $e" }
+                logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
+            }
+
+            val toDelete = topics.filter { topic ->
+                kafkaAdmin.listTopics().names().get().contains(topic)
+            }
+
+            if (toDelete.isNullOrEmpty()) {
+                deleted = true
+            } else {
+                logger.info { "Deletion of kafka topics failed retrying in 2 seconds" }
+                sleep(RETRY_TIME)
             }
-        } catch (e: Exception) {
-            logger.error { "Error while removing topics: $e" }
-            logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
         }
         kafkaAdmin.close()
     }
-- 
GitLab