diff --git a/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml b/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml index d507934fb87127748a1f6e22177f9dd9430f5f0b..a5f0b16ae6e3b3335911836fe3d58e817004e23b 100644 --- a/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml +++ b/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml @@ -26,4 +26,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite" + removeOnly: True \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt index 71b65a28fd074e4554c283ee94a8db028d652d46..bbcb8a957fb2f04ca678b231a878be0a23d46748 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt @@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { namespace = namespace, resources = resources.map { r -> r.second }, kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), - topics = kafkaConfig.getKafkaTopics(), + topics = kafkaConfig.topics, client = DefaultKubernetesClient().inNamespace(namespace) ) } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index 0de00fb5007b646240661310d037e34ef1ea1ae2..08fdd840dc120f34ab6d303b2013928719c9483c 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection import org.apache.kafka.clients.admin.NewTopic import theodolite.k8s.K8sManager import theodolite.k8s.TopicManager +import theodolite.util.KafkaConfig /** * Organizes the deployment of benchmarks in Kubernetes. @@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment( val namespace: String, val resources: List<KubernetesResource>, private val kafkaConfig: HashMap<String, Any>, - private val topics: Collection<NewTopic>, + private val topics: List<KafkaConfig.TopicWrapper>, private val client: NamespacedKubernetesClient ) : BenchmarkDeployment { private val kafkaController = TopicManager(this.kafkaConfig) @@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment( * - Deploy the needed resources. */ override fun setup() { - kafkaController.createTopics(this.topics) + val kafkaTopics = this.topics.filter { !it.removeOnly } + .map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) } + kafkaController.createTopics(kafkaTopics) resources.forEach { kubernetesManager.deploy(it) } @@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment( resources.forEach { kubernetesManager.remove(it) } - kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) + kafkaController.removeTopics(this.topics.map { topic -> topic.name }) KafkaLagExporterRemover(client).remove(LABEL) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 90c2cb8a9c810aa524b6608a558d31e15587719e..d58b06fa22b95c0e107e4d80828b23a53c2d927b 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { */ fun removeTopics(topics: List<String>) { val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) - delete(topics, kafkaAdmin) + val currentTopics = kafkaAdmin.listTopics().names().get() + delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin) kafkaAdmin.close() } + /** + * This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic` + * + * @param existingTopic string for which should be checked if it could be created + * @param topics list of string which are used as possible prefixes to create `existingTopic` + * @return true, `existingTopics` matches a created regex, else false + */ + private fun matchRegex(existingTopic: String, topics: List<String>): Boolean { + for (t in topics) { + val regex = ("$t.*").toRegex() + if (regex.matches(existingTopic)) { + return true + } + } + return false + } + private fun delete(topics: List<String>, kafkaAdmin: AdminClient) { var deleted = false diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt index 4ba096e37345a9488cb288b21a8aa57ff07ac1ff..f304d8cd06969d4650329b9b9f410a56985a2002 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import io.quarkus.runtime.annotations.RegisterForReflection import org.apache.kafka.clients.admin.NewTopic import kotlin.properties.Delegates +import kotlin.reflect.KProperty /** * Configuration of Kafka connection. @@ -23,15 +24,6 @@ class KafkaConfig { */ lateinit var topics: List<TopicWrapper> - /** - * Get all current Kafka topics. - * - * @return the list of topics. - */ - fun getKafkaTopics(): List<NewTopic> { - return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) } - } - /** * Wrapper for a topic definition. */ @@ -51,5 +43,25 @@ class KafkaConfig { * The replication factor of this topic */ var replicationFactor by Delegates.notNull<Short>() + + /** + * If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix. + */ + var removeOnly by DelegatesFalse() } } + +/** + * Delegates to initialize a lateinit boolean to false + */ +@RegisterForReflection +class DelegatesFalse { + private var state = false + operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean { + return state + } + operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) { + state = value + } + +} diff --git a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml index 9fc16f92e303f05a449f7e8b83600c3b299f215d..122daec9bbda57bfb601157521ff98e782d56190 100644 --- a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml +++ b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml @@ -26,4 +26,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite" + removeOnly: True \ No newline at end of file