Skip to content
Snippets Groups Projects
Commit e566e97c authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'theodolite-kotlin' into benchmark-definitions

parents 341ea9c0 58f93b14
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!131Add definitions for UC1, UC2, UC3, UC4,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2926 passed
Showing
with 103 additions and 34 deletions
......@@ -30,3 +30,5 @@ kafkaConfig:
- name: "input"
numPartitions: 40
replicationFactor: 1
- name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
......@@ -29,3 +29,5 @@ kafkaConfig:
- name: "input"
numPartitions: 40
replicationFactor: 1
- name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
......@@ -27,3 +27,5 @@ kafkaConfig:
- name: "input"
numPartitions: 40
replicationFactor: 1
- name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
......@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
namespace = namespace,
resources = resources.map { r -> r.second },
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.getKafkaTopics(),
topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace)
)
}
......
......@@ -3,9 +3,13 @@ package theodolite.benchmark
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.quarkus.runtime.annotations.RegisterForReflection
import mu.KotlinLogging
import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
private val logger = KotlinLogging.logger {}
/**
* Organizes the deployment of benchmarks in Kubernetes.
......@@ -20,12 +24,13 @@ class KubernetesBenchmarkDeployment(
val namespace: String,
val resources: List<KubernetesResource>,
private val kafkaConfig: HashMap<String, Any>,
private val topics: Collection<NewTopic>,
private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient
) : BenchmarkDeployment {
private val kafkaController = TopicManager(this.kafkaConfig)
private val kubernetesManager = K8sManager(client)
private val LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
private val SLEEP_AFTER_K8S_DELETION_MS = 2000L
/**
* Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
......@@ -33,7 +38,9 @@ class KubernetesBenchmarkDeployment(
* - Deploy the needed resources.
*/
override fun setup() {
kafkaController.createTopics(this.topics)
val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach {
kubernetesManager.deploy(it)
}
......@@ -49,7 +56,9 @@ class KubernetesBenchmarkDeployment(
resources.forEach {
kubernetesManager.remove(it)
}
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
KafkaLagExporterRemover(client).remove(LABEL)
logger.info { "Kubernetes resources deleted. Allow for short pause before continuing." }
Thread.sleep(SLEEP_AFTER_K8S_DELETION_MS)
kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
}
}
......@@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
*/
fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
delete(topics, kafkaAdmin)
val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin)
kafkaAdmin.close()
}
/**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
*
* @param existingTopic string for which should be checked if it could be created
* @param topics list of string which are used as possible prefixes to create `existingTopic`
* @return true, `existingTopics` matches a created regex, else false
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
val regex = t.toRegex()
if (regex.matches(existingTopic)) {
return true
}
}
return false
}
private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
var deleted = false
......
......@@ -36,7 +36,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
// special case: length == 1 or 2
if (lower == upper) {
val res = resources[lower]
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower
else {
if (lower + 1 == resources.size) return -1
......@@ -47,7 +47,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
// length > 2 and adjust upper and lower depending on the result for `resources[mid]`
val mid = (upper + lower) / 2
val res = resources[mid]
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, resources[mid])) {
if (mid == lower) {
return lower
......
......@@ -20,7 +20,7 @@ class FullSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchmar
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
var minimalSuitableResources: Resource? = null;
for (res in resources) {
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
val result = this.benchmarkExecutor.runExperiment(load, res)
if (result && minimalSuitableResources != null) {
minimalSuitableResources = res
......
......@@ -17,7 +17,7 @@ class LinearSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
for (res in resources) {
logger.info { "Running experiment with load '$load' and resources '$res'" }
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, res)) return res
}
return null
......
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic
import kotlin.properties.Delegates
import kotlin.reflect.KProperty
/**
* Configuration of Kafka connection.
......@@ -23,15 +24,6 @@ class KafkaConfig {
*/
lateinit var topics: List<TopicWrapper>
/**
* Get all current Kafka topics.
*
* @return the list of topics.
*/
fun getKafkaTopics(): List<NewTopic> {
return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) }
}
/**
* Wrapper for a topic definition.
*/
......@@ -51,5 +43,25 @@ class KafkaConfig {
* The replication factor of this topic
*/
var replicationFactor by Delegates.notNull<Short>()
/**
* If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix.
*/
var removeOnly by DelegatesFalse()
}
}
/**
* Delegates to initialize a lateinit boolean to false
*/
@RegisterForReflection
class DelegatesFalse {
private var state = false
operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean {
return state
}
operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) {
state = value
}
}
......@@ -40,7 +40,8 @@ class Results {
* @param load the [LoadDimension]
*
* @return the smallest suitable number of resources. If the experiment was not executed yet,
* a @see Resource with the constant Int.MAX_VALUE as value is returned. If no experiments have been marked as either successful or unsuccessful
* a @see Resource with the constant Int.MAX_VALUE as value is returned.
* If no experiments have been marked as either successful or unsuccessful
* yet, a Resource with the constant value Int.MIN_VALUE is returned.
*/
fun getMinRequiredInstances(load: LoadDimension?): Resource? {
......@@ -72,7 +73,6 @@ class Results {
fun getMaxBenchmarkedLoad(load: LoadDimension): LoadDimension? {
var maxBenchmarkedLoad: LoadDimension? = null
for (experiment in results) {
if (experiment.value) {
if (experiment.key.first.get() <= load.get()) {
if (maxBenchmarkedLoad == null) {
maxBenchmarkedLoad = experiment.key.first
......@@ -81,7 +81,6 @@ class Results {
}
}
}
}
return maxBenchmarkedLoad
}
}
package theodolite.util
import io.quarkus.test.junit.QuarkusTest
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
......@@ -44,7 +45,31 @@ internal class ResultsTest {
LoadDimension(load, emptyList()),
Resource(resources, emptyList())
),
successful)
successful
)
}
@Test
fun testGetMaxBenchmarkedLoadWhenAllSuccessful() {
val results = Results()
results.setResult(10000, 1, true)
results.setResult(10000, 2, true)
val test1 = results.getMaxBenchmarkedLoad(LoadDimension(100000, emptyList()))!!.get()
assertEquals(10000, test1)
}
@Test
fun testGetMaxBenchmarkedLoadWhenLargestNotSuccessful() {
val results = Results()
results.setResult(10000, 1, true)
results.setResult(10000, 2, true)
results.setResult(20000, 1, false)
val test2 = results.getMaxBenchmarkedLoad(LoadDimension(100000, emptyList()))!!.get()
assertEquals(20000, test2)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment