Skip to content
Snippets Groups Projects
Commit 3224d8c9 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Refactoring

parent 10e07cfc
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!134Refactoring,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 54 additions and 44 deletions
......@@ -22,7 +22,7 @@ private var DEFAULT_NAMESPACE = "default"
* - [loadGenResource] resource that generates the load,
* - [resourceTypes] types of scaling resources,
* - [loadTypes] types of loads that can be scaled for the benchmark,
* - [kafkaConfig] for the [TopicManager],
* - [kafkaConfig] for the [theodolite.k8s.TopicManager],
* - [namespace] for the client,
* - [path] under which the resource yamls can be found.
*
......@@ -63,7 +63,7 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
* First loads all required resources and then patches them to the concrete load and resources for the experiment.
* Afterwards patches additional configurations(cluster depending) into the resources.
* @param load concrete load that will be benchmarked in this experiment.
* @param res concrete resoruce that will be scaled for this experiment.
* @param res concrete resource that will be scaled for this experiment.
* @param configurationOverrides
* @return a [BenchmarkDeployment]
*/
......
......@@ -35,7 +35,7 @@ class KubernetesBenchmarkDeployment(
*/
override fun setup() {
val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
.map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach {
kubernetesManager.deploy(it)
......
......@@ -4,7 +4,7 @@ import theodolite.util.PrometheusResponse
import java.time.Instant
/**
* A SloChecker can be used to evaluate data from Promehteus.
* A SloChecker can be used to evaluate data from Prometheus.
* @constructor Creates an empty SloChecker
*/
interface SloChecker {
......
......@@ -41,7 +41,7 @@ abstract class BenchmarkExecutor(
* given load, false otherwise.
*/
abstract fun runExperiment(load: LoadDimension, res: Resource): Boolean
/**
* Wait while the benchmark is running and log the number of minutes executed every 1 minute.
*
......
......@@ -16,14 +16,14 @@ object Main {
val mode = System.getenv("MODE") ?: "standalone"
logger.info { "Start Theodolite with mode $mode" }
when(mode) {
when (mode) {
"standalone" -> TheodoliteYamlExecutor().start()
"yaml-executor" -> TheodoliteYamlExecutor().start() // TODO remove (#209)
"operator" -> TheodoliteOperator().start()
else -> {
else -> {
logger.error { "MODE $mode not found" }
exitProcess(1)
}
}
}
}
\ No newline at end of file
}
......@@ -3,6 +3,7 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
/**
......@@ -13,7 +14,7 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController
* @see KubernetesBenchmark
*/
class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> {
class BenchmarkEventHandler(private val controller: TheodoliteController) : ResourceEventHandler<KubernetesBenchmark> {
/**
* Add a KubernetesBenchmark.
......@@ -39,7 +40,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} else {
......@@ -57,7 +58,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name)
if ( this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
......
......@@ -3,7 +3,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import java.lang.NullPointerException
private val logger = KotlinLogging.logger {}
......@@ -15,7 +14,7 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController
* @see BenchmarkExecution
*/
class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> {
class ExecutionHandler(private val controller: TheodoliteController) : ResourceEventHandler<BenchmarkExecution> {
/**
* Add an execution to the end of the queue of the TheodoliteController.
......@@ -29,17 +28,19 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
}
/**
* Update an execution. If this execution is running at the time this function is called, it is stopped and added to
* the beginning of the queue of the TheodoliteController. Otherwise, it is just added to the beginning of the queue.
* Updates an execution. If this execution is running at the time this function is called, it is stopped and
* added to the beginning of the queue of the TheodoliteController.
* Otherwise, it is just added to the beginning of the queue.
*
* @param execution the execution to update
* @param oldExecution the old execution
* @param newExecution the new execution
*/
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name
try {
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
} catch(e: NullPointerException) {
} catch (e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
this.controller.executionsQueue.addFirst(newExecution)
......@@ -58,7 +59,7 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
try {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
logger.info { "Delete execution ${execution.metadata.name} from queue." }
} catch(e: NullPointerException) {
} catch (e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
......
......@@ -2,7 +2,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import khttp.patch
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
......
......@@ -21,12 +21,12 @@ class K8sContextFactory {
*
* @see CustomResourceDefinitionContext
*/
fun create(api: String, scope: String, group: String, plural: String ) : CustomResourceDefinitionContext {
return CustomResourceDefinitionContext.Builder()
fun create(api: String, scope: String, group: String, plural: String): CustomResourceDefinitionContext {
return CustomResourceDefinitionContext.Builder()
.withVersion(api)
.withScope(scope)
.withGroup(group)
.withPlural(plural)
.build()
}
}
\ No newline at end of file
}
......@@ -32,7 +32,14 @@ class K8sResourceLoader(private val client: NamespacedKubernetesClient) {
* @return CustomResource from fabric8
*/
private fun loadServiceMonitor(path: String): ServiceMonitorWrapper {
return loadGenericResource(path) { x: String -> ServiceMonitorWrapper(YamlParser().parse(path, HashMap<String, String>()::class.java)!!) }
return loadGenericResource(path) { x: String ->
ServiceMonitorWrapper(
YamlParser().parse(
path,
HashMap<String, String>()::class.java
)!!
)
}
}
/**
......
......@@ -43,7 +43,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
logger.info {
"Topics creation finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
result.values().map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
......@@ -57,20 +57,21 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{matchRegex(it, topics)}, kafkaAdmin)
delete(currentTopics.filter { matchRegex(it, topics) }, kafkaAdmin)
kafkaAdmin.close()
}
/**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
* This function checks whether one string in `topics` can be used as prefix of a regular expression
* to create the string `existingTopic`.
*
* @param existingTopic string for which should be checked if it could be created
* @param topics list of string which are used as possible prefixes to create `existingTopic`
* @return true, `existingTopics` matches a created regex, else false
* @param existingTopic string for which should be checked if it could be created.
* @param topics list of string which are used as possible prefixes to create `existingTopic`.
* @return true, `existingTopics` matches a created regex, else false.
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
val regex = t.toRegex()
val regex = t.toRegex()
if (regex.matches(existingTopic)) {
return true
}
......@@ -87,7 +88,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
result.all().get() // wait for the future to be completed
logger.info {
"Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
result.values().map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
}"
}
......
......@@ -12,7 +12,7 @@ import io.fabric8.kubernetes.api.model.apps.StatefulSet
*/
class ImagePatcher(private val k8sResource: KubernetesResource, private val container: String) :
AbstractPatcher(k8sResource, container) {
override fun <String> patch(imagePath: String) {
if (k8sResource is Deployment) {
k8sResource.spec.template.spec.containers.filter { it.name == container }.forEach {
......
......@@ -7,12 +7,13 @@ import kotlin.math.pow
private const val NUM_SENSORS = 4.0
private const val LOAD_GEN_MAX_RECORDS = 150000
class NumNestedGroupsLoadGeneratorReplicaPatcher(private val k8sResource: KubernetesResource) : AbstractPatcher(k8sResource) {
class NumNestedGroupsLoadGeneratorReplicaPatcher(private val k8sResource: KubernetesResource) :
AbstractPatcher(k8sResource) {
override fun <String> patch(value: String) {
if (k8sResource is Deployment) {
if (value is kotlin.String) {
val approxNumSensors = NUM_SENSORS.pow(Integer.parseInt(value).toDouble())
val loadGenInstances = (approxNumSensors + LOAD_GEN_MAX_RECORDS -1) / LOAD_GEN_MAX_RECORDS
val loadGenInstances = (approxNumSensors + LOAD_GEN_MAX_RECORDS - 1) / LOAD_GEN_MAX_RECORDS
this.k8sResource.spec.replicas = loadGenInstances.toInt()
}
}
......
......@@ -5,7 +5,8 @@ import io.fabric8.kubernetes.api.model.apps.Deployment
private const val LOAD_GEN_MAX_RECORDS = 150000
class NumSensorsLoadGeneratorReplicaPatcher(private val k8sResource: KubernetesResource) : AbstractPatcher(k8sResource) {
class NumSensorsLoadGeneratorReplicaPatcher(private val k8sResource: KubernetesResource) :
AbstractPatcher(k8sResource) {
override fun <String> patch(value: String) {
if (k8sResource is Deployment) {
if (value is kotlin.String) {
......
......@@ -6,7 +6,6 @@ import io.fabric8.kubernetes.api.model.Quantity
import io.fabric8.kubernetes.api.model.ResourceRequirements
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.api.model.apps.StatefulSet
import java.lang.IllegalArgumentException
/**
* The Resource limit [Patcher] set resource limits for deployments and statefulSets.
......
......@@ -6,7 +6,6 @@ import io.fabric8.kubernetes.api.model.Quantity
import io.fabric8.kubernetes.api.model.ResourceRequirements
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.api.model.apps.StatefulSet
import java.lang.IllegalArgumentException
/**
* The Resource request [Patcher] set resource limits for deployments and statefulSets.
......
......@@ -4,14 +4,14 @@ import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.apps.Deployment
/**
* The Scheduler name [Patcher] make it possible to set the scheduler which should be used to deploy the given deployment.
*
* The Scheduler name [Patcher] make it possible to set the scheduler which should
* be used to deploy the given deployment.
* @param k8sResource Kubernetes resource to be patched.
*/
class SchedulerNamePatcher(private val k8sResource: KubernetesResource): Patcher {
class SchedulerNamePatcher(private val k8sResource: KubernetesResource) : Patcher {
override fun <String> patch(value: String) {
if (k8sResource is Deployment) {
k8sResource.spec.template.spec.schedulerName = value as kotlin.String
}
}
}
\ No newline at end of file
}
......@@ -10,7 +10,7 @@ import theodolite.util.Resource
* Composite strategy that combines a SearchStrategy and a set of RestrictionStrategy.
*
* @param searchStrategy the [SearchStrategy] that is executed as part of this [CompositeStrategy].
* @param restrictionStrategies the set of [RestrictionStrategy] that are connected conjuntively to restrict the [Resource]
* @param restrictionStrategies the set of [RestrictionStrategy] that are connected conjunctive to restrict the [Resource]
* @param benchmarkExecutor Benchmark executor which runs the individual benchmarks.
*/
@RegisterForReflection
......
......@@ -18,7 +18,7 @@ private val logger = KotlinLogging.logger {}
class FullSearch(benchmarkExecutor: BenchmarkExecutor) : SearchStrategy(benchmarkExecutor) {
override fun findSuitableResource(load: LoadDimension, resources: List<Resource>): Resource? {
var minimalSuitableResources: Resource? = null;
var minimalSuitableResources: Resource? = null
for (res in resources) {
logger.info { "Running experiment with load '${load.get()}' and resources '${res.get()}'" }
val result = this.benchmarkExecutor.runExperiment(load, res)
......
......@@ -2,7 +2,7 @@ package theodolite.util
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.quarkus.runtime.annotations.RegisterForReflection
import org.apache.kafka.clients.admin.NewTopic
import theodolite.util.KafkaConfig.TopicWrapper
import kotlin.properties.Delegates
import kotlin.reflect.KProperty
......@@ -60,6 +60,7 @@ class DelegatesFalse {
operator fun getValue(thisRef: Any?, property: KProperty<*>): Boolean {
return state
}
operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Boolean) {
state = value
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment