Skip to content
Snippets Groups Projects
Commit 10e07cfc authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch '219-delete-internal-kstreams-topic' into 'theodolite-kotlin'

Allow to delete all topics that have a specific prefix

See merge request !133
parents 66041d3d 41c94097
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!133Allow to delete all topics that have a specific prefix,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2867 passed
Showing
with 60 additions and 21 deletions
......@@ -26,4 +26,6 @@ kafkaConfig:
topics:
- name: "input"
numPartitions: 40
replicationFactor: 1
\ No newline at end of file
replicationFactor: 1
- name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
......@@ -30,4 +30,6 @@ kafkaConfig:
topics:
- name: "input"
numPartitions: 40
replicationFactor: 1
\ No newline at end of file
replicationFactor: 1
- name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
......@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
namespace = namespace,
resources = resources.map { r -> r.second },
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.getKafkaTopics(),
topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace)
)
}
......
......@@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
/**
* Organizes the deployment of benchmarks in Kubernetes.
......@@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment(
val namespace: String,
val resources: List<KubernetesResource>,
private val kafkaConfig: HashMap<String, Any>,
private val topics: Collection<NewTopic>,
private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient
) : BenchmarkDeployment {
private val kafkaController = TopicManager(this.kafkaConfig)
......@@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment(
* - Deploy the needed resources.
*/
override fun setup() {
kafkaController.createTopics(this.topics)
val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach {
kubernetesManager.deploy(it)
}
......@@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment(
resources.forEach {
kubernetesManager.remove(it)
}
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LABEL)
}
}
......@@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
*/
fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
delete(topics, kafkaAdmin)
val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin)
kafkaAdmin.close()
}
/**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
*
* @param existingTopic string for which should be checked if it could be created
* @param topics list of string which are used as possible prefixes to create `existingTopic`
* @return true, `existingTopics` matches a created regex, else false
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
val regex = t.toRegex()
if (regex.matches(existingTopic)) {
return true
}
}
return false
}
private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
var deleted = false
......
......@@ -36,7 +36,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
// special case: length == 1 or 2
if (lower == upper) {
val res = resources[lower]
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower
else {
if (lower + 1 == resources.size) return -1
......@@ -47,7 +47,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
// length > 2 and adjust upper and lower depending on the result for `resources[mid]`
val mid = (upper + lower) / 2
val res = resources[mid]
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, resources[mid])) {
if (mid == lower) {
return lower
......
......@@ -20,7 +20,7 @@ class FullSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchmar
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
var minimalSuitableResources: Resource? = null;
for (res in resources) {
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
val result = this.benchmarkExecutor.runExperiment(load, res)
if (result && minimalSuitableResources != null) {
minimalSuitableResources = res
......
......@@ -17,7 +17,7 @@ class LinearSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
for (res in resources) {
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, res)) return res
}
return null
......
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic
import kotlin.properties.Delegates
import kotlin.reflect.KProperty
/**
* Configuration of Kafka connection.
......@@ -23,15 +24,6 @@ class KafkaConfig {
*/
lateinit var topics: List<TopicWrapper>
/**
* Get all current Kafka topics.
*
* @return the list of topics.
*/
fun getKafkaTopics(): List<NewTopic> {
return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) }
}
/**
* Wrapper for a topic definition.
*/
......@@ -51,5 +43,25 @@ class KafkaConfig {
* The replication factor of this topic
*/
var replicationFactor by Delegates.notNull<Short>()
/**
* If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix.
*/
var removeOnly by DelegatesFalse()
}
}
/**
* Delegates to initialize a lateinit boolean to false
*/
@RegisterForReflection
class DelegatesFalse {
private var state = false
operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean {
return state
}
operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) {
state = value
}
}
......@@ -26,4 +26,6 @@ kafkaConfig:
topics:
- name: "input"
numPartitions: 40
replicationFactor: 1
\ No newline at end of file
replicationFactor: 1
- name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment