Skip to content
Snippets Groups Projects
Commit dab43a3b authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/spesb...

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/spesb into 144-document-image-build
parents cf37f2bc a10e11de
Branches
Tags
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!116Add image build documentation,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 80 additions and 53 deletions
......@@ -22,7 +22,7 @@ loadTypes:
- type: "NumSensorsLoadGeneratorReplicaPatcher"
resource: "uc1-load-generator-deployment.yaml"
kafkaConfig:
bootstrapServer: "theodolite-cp-kafka:9092"
bootstrapServer: "localhost:31290"
topics:
- name: "input"
numPartitions: 40
......
......@@ -17,6 +17,7 @@ execution:
strategy: "LinearSearch"
duration: 300 # in seconds
repetitions: 1
loadGenerationDelay: 30 # in seconds, optional field, default is 0 seconds
restrictions:
- "LowerBound"
configOverrides: []
......
......@@ -21,6 +21,7 @@ execution:
strategy: "LinearSearch"
duration: 300 # in seconds
repetitions: 1
delay: 30 # in seconds
restrictions:
- "LowerBound"
configOverrides: []
......
......@@ -21,6 +21,7 @@ interface Benchmark {
fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride?>
configurationOverrides: List<ConfigurationOverride?>,
delay: Long
): BenchmarkDeployment
}
......@@ -47,6 +47,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var duration by Delegates.notNull<Long>()
var repetitions by Delegates.notNull<Int>()
lateinit var restrictions: List<String>
var loadGenerationDelay = 0L
}
/**
......
......@@ -22,7 +22,7 @@ private var DEFAULT_NAMESPACE = "default"
* - [loadGenResource] resource that generates the load,
* - [resourceTypes] types of scaling resources,
* - [loadTypes] types of loads that can be scaled for the benchmark,
* - [kafkaConfig] for the [TopicManager],
* - [kafkaConfig] for the [theodolite.k8s.TopicManager],
* - [namespace] for the client,
* - [path] under which the resource yamls can be found.
*
......@@ -63,38 +63,43 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
* First loads all required resources and then patches them to the concrete load and resources for the experiment.
* Afterwards patches additional configurations(cluster depending) into the resources.
* @param load concrete load that will be benchmarked in this experiment.
* @param res concrete resoruce that will be scaled for this experiment.
* @param res concrete resource that will be scaled for this experiment.
* @param configurationOverrides
* @return a [BenchmarkDeployment]
*/
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride?>
configurationOverrides: List<ConfigurationOverride?>,
loadGenerationDelay: Long
): BenchmarkDeployment {
logger.info { "Using $namespace as namespace." }
logger.info { "Using $path as resource path." }
val resources = loadKubernetesResources(this.appResource + this.loadGenResource)
val appResources = loadKubernetesResources(this.appResource)
val loadGenResources = loadKubernetesResources(this.loadGenResource)
val patcherFactory = PatcherFactory()
// patch the load dimension the resources
load.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(load.get().toString())
patcherFactory.createPatcher(patcherDefinition, loadGenResources).patch(load.get().toString())
}
res.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(res.get().toString())
patcherFactory.createPatcher(patcherDefinition, appResources).patch(res.get().toString())
}
// Patch the given overrides
configurationOverrides.forEach { override ->
override?.let {
patcherFactory.createPatcher(it.patcher, resources).patch(override.value)
patcherFactory.createPatcher(it.patcher, appResources + loadGenResources).patch(override.value)
}
}
return KubernetesBenchmarkDeployment(
namespace = namespace,
resources = resources.map { r -> r.second },
appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second },
loadGenerationDelay = loadGenerationDelay,
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace)
......
......@@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
import java.time.Duration
private val logger = KotlinLogging.logger {}
......@@ -22,7 +23,9 @@ private val logger = KotlinLogging.logger {}
@RegisterForReflection
class KubernetesBenchmarkDeployment(
val namespace: String,
val resources: List<KubernetesResource>,
val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>,
private val loadGenerationDelay: Long,
private val kafkaConfig: HashMap<String, Any>,
private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient
......@@ -41,7 +44,10 @@ class KubernetesBenchmarkDeployment(
val kafkaTopics = this.topics.filter { !it.removeOnly }
.map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach { kubernetesManager.deploy(it) }
appResources.forEach { kubernetesManager.deploy(it) }
logger.info { "Wait ${this.loadGenerationDelay} seconds before starting the load generator." }
Thread.sleep(Duration.ofSeconds(this.loadGenerationDelay).toMillis())
loadGenResources.forEach { kubernetesManager.deploy(it) }
}
/**
......@@ -51,9 +57,8 @@ class KubernetesBenchmarkDeployment(
* - Remove the [KubernetesResource]s.
*/
override fun teardown() {
resources.forEach {
kubernetesManager.remove(it)
}
loadGenResources.forEach { kubernetesManager.remove(it) }
appResources.forEach { kubernetesManager.remove(it) }
kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
......
......@@ -4,7 +4,7 @@ import theodolite.util.PrometheusResponse
import java.time.Instant
/**
* A SloChecker can be used to evaluate data from Promehteus.
* A SloChecker can be used to evaluate data from Prometheus.
* @constructor Creates an empty SloChecker
*/
interface SloChecker {
......
......@@ -24,9 +24,10 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark,
val results: Results,
val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>,
val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo,
val executionId: Int
val executionId: Int,
val loadGenerationDelay: Long
) {
var run: AtomicBoolean = AtomicBoolean(true)
......
......@@ -18,13 +18,14 @@ class BenchmarkExecutorImpl(
benchmark: Benchmark,
results: Results,
executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride?>,
configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo,
executionId: Int
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId) {
executionId: Int,
loadGenerationDelay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId, loadGenerationDelay) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
val benchmarkDeployment = benchmark.buildDeployment(load, res, configurationOverrides, loadGenerationDelay)
try {
benchmarkDeployment.setup()
......
......@@ -30,7 +30,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
benchmark.buildDeployment(
load = LoadDimension(0, emptyList()),
res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides
configurationOverrides = benchmarkExecution.configOverrides,
loadGenerationDelay = 0L
)
deployment.teardown()
} catch (e: Exception) {
......
......@@ -71,7 +71,8 @@ class TheodoliteExecutor(
executionDuration = executionDuration,
configurationOverrides = config.configOverrides,
slo = config.slos[0],
executionId = config.executionId
executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay
)
return Config(
......@@ -128,7 +129,7 @@ class TheodoliteExecutor(
fun run() {
val resultsFolder = getResultFolderString()
storeAsFile(this.config, "$resultsFolder${this.config.executionId}-execution-configuration")
storeAsFile(kubernetesBenchmark, "$resultsFolder/${this.config.executionId}-benchmark-configuration")
storeAsFile(kubernetesBenchmark, "$resultsFolder${this.config.executionId}-benchmark-configuration")
val config = buildConfig()
// execute benchmarks for each load
......
......@@ -3,6 +3,7 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
/**
......
......@@ -3,7 +3,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import java.lang.NullPointerException
private val logger = KotlinLogging.logger {}
......@@ -29,10 +28,12 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
}
/**
* Update an execution. If this execution is running at the time this function is called, it is stopped and added to
* the beginning of the queue of the TheodoliteController. Otherwise, it is just added to the beginning of the queue.
* Updates an execution. If this execution is running at the time this function is called, it is stopped and
* added to the beginning of the queue of the TheodoliteController.
* Otherwise, it is just added to the beginning of the queue.
*
* @param execution the execution to update
* @param oldExecution the old execution
* @param newExecution the new execution
*/
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
......
......@@ -2,7 +2,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import khttp.patch
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
......
......@@ -32,7 +32,14 @@ class K8sResourceLoader(private val client: NamespacedKubernetesClient) {
* @return CustomResource from fabric8
*/
private fun loadServiceMonitor(path: String): ServiceMonitorWrapper {
return loadGenericResource(path) { x: String -> ServiceMonitorWrapper(YamlParser().parse(path, HashMap<String, String>()::class.java)!!) }
return loadGenericResource(path) { x: String ->
ServiceMonitorWrapper(
YamlParser().parse(
path,
HashMap<String, String>()::class.java
)!!
)
}
}
/**
......
......@@ -45,7 +45,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
"Topics creation finished with result: ${
result
.values()
.map { it -> it.key + ": " + it.value.isDone }
.map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
......@@ -64,11 +64,12 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
}
/**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
* This function checks whether one string in `topics` can be used as prefix of a regular expression
* to create the string `existingTopic`.
*
* @param existingTopic string for which should be checked if it could be created
* @param topics list of string which are used as possible prefixes to create `existingTopic`
* @return true, `existingTopics` matches a created regex, else false
* @param existingTopic string for which should be checked if it could be created.
* @param topics list of string which are used as possible prefixes to create `existingTopic`.
* @return true, `existingTopics` matches a created regex, else false.
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
......@@ -89,7 +90,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
result.all().get() // wait for the future to be completed
logger.info {
"Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
result.values().map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
}"
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment