From 70bdf48b7823164fbe8970d3621f48bba207f8a3 Mon Sep 17 00:00:00 2001
From: "stu126940@mail.uni-kiel.de" <stu126940@mail.uni-kiel.de>
Date: Mon, 19 Apr 2021 11:59:43 +0200
Subject: [PATCH] Allow to delete all topics that have a specific prefix.

---
 .../example-benchmark-yaml-resource.yaml      |  4 ++-
 .../benchmark/KubernetesBenchmark.kt          |  2 +-
 .../KubernetesBenchmarkDeployment.kt          |  9 ++++--
 .../kotlin/theodolite/k8s/TopicManager.kt     | 20 ++++++++++++-
 .../kotlin/theodolite/util/KafkaConfig.kt     | 30 +++++++++++++------
 .../example-benchmark-k8s-resource.yaml       |  4 ++-
 6 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml b/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml
index d507934fb..a5f0b16ae 100644
--- a/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml
+++ b/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml
@@ -26,4 +26,6 @@ kafkaConfig:
   topics:
     - name: "input"
       numPartitions: 40
-      replicationFactor: 1
\ No newline at end of file
+      replicationFactor: 1
+    - name: "theodolite"
+      removeOnly: True
\ No newline at end of file
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
index 71b65a28f..bbcb8a957 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt
@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
             namespace = namespace,
             resources = resources.map { r -> r.second },
             kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
-            topics = kafkaConfig.getKafkaTopics(),
+            topics = kafkaConfig.topics,
             client = DefaultKubernetesClient().inNamespace(namespace)
         )
     }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
index 0de00fb50..08fdd840d 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt
@@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
 import org.apache.kafka.clients.admin.NewTopic
 import theodolite.k8s.K8sManager
 import theodolite.k8s.TopicManager
+import theodolite.util.KafkaConfig
 
 /**
  * Organizes the deployment of benchmarks in Kubernetes.
@@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment(
     val namespace: String,
     val resources: List<KubernetesResource>,
     private val kafkaConfig: HashMap<String, Any>,
-    private val topics: Collection<NewTopic>,
+    private val topics: List<KafkaConfig.TopicWrapper>,
     private val client: NamespacedKubernetesClient
 ) : BenchmarkDeployment {
     private val kafkaController = TopicManager(this.kafkaConfig)
@@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment(
      *  - Deploy the needed resources.
      */
     override fun setup() {
-        kafkaController.createTopics(this.topics)
+        val kafkaTopics = this.topics.filter { !it.removeOnly }
+            .map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
+        kafkaController.createTopics(kafkaTopics)
         resources.forEach {
             kubernetesManager.deploy(it)
         }
@@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment(
         resources.forEach {
             kubernetesManager.remove(it)
         }
-        kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
+        kafkaController.removeTopics(this.topics.map { topic -> topic.name })
         KafkaLagExporterRemover(client).remove(LABEL)
     }
 }
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
index 90c2cb8a9..d58b06fa2 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt
@@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
      */
     fun removeTopics(topics: List<String>) {
         val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
-        delete(topics, kafkaAdmin)
+        val currentTopics = kafkaAdmin.listTopics().names().get()
+        delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin)
         kafkaAdmin.close()
     }
 
+    /**
+     * This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
+     *
+     * @param existingTopic string for which should be checked if it could be created
+     * @param topics list of string which are used as possible prefixes to create `existingTopic`
+     * @return true, `existingTopics` matches a created regex, else false
+     */
+    private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
+        for (t in topics) {
+            val regex = ("$t.*").toRegex()
+            if (regex.matches(existingTopic)) {
+                return true
+            }
+        }
+        return false
+    }
+
     private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
         var deleted = false
 
diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt
index 4ba096e37..f304d8cd0 100644
--- a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt
+++ b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import io.quarkus.runtime.annotations.RegisterForReflection
 import org.apache.kafka.clients.admin.NewTopic
 import kotlin.properties.Delegates
+import kotlin.reflect.KProperty
 
 /**
  * Configuration of Kafka connection.
@@ -23,15 +24,6 @@ class KafkaConfig {
      */
     lateinit var topics: List<TopicWrapper>
 
-    /**
-     * Get all current Kafka topics.
-     *
-     * @return the list of topics.
-     */
-    fun getKafkaTopics(): List<NewTopic> {
-        return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) }
-    }
-
     /**
      * Wrapper for a topic definition.
      */
@@ -51,5 +43,25 @@ class KafkaConfig {
          * The replication factor of this topic
          */
         var replicationFactor by Delegates.notNull<Short>()
+
+        /**
+         * If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix.
+         */
+        var removeOnly by DelegatesFalse()
     }
 }
+
+/**
+ * Delegates to initialize a lateinit boolean to false
+ */
+@RegisterForReflection
+class DelegatesFalse {
+    private var state = false
+    operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean {
+        return state
+    }
+    operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) {
+        state = value
+    }
+
+}
diff --git a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml
index 9fc16f92e..122daec9b 100644
--- a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml
+++ b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml
@@ -26,4 +26,6 @@ kafkaConfig:
   topics:
     - name: "input"
       numPartitions: 40
-      replicationFactor: 1
\ No newline at end of file
+      replicationFactor: 1
+    - name: "theodolite"
+      removeOnly: True
\ No newline at end of file
-- 
GitLab