diff --git a/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml b/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml index d507934fb87127748a1f6e22177f9dd9430f5f0b..ebc2fe9e44fe303e342cabee301cb63664867cfb 100644 --- a/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml +++ b/theodolite-quarkus/config/example-benchmark-yaml-resource.yaml @@ -26,4 +26,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite-.*" + removeOnly: True \ No newline at end of file diff --git a/theodolite-quarkus/config/example-operator-benchmark.yaml b/theodolite-quarkus/config/example-operator-benchmark.yaml index 93b42e8690c3b9f432aea1c6711ee0118d14adb3..3ed5218d8a8988b130e8d549c120cbca7329ffe3 100644 --- a/theodolite-quarkus/config/example-operator-benchmark.yaml +++ b/theodolite-quarkus/config/example-operator-benchmark.yaml @@ -30,4 +30,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite-.*" + removeOnly: True \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt index 71b65a28fd074e4554c283ee94a8db028d652d46..bbcb8a957fb2f04ca678b231a878be0a23d46748 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt @@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { namespace = namespace, resources = resources.map { r -> r.second }, kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), - topics = kafkaConfig.getKafkaTopics(), + topics = kafkaConfig.topics, client = DefaultKubernetesClient().inNamespace(namespace) ) } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index 0de00fb5007b646240661310d037e34ef1ea1ae2..08fdd840dc120f34ab6d303b2013928719c9483c 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection import org.apache.kafka.clients.admin.NewTopic import theodolite.k8s.K8sManager import theodolite.k8s.TopicManager +import theodolite.util.KafkaConfig /** * Organizes the deployment of benchmarks in Kubernetes. @@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment( val namespace: String, val resources: List<KubernetesResource>, private val kafkaConfig: HashMap<String, Any>, - private val topics: Collection<NewTopic>, + private val topics: List<KafkaConfig.TopicWrapper>, private val client: NamespacedKubernetesClient ) : BenchmarkDeployment { private val kafkaController = TopicManager(this.kafkaConfig) @@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment( * - Deploy the needed resources. */ override fun setup() { - kafkaController.createTopics(this.topics) + val kafkaTopics = this.topics.filter { !it.removeOnly } + .map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) } + kafkaController.createTopics(kafkaTopics) resources.forEach { kubernetesManager.deploy(it) } @@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment( resources.forEach { kubernetesManager.remove(it) } - kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) + kafkaController.removeTopics(this.topics.map { topic -> topic.name }) KafkaLagExporterRemover(client).remove(LABEL) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 90c2cb8a9c810aa524b6608a558d31e15587719e..1367f625808059f9e1a56f8be40c14e6f70e356d 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { */ fun removeTopics(topics: List<String>) { val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) - delete(topics, kafkaAdmin) + val currentTopics = kafkaAdmin.listTopics().names().get() + delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin) kafkaAdmin.close() } + /** + * This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic` + * + * @param existingTopic string for which should be checked if it could be created + * @param topics list of string which are used as possible prefixes to create `existingTopic` + * @return true, `existingTopics` matches a created regex, else false + */ + private fun matchRegex(existingTopic: String, topics: List<String>): Boolean { + for (t in topics) { + val regex = t.toRegex() + if (regex.matches(existingTopic)) { + return true + } + } + return false + } + private fun delete(topics: List<String>, kafkaAdmin: AdminClient) { var deleted = false diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt index 7f3311182e324f1ebe10bb664ea7766aca1aa783..28e8194c699cd074026c8cb7e6f3ce4ec347023b 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt @@ -36,7 +36,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm // special case: length == 1 or 2 if (lower == upper) { val res = resources[lower] - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower else { if (lower + 1 == resources.size) return -1 @@ -47,7 +47,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm // length > 2 and adjust upper and lower depending on the result for `resources[mid]` val mid = (upper + lower) / 2 val res = resources[mid] - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } if (this.benchmarkExecutor.runExperiment(load, resources[mid])) { if (mid == lower) { return lower diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt index 9698dab18c5eb804fc9a60ef23fa734a9917b338..20290a9477f16c7d479d32ec4435da0c1bb26514 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt @@ -20,7 +20,7 @@ class FullSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchmar override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? { var minimalSuitableResources: Resource? = null; for (res in resources) { - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } val result = this.benchmarkExecutor.runExperiment(load, res) if (result && minimalSuitableResources != null) { minimalSuitableResources = res diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt index 3936d1982cc2e0d4701d3ff643199cfdc0d35ff4..85deaf6fa75437199bfc560404eb5b40bb4a986a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt @@ -17,7 +17,7 @@ class LinearSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? { for (res in resources) { - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } if (this.benchmarkExecutor.runExperiment(load, res)) return res } return null diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt index 4ba096e37345a9488cb288b21a8aa57ff07ac1ff..f304d8cd06969d4650329b9b9f410a56985a2002 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import io.quarkus.runtime.annotations.RegisterForReflection import org.apache.kafka.clients.admin.NewTopic import kotlin.properties.Delegates +import kotlin.reflect.KProperty /** * Configuration of Kafka connection. @@ -23,15 +24,6 @@ class KafkaConfig { */ lateinit var topics: List<TopicWrapper> - /** - * Get all current Kafka topics. - * - * @return the list of topics. - */ - fun getKafkaTopics(): List<NewTopic> { - return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) } - } - /** * Wrapper for a topic definition. */ @@ -51,5 +43,25 @@ class KafkaConfig { * The replication factor of this topic */ var replicationFactor by Delegates.notNull<Short>() + + /** + * If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix. + */ + var removeOnly by DelegatesFalse() } } + +/** + * Delegates to initialize a lateinit boolean to false + */ +@RegisterForReflection +class DelegatesFalse { + private var state = false + operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean { + return state + } + operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) { + state = value + } + +} diff --git a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml index 9fc16f92e303f05a449f7e8b83600c3b299f215d..19ec972be8236fbdcad123e9c9ef63945bb53d16 100644 --- a/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml +++ b/theodolite-quarkus/src/main/resources/operator/example-benchmark-k8s-resource.yaml @@ -26,4 +26,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite-.*" + removeOnly: True \ No newline at end of file