diff --git a/theodolite-benchmarks/definitions/uc1-kstreams/uc1-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc1-kstreams/uc1-benchmark-operator.yaml index a7695eeaf776f8c1e6edcd354bb16df2c30df065..928c083c4ae47220872e7ed83ed88bec530feba7 100644 --- a/theodolite-benchmarks/definitions/uc1-kstreams/uc1-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc1-kstreams/uc1-benchmark-operator.yaml @@ -29,4 +29,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite-.*" + removeOnly: True \ No newline at end of file diff --git a/theodolite-quarkus/examples/operator/example-benchmark.yaml b/theodolite-quarkus/examples/operator/example-benchmark.yaml index 8a62a68c10c9c74ba62eca1960bac3cd4fbf633c..a7c0761b2dc915eea392e1160b6f37b611886c36 100644 --- a/theodolite-quarkus/examples/operator/example-benchmark.yaml +++ b/theodolite-quarkus/examples/operator/example-benchmark.yaml @@ -28,4 +28,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite-.*" + removeOnly: True \ No newline at end of file diff --git a/theodolite-quarkus/examples/standalone/example-benchmark.yaml b/theodolite-quarkus/examples/standalone/example-benchmark.yaml index d507934fb87127748a1f6e22177f9dd9430f5f0b..ebc2fe9e44fe303e342cabee301cb63664867cfb 100644 --- a/theodolite-quarkus/examples/standalone/example-benchmark.yaml +++ b/theodolite-quarkus/examples/standalone/example-benchmark.yaml @@ -26,4 +26,6 @@ kafkaConfig: topics: - name: "input" numPartitions: 40 - replicationFactor: 1 \ No newline at end of file + replicationFactor: 1 + - name: "theodolite-.*" + removeOnly: True \ No newline at end of file diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt index 71b65a28fd074e4554c283ee94a8db028d652d46..bbcb8a957fb2f04ca678b231a878be0a23d46748 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmark.kt @@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { namespace = namespace, resources = resources.map { r -> r.second }, kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), - topics = kafkaConfig.getKafkaTopics(), + topics = kafkaConfig.topics, client = DefaultKubernetesClient().inNamespace(namespace) ) } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt index 0de00fb5007b646240661310d037e34ef1ea1ae2..28a7b12e8b2203496e870fbd67641cd854bbbe0d 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/benchmark/KubernetesBenchmarkDeployment.kt @@ -3,9 +3,13 @@ package theodolite.benchmark import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.quarkus.runtime.annotations.RegisterForReflection +import mu.KotlinLogging import org.apache.kafka.clients.admin.NewTopic import theodolite.k8s.K8sManager import theodolite.k8s.TopicManager +import theodolite.util.KafkaConfig + +private val logger = KotlinLogging.logger {} /** * Organizes the deployment of benchmarks in Kubernetes. @@ -20,12 +24,13 @@ class KubernetesBenchmarkDeployment( val namespace: String, val resources: List<KubernetesResource>, private val kafkaConfig: HashMap<String, Any>, - private val topics: Collection<NewTopic>, + private val topics: List<KafkaConfig.TopicWrapper>, private val client: NamespacedKubernetesClient ) : BenchmarkDeployment { private val kafkaController = TopicManager(this.kafkaConfig) private val kubernetesManager = K8sManager(client) - private val LABEL = "app.kubernetes.io/name=kafka-lag-exporter" + private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter" + private val SLEEP_AFTER_K8S_DELETION_MS = 2000L /** * Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]: @@ -33,7 +38,9 @@ class KubernetesBenchmarkDeployment( * - Deploy the needed resources. */ override fun setup() { - kafkaController.createTopics(this.topics) + val kafkaTopics = this.topics.filter { !it.removeOnly } + .map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) } + kafkaController.createTopics(kafkaTopics) resources.forEach { kubernetesManager.deploy(it) } @@ -49,7 +56,9 @@ class KubernetesBenchmarkDeployment( resources.forEach { kubernetesManager.remove(it) } - kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) - KafkaLagExporterRemover(client).remove(LABEL) + logger.info { "Kubernetes resources deleted. Allow for short pause before continuing." } + Thread.sleep(SLEEP_AFTER_K8S_DELETION_MS) + kafkaController.removeTopics(this.topics.map { topic -> topic.name }) + KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL) } } diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt index 90c2cb8a9c810aa524b6608a558d31e15587719e..1367f625808059f9e1a56f8be40c14e6f70e356d 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/k8s/TopicManager.kt @@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { */ fun removeTopics(topics: List<String>) { val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) - delete(topics, kafkaAdmin) + val currentTopics = kafkaAdmin.listTopics().names().get() + delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin) kafkaAdmin.close() } + /** + * This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic` + * + * @param existingTopic string for which should be checked if it could be created + * @param topics list of string which are used as possible prefixes to create `existingTopic` + * @return true, `existingTopics` matches a created regex, else false + */ + private fun matchRegex(existingTopic: String, topics: List<String>): Boolean { + for (t in topics) { + val regex = t.toRegex() + if (regex.matches(existingTopic)) { + return true + } + } + return false + } + private fun delete(topics: List<String>, kafkaAdmin: AdminClient) { var deleted = false diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt index 7f3311182e324f1ebe10bb664ea7766aca1aa783..28e8194c699cd074026c8cb7e6f3ce4ec347023b 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/BinarySearch.kt @@ -36,7 +36,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm // special case: length == 1 or 2 if (lower == upper) { val res = resources[lower] - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower else { if (lower + 1 == resources.size) return -1 @@ -47,7 +47,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm // length > 2 and adjust upper and lower depending on the result for `resources[mid]` val mid = (upper + lower) / 2 val res = resources[mid] - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } if (this.benchmarkExecutor.runExperiment(load, resources[mid])) { if (mid == lower) { return lower diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt index 9698dab18c5eb804fc9a60ef23fa734a9917b338..20290a9477f16c7d479d32ec4435da0c1bb26514 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/FullSearch.kt @@ -20,7 +20,7 @@ class FullSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchmar override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? { var minimalSuitableResources: Resource? = null; for (res in resources) { - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } val result = this.benchmarkExecutor.runExperiment(load, res) if (result && minimalSuitableResources != null) { minimalSuitableResources = res diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt index 3936d1982cc2e0d4701d3ff643199cfdc0d35ff4..85deaf6fa75437199bfc560404eb5b40bb4a986a 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/strategies/searchstrategy/LinearSearch.kt @@ -17,7 +17,7 @@ class LinearSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? { for (res in resources) { - logger.info { "Running experiment with load '$load' and resources '$res'" } + logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" } if (this.benchmarkExecutor.runExperiment(load, res)) return res } return null diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt index 4ba096e37345a9488cb288b21a8aa57ff07ac1ff..f304d8cd06969d4650329b9b9f410a56985a2002 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/util/KafkaConfig.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import io.quarkus.runtime.annotations.RegisterForReflection import org.apache.kafka.clients.admin.NewTopic import kotlin.properties.Delegates +import kotlin.reflect.KProperty /** * Configuration of Kafka connection. @@ -23,15 +24,6 @@ class KafkaConfig { */ lateinit var topics: List<TopicWrapper> - /** - * Get all current Kafka topics. - * - * @return the list of topics. - */ - fun getKafkaTopics(): List<NewTopic> { - return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) } - } - /** * Wrapper for a topic definition. */ @@ -51,5 +43,25 @@ class KafkaConfig { * The replication factor of this topic */ var replicationFactor by Delegates.notNull<Short>() + + /** + * If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix. + */ + var removeOnly by DelegatesFalse() } } + +/** + * Delegates to initialize a lateinit boolean to false + */ +@RegisterForReflection +class DelegatesFalse { + private var state = false + operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean { + return state + } + operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) { + state = value + } + +} diff --git a/theodolite-quarkus/src/main/kotlin/theodolite/util/Results.kt b/theodolite-quarkus/src/main/kotlin/theodolite/util/Results.kt index ab40f3d1f722bab39c29e81621da86e8920bbf72..60641ea0248435de53aaaaf362da7be995b391c5 100644 --- a/theodolite-quarkus/src/main/kotlin/theodolite/util/Results.kt +++ b/theodolite-quarkus/src/main/kotlin/theodolite/util/Results.kt @@ -40,7 +40,8 @@ class Results { * @param load the [LoadDimension] * * @return the smallest suitable number of resources. If the experiment was not executed yet, - * a @see Resource with the constant Int.MAX_VALUE as value is returned. If no experiments have been marked as either successful or unsuccessful + * a @see Resource with the constant Int.MAX_VALUE as value is returned. + * If no experiments have been marked as either successful or unsuccessful * yet, a Resource with the constant value Int.MIN_VALUE is returned. */ fun getMinRequiredInstances(load: LoadDimension?): Resource? { @@ -72,13 +73,11 @@ class Results { fun getMaxBenchmarkedLoad(load: LoadDimension): LoadDimension? { var maxBenchmarkedLoad: LoadDimension? = null for (experiment in results) { - if (experiment.value) { - if (experiment.key.first.get() <= load.get()) { - if (maxBenchmarkedLoad == null) { - maxBenchmarkedLoad = experiment.key.first - } else if (maxBenchmarkedLoad.get() < experiment.key.first.get()) { - maxBenchmarkedLoad = experiment.key.first - } + if (experiment.key.first.get() <= load.get()) { + if (maxBenchmarkedLoad == null) { + maxBenchmarkedLoad = experiment.key.first + } else if (maxBenchmarkedLoad.get() < experiment.key.first.get()) { + maxBenchmarkedLoad = experiment.key.first } } } diff --git a/theodolite-quarkus/src/test/kotlin/theodolite/util/ResultsTest.kt b/theodolite-quarkus/src/test/kotlin/theodolite/util/ResultsTest.kt index 6a69040a47abaf2e39225c563dfbad1594b1f261..9cfc2ae78e7a8846e3f0fa136699509145e5de22 100644 --- a/theodolite-quarkus/src/test/kotlin/theodolite/util/ResultsTest.kt +++ b/theodolite-quarkus/src/test/kotlin/theodolite/util/ResultsTest.kt @@ -1,7 +1,8 @@ package theodolite.util import io.quarkus.test.junit.QuarkusTest -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test @@ -44,7 +45,31 @@ internal class ResultsTest { LoadDimension(load, emptyList()), Resource(resources, emptyList()) ), - successful) + successful + ) } -} \ No newline at end of file + + @Test + fun testGetMaxBenchmarkedLoadWhenAllSuccessful() { + val results = Results() + results.setResult(10000, 1, true) + results.setResult(10000, 2, true) + + val test1 = results.getMaxBenchmarkedLoad(LoadDimension(100000, emptyList()))!!.get() + + assertEquals(10000, test1) + } + + @Test + fun testGetMaxBenchmarkedLoadWhenLargestNotSuccessful() { + val results = Results() + results.setResult(10000, 1, true) + results.setResult(10000, 2, true) + results.setResult(20000, 1, false) + + val test2 = results.getMaxBenchmarkedLoad(LoadDimension(100000, emptyList()))!!.get() + + assertEquals(20000, test2) + } +}