Skip to content
Snippets Groups Projects
Commit ecc3aea7 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'theodolite-kotlin' of...

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/theodolite into theodolite-kotlin
parents 9ec96311 10e07cfc
No related branches found
No related tags found
3 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2916 passed
Showing
with 95 additions and 32 deletions
...@@ -26,4 +26,6 @@ kafkaConfig: ...@@ -26,4 +26,6 @@ kafkaConfig:
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
replicationFactor: 1 replicationFactor: 1
\ No newline at end of file - name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
...@@ -30,4 +30,6 @@ kafkaConfig: ...@@ -30,4 +30,6 @@ kafkaConfig:
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
replicationFactor: 1 replicationFactor: 1
\ No newline at end of file - name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
...@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -96,7 +96,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
namespace = namespace, namespace = namespace,
resources = resources.map { r -> r.second }, resources = resources.map { r -> r.second },
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.getKafkaTopics(), topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace) client = DefaultKubernetesClient().inNamespace(namespace)
) )
} }
......
...@@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection ...@@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
/** /**
* Organizes the deployment of benchmarks in Kubernetes. * Organizes the deployment of benchmarks in Kubernetes.
...@@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment( ...@@ -20,7 +21,7 @@ class KubernetesBenchmarkDeployment(
val namespace: String, val namespace: String,
val resources: List<KubernetesResource>, val resources: List<KubernetesResource>,
private val kafkaConfig: HashMap<String, Any>, private val kafkaConfig: HashMap<String, Any>,
private val topics: Collection<NewTopic>, private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient private val client: NamespacedKubernetesClient
) : BenchmarkDeployment { ) : BenchmarkDeployment {
private val kafkaController = TopicManager(this.kafkaConfig) private val kafkaController = TopicManager(this.kafkaConfig)
...@@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment( ...@@ -33,7 +34,9 @@ class KubernetesBenchmarkDeployment(
* - Deploy the needed resources. * - Deploy the needed resources.
*/ */
override fun setup() { override fun setup() {
kafkaController.createTopics(this.topics) val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach { resources.forEach {
kubernetesManager.deploy(it) kubernetesManager.deploy(it)
} }
...@@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment( ...@@ -49,7 +52,7 @@ class KubernetesBenchmarkDeployment(
resources.forEach { resources.forEach {
kubernetesManager.remove(it) kubernetesManager.remove(it)
} }
kafkaController.removeTopics(this.topics.map { topic -> topic.name() }) kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LABEL) KafkaLagExporterRemover(client).remove(LABEL)
} }
} }
...@@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -56,10 +56,28 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
delete(topics, kafkaAdmin) val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin)
kafkaAdmin.close() kafkaAdmin.close()
} }
/**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
*
* @param existingTopic string for which should be checked if it could be created
* @param topics list of string which are used as possible prefixes to create `existingTopic`
* @return true, `existingTopics` matches a created regex, else false
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
val regex = t.toRegex()
if (regex.matches(existingTopic)) {
return true
}
}
return false
}
private fun delete(topics: List<String>, kafkaAdmin: AdminClient) { private fun delete(topics: List<String>, kafkaAdmin: AdminClient) {
var deleted = false var deleted = false
......
...@@ -36,7 +36,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm ...@@ -36,7 +36,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
// special case: length == 1 or 2 // special case: length == 1 or 2
if (lower == upper) { if (lower == upper) {
val res = resources[lower] val res = resources[lower]
logger.info { "Running experiment with load '$load' and resources '$res'" } logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower if (this.benchmarkExecutor.runExperiment(load, resources[lower])) return lower
else { else {
if (lower + 1 == resources.size) return -1 if (lower + 1 == resources.size) return -1
...@@ -47,7 +47,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm ...@@ -47,7 +47,7 @@ class BinarySearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
// length > 2 and adjust upper and lower depending on the result for `resources[mid]` // length > 2 and adjust upper and lower depending on the result for `resources[mid]`
val mid = (upper + lower) / 2 val mid = (upper + lower) / 2
val res = resources[mid] val res = resources[mid]
logger.info { "Running experiment with load '$load' and resources '$res'" } logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, resources[mid])) { if (this.benchmarkExecutor.runExperiment(load, resources[mid])) {
if (mid == lower) { if (mid == lower) {
return lower return lower
......
...@@ -20,7 +20,7 @@ class FullSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchmar ...@@ -20,7 +20,7 @@ class FullSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchmar
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? { override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
var minimalSuitableResources: Resource? = null; var minimalSuitableResources: Resource? = null;
for (res in resources) { for (res in resources) {
logger.info { "Running experiment with load '$load' and resources '$res'" } logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
val result = this.benchmarkExecutor.runExperiment(load, res) val result = this.benchmarkExecutor.runExperiment(load, res)
if (result && minimalSuitableResources != null) { if (result && minimalSuitableResources != null) {
minimalSuitableResources = res minimalSuitableResources = res
......
...@@ -17,7 +17,7 @@ class LinearSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm ...@@ -17,7 +17,7 @@ class LinearSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchm
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? { override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
for (res in resources) { for (res in resources) {
logger.info { "Running experiment with load '$load' and resources '$res'" } logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
if (this.benchmarkExecutor.runExperiment(load, res)) return res if (this.benchmarkExecutor.runExperiment(load, res)) return res
} }
return null return null
......
...@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize ...@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import kotlin.properties.Delegates import kotlin.properties.Delegates
import kotlin.reflect.KProperty
/** /**
* Configuration of Kafka connection. * Configuration of Kafka connection.
...@@ -23,15 +24,6 @@ class KafkaConfig { ...@@ -23,15 +24,6 @@ class KafkaConfig {
*/ */
lateinit var topics: List<TopicWrapper> lateinit var topics: List<TopicWrapper>
/**
* Get all current Kafka topics.
*
* @return the list of topics.
*/
fun getKafkaTopics(): List<NewTopic> {
return topics.map { topic -> NewTopic(topic.name, topic.numPartitions, topic.replicationFactor) }
}
/** /**
* Wrapper for a topic definition. * Wrapper for a topic definition.
*/ */
...@@ -51,5 +43,25 @@ class KafkaConfig { ...@@ -51,5 +43,25 @@ class KafkaConfig {
* The replication factor of this topic * The replication factor of this topic
*/ */
var replicationFactor by Delegates.notNull<Short>() var replicationFactor by Delegates.notNull<Short>()
/**
* If remove only, this topic would only used to delete all topics, which has the name of the topic as a prefix.
*/
var removeOnly by DelegatesFalse()
} }
} }
/**
* Delegates to initialize a lateinit boolean to false
*/
@RegisterForReflection
class DelegatesFalse {
private var state = false
operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean {
return state
}
operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) {
state = value
}
}
...@@ -40,7 +40,8 @@ class Results { ...@@ -40,7 +40,8 @@ class Results {
* @param load the [LoadDimension] * @param load the [LoadDimension]
* *
* @return the smallest suitable number of resources. If the experiment was not executed yet, * @return the smallest suitable number of resources. If the experiment was not executed yet,
* a @see Resource with the constant Int.MAX_VALUE as value is returned. If no experiments have been marked as either successful or unsuccessful * a @see Resource with the constant Int.MAX_VALUE as value is returned.
* If no experiments have been marked as either successful or unsuccessful
* yet, a Resource with the constant value Int.MIN_VALUE is returned. * yet, a Resource with the constant value Int.MIN_VALUE is returned.
*/ */
fun getMinRequiredInstances(load: LoadDimension?): Resource? { fun getMinRequiredInstances(load: LoadDimension?): Resource? {
...@@ -72,13 +73,11 @@ class Results { ...@@ -72,13 +73,11 @@ class Results {
fun getMaxBenchmarkedLoad(load: LoadDimension): LoadDimension? { fun getMaxBenchmarkedLoad(load: LoadDimension): LoadDimension? {
var maxBenchmarkedLoad: LoadDimension? = null var maxBenchmarkedLoad: LoadDimension? = null
for (experiment in results) { for (experiment in results) {
if (experiment.value) { if (experiment.key.first.get() <= load.get()) {
if (experiment.key.first.get() <= load.get()) { if (maxBenchmarkedLoad == null) {
if (maxBenchmarkedLoad == null) { maxBenchmarkedLoad = experiment.key.first
maxBenchmarkedLoad = experiment.key.first } else if (maxBenchmarkedLoad.get() < experiment.key.first.get()) {
} else if (maxBenchmarkedLoad.get() < experiment.key.first.get()) { maxBenchmarkedLoad = experiment.key.first
maxBenchmarkedLoad = experiment.key.first
}
} }
} }
} }
......
...@@ -26,4 +26,6 @@ kafkaConfig: ...@@ -26,4 +26,6 @@ kafkaConfig:
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
replicationFactor: 1 replicationFactor: 1
\ No newline at end of file - name: "theodolite-.*"
removeOnly: True
\ No newline at end of file
package theodolite.util package theodolite.util
import io.quarkus.test.junit.QuarkusTest import io.quarkus.test.junit.QuarkusTest
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
...@@ -44,7 +45,31 @@ internal class ResultsTest { ...@@ -44,7 +45,31 @@ internal class ResultsTest {
LoadDimension(load, emptyList()), LoadDimension(load, emptyList()),
Resource(resources, emptyList()) Resource(resources, emptyList())
), ),
successful) successful
)
} }
}
\ No newline at end of file @Test
fun testGetMaxBenchmarkedLoadWhenAllSuccessful() {
val results = Results()
results.setResult(10000, 1, true)
results.setResult(10000, 2, true)
val test1 = results.getMaxBenchmarkedLoad(LoadDimension(100000, emptyList()))!!.get()
assertEquals(10000, test1)
}
@Test
fun testGetMaxBenchmarkedLoadWhenLargestNotSuccessful() {
val results = Results()
results.setResult(10000, 1, true)
results.setResult(10000, 2, true)
results.setResult(20000, 1, false)
val test2 = results.getMaxBenchmarkedLoad(LoadDimension(100000, emptyList()))!!.get()
assertEquals(20000, test2)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment