Skip to content
Snippets Groups Projects
Commit 70bdf48b authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Allow to delete all topics that have a specific prefix.

parent 66041d3d
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!133Allow to delete all topics that have a specific prefix,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
...@@ -26,4 +26,6 @@ kafkaConfig: ...@@ -26,4 +26,6 @@ kafkaConfig:
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
replicationFactor: 1 replicationFactor: 1
\ No newline at end of file - name: "theodolite"
removeOnly: True
\ No newline at end of file
...@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
namespace = namespace, namespace = namespace,
resources = resources.map { r -> r.second }, resources = resources.map { r -> r.second },
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.getKafkaTopics(), topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace) client = DefaultKubernetesClient().inNamespace(namespace)
) )
} }
......
...@@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection ...@@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
/** /**
* Organizes the deployment of benchmarks in Kubernetes. * Organizes the deployment of benchmarks in Kubernetes.
...@@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment( ...@@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment(
val namespace: String, val namespace: String,
val resources: List<KubernetesResource>, val resources: List<KubernetesResource>,
private val kafkaConfig: HashMap<String, Any>, private val kafkaConfig: HashMap<String, Any>,
private val topics: Collection<NewTopic>, private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient private val client: NamespacedKubernetesClient
) : BenchmarkDeployment { ) : BenchmarkDeployment {
private val kafkaController = TopicManager(this.kafkaConfig) private val kafkaController = TopicManager(this.kafkaConfig)
...@@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment( ...@@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment(
* - Deploy the needed resources. * - Deploy the needed resources.
*/ */
override fun setup() { override fun setup() {
kafkaController.createTopics(this.topics) val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach { resources.forEach {
kubernetesManager.deploy(it) kubernetesManager.deploy(it)
} }
...@@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment( ...@@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment(
resources.forEach { resources.forEach {
kubernetesManager.remove(it) kubernetesManager.remove(it)
} }
kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LABEL) KafkaLagExporterRemover(client).remove(LABEL)
} }
} }
...@@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
delete(topics, kafkaAdmin) val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin)
kafkaAdmin.close() kafkaAdmin.close()
} }
/**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
*
* @param existingTopic string for which should be checked if it could be created
* @param topics list of string which are used as possible prefixes to create `existingTopic`
* @return true, `existingTopics` matches a created regex, else false
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
val regex = ("$t.*").toRegex()
if (regex.matches(existingTopic)) {
return true
}
}
return false
}
private fun delete(topics: List<String>, kafkaAdmin: AdminClient) { private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
var deleted = false var deleted = false
......
...@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize ...@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import kotlin.properties.Delegates import kotlin.properties.Delegates
import kotlin.reflect.KProperty
/** /**
* Configuration of Kafka connection. * Configuration of Kafka connection.
...@@ -23,15 +24,6 @@ class KafkaConfig { ...@@ -23,15 +24,6 @@ class KafkaConfig {
*/ */
lateinit var topics: List<TopicWrapper> lateinit var topics: List<TopicWrapper>
/**
* Get all current Kafka topics.
*
* @return the list of topics.
*/
fun getKafkaTopics(): List<NewTopic> {
return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) }
}
/** /**
* Wrapper for a topic definition. * Wrapper for a topic definition.
*/ */
...@@ -51,5 +43,25 @@ class KafkaConfig { ...@@ -51,5 +43,25 @@ class KafkaConfig {
* The replication factor of this topic * The replication factor of this topic
*/ */
var replicationFactor by Delegates.notNull<Short>() var replicationFactor by Delegates.notNull<Short>()
/**
* If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix.
*/
var removeOnly by DelegatesFalse()
} }
} }
/**
* Delegates to initialize a lateinit boolean to false
*/
@RegisterForReflection
class DelegatesFalse {
private var state = false
operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean {
return state
}
operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) {
state = value
}
}
...@@ -26,4 +26,6 @@ kafkaConfig: ...@@ -26,4 +26,6 @@ kafkaConfig:
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
replicationFactor: 1 replicationFactor: 1
\ No newline at end of file - name: "theodolite"
removeOnly: True
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment