Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Commits on Source (13)
Showing
with 80 additions and 53 deletions
......@@ -22,7 +22,7 @@ loadTypes:
- type: "NumSensorsLoadGeneratorReplicaPatcher"
resource: "uc1-load-generator-deployment.yaml"
kafkaConfig:
bootstrapServer: "theodolite-cp-kafka:9092"
bootstrapServer: "localhost:31290"
topics:
- name: "input"
numPartitions: 40
......
......@@ -17,6 +17,7 @@ execution:
strategy: "LinearSearch"
duration: 300 # in seconds
repetitions: 1
loadGenerationDelay: 30 # in seconds, optional field, default is 0 seconds
restrictions:
- "LowerBound"
configOverrides: []
......
......@@ -21,6 +21,7 @@ execution:
strategy: "LinearSearch"
duration: 300 # in seconds
repetitions: 1
delay: 30 # in seconds
restrictions:
- "LowerBound"
configOverrides: []
......
......@@ -21,6 +21,7 @@ interface Benchmark {
fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride?>
configurationOverrides: List<ConfigurationOverride?>,
delay: Long
): BenchmarkDeployment
}
......@@ -47,6 +47,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var duration by Delegates.notNull<Long>()
var repetitions by Delegates.notNull<Int>()
lateinit var restrictions: List<String>
var loadGenerationDelay = 0L
}
/**
......
......@@ -22,7 +22,7 @@ private var DEFAULT_NAMESPACE = "default"
* - [loadGenResource] resource that generates the load,
* - [resourceTypes] types of scaling resources,
* - [loadTypes] types of loads that can be scaled for the benchmark,
* - [kafkaConfig] for the [TopicManager],
* - [kafkaConfig] for the [theodolite.k8s.TopicManager],
* - [namespace] for the client,
* - [path] under which the resource yamls can be found.
*
......@@ -63,38 +63,43 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
* First loads all required resources and then patches them to the concrete load and resources for the experiment.
* Afterwards patches additional configurations(cluster depending) into the resources.
* @param load concrete load that will be benchmarked in this experiment.
* @param res concrete resoruce that will be scaled for this experiment.
* @param res concrete resource that will be scaled for this experiment.
* @param configurationOverrides
* @return a [BenchmarkDeployment]
*/
override fun buildDeployment(
load: LoadDimension,
res: Resource,
configurationOverrides: List<ConfigurationOverride?>
configurationOverrides: List<ConfigurationOverride?>,
loadGenerationDelay: Long
): BenchmarkDeployment {
logger.info { "Using $namespace as namespace." }
logger.info { "Using $path as resource path." }
val resources = loadKubernetesResources(this.appResource + this.loadGenResource)
val appResources = loadKubernetesResources(this.appResource)
val loadGenResources = loadKubernetesResources(this.loadGenResource)
val patcherFactory = PatcherFactory()
// patch the load dimension the resources
load.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(load.get().toString())
patcherFactory.createPatcher(patcherDefinition, loadGenResources).patch(load.get().toString())
}
res.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(res.get().toString())
patcherFactory.createPatcher(patcherDefinition, appResources).patch(res.get().toString())
}
// Patch the given overrides
configurationOverrides.forEach { override ->
override?.let {
patcherFactory.createPatcher(it.patcher, resources).patch(override.value)
patcherFactory.createPatcher(it.patcher, appResources + loadGenResources).patch(override.value)
}
}
return KubernetesBenchmarkDeployment(
namespace = namespace,
resources = resources.map { r -> r.second },
appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second },
loadGenerationDelay = loadGenerationDelay,
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace)
......
......@@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig
import java.time.Duration
private val logger = KotlinLogging.logger {}
......@@ -22,7 +23,9 @@ private val logger = KotlinLogging.logger {}
@RegisterForReflection
class KubernetesBenchmarkDeployment(
val namespace: String,
val resources: List<KubernetesResource>,
val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>,
private val loadGenerationDelay: Long,
private val kafkaConfig: HashMap<String, Any>,
private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient
......@@ -39,9 +42,12 @@ class KubernetesBenchmarkDeployment(
*/
override fun setup() {
val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
.map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics)
resources.forEach { kubernetesManager.deploy(it) }
appResources.forEach { kubernetesManager.deploy(it) }
logger.info { "Wait ${this.loadGenerationDelay} seconds before starting the load generator." }
Thread.sleep(Duration.ofSeconds(this.loadGenerationDelay).toMillis())
loadGenResources.forEach { kubernetesManager.deploy(it) }
}
/**
......@@ -51,9 +57,8 @@ class KubernetesBenchmarkDeployment(
* - Remove the [KubernetesResource]s.
*/
override fun teardown() {
resources.forEach {
kubernetesManager.remove(it)
}
loadGenResources.forEach { kubernetesManager.remove(it) }
appResources.forEach { kubernetesManager.remove(it) }
kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
......
......@@ -4,7 +4,7 @@ import theodolite.util.PrometheusResponse
import java.time.Instant
/**
* A SloChecker can be used to evaluate data from Promehteus.
* A SloChecker can be used to evaluate data from Prometheus.
* @constructor Creates an empty SloChecker
*/
interface SloChecker {
......
......@@ -24,9 +24,10 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark,
val results: Results,
val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>,
val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo,
val executionId: Int
val executionId: Int,
val loadGenerationDelay: Long
) {
var run: AtomicBoolean = AtomicBoolean(true)
......@@ -41,7 +42,7 @@ abstract class BenchmarkExecutor(
* given load, false otherwise.
*/
abstract fun runExperiment(load: LoadDimension, res: Resource): Boolean
/**
* Wait while the benchmark is running and log the number of minutes executed every 1 minute.
*
......
......@@ -18,13 +18,14 @@ class BenchmarkExecutorImpl(
benchmark: Benchmark,
results: Results,
executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride?>,
configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo,
executionId: Int
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId) {
executionId: Int,
loadGenerationDelay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId, loadGenerationDelay) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
val benchmarkDeployment = benchmark.buildDeployment(load, res, configurationOverrides, loadGenerationDelay)
try {
benchmarkDeployment.setup()
......
......@@ -16,14 +16,14 @@ object Main {
val mode = System.getenv("MODE") ?: "standalone"
logger.info { "Start Theodolite with mode $mode" }
when(mode) {
when (mode) {
"standalone" -> TheodoliteYamlExecutor().start()
"yaml-executor" -> TheodoliteYamlExecutor().start() // TODO remove (#209)
"operator" -> TheodoliteOperator().start()
else -> {
else -> {
logger.error { "MODE $mode not found" }
exitProcess(1)
}
}
}
}
\ No newline at end of file
}
......@@ -30,7 +30,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
benchmark.buildDeployment(
load = LoadDimension(0, emptyList()),
res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides
configurationOverrides = benchmarkExecution.configOverrides,
loadGenerationDelay = 0L
)
deployment.teardown()
} catch (e: Exception) {
......
......@@ -71,7 +71,8 @@ class TheodoliteExecutor(
executionDuration = executionDuration,
configurationOverrides = config.configOverrides,
slo = config.slos[0],
executionId = config.executionId
executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay
)
return Config(
......@@ -128,7 +129,7 @@ class TheodoliteExecutor(
fun run() {
val resultsFolder = getResultFolderString()
storeAsFile(this.config, "$resultsFolder${this.config.executionId}-execution-configuration")
storeAsFile(kubernetesBenchmark, "$resultsFolder/${this.config.executionId}-benchmark-configuration")
storeAsFile(kubernetesBenchmark, "$resultsFolder${this.config.executionId}-benchmark-configuration")
val config = buildConfig()
// execute benchmarks for each load
......
......@@ -3,6 +3,7 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
/**
......@@ -13,7 +14,7 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController
* @see KubernetesBenchmark
*/
class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> {
class BenchmarkEventHandler(private val controller: TheodoliteController) : ResourceEventHandler<KubernetesBenchmark> {
/**
* Add a KubernetesBenchmark.
......@@ -39,7 +40,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} else {
......@@ -57,7 +58,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name)
if ( this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
......
......@@ -3,7 +3,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import java.lang.NullPointerException
private val logger = KotlinLogging.logger {}
......@@ -15,7 +14,7 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController
* @see BenchmarkExecution
*/
class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> {
class ExecutionHandler(private val controller: TheodoliteController) : ResourceEventHandler<BenchmarkExecution> {
/**
* Add an execution to the end of the queue of the TheodoliteController.
......@@ -29,17 +28,19 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
}
/**
* Update an execution. If this execution is running at the time this function is called, it is stopped and added to
* the beginning of the queue of the TheodoliteController. Otherwise, it is just added to the beginning of the queue.
* Updates an execution. If this execution is running at the time this function is called, it is stopped and
* added to the beginning of the queue of the TheodoliteController.
* Otherwise, it is just added to the beginning of the queue.
*
* @param execution the execution to update
* @param oldExecution the old execution
* @param newExecution the new execution
*/
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name
try {
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
} catch(e: NullPointerException) {
} catch (e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
this.controller.executionsQueue.addFirst(newExecution)
......@@ -58,7 +59,7 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
try {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
logger.info { "Delete execution ${execution.metadata.name} from queue." }
} catch(e: NullPointerException) {
} catch (e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
......
......@@ -2,7 +2,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import khttp.patch
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
......
......@@ -21,12 +21,12 @@ class K8sContextFactory {
*
* @see CustomResourceDefinitionContext
*/
fun create(api: String, scope: String, group: String, plural: String ) : CustomResourceDefinitionContext {
return CustomResourceDefinitionContext.Builder()
fun create(api: String, scope: String, group: String, plural: String): CustomResourceDefinitionContext {
return CustomResourceDefinitionContext.Builder()
.withVersion(api)
.withScope(scope)
.withGroup(group)
.withPlural(plural)
.build()
}
}
\ No newline at end of file
}
......@@ -32,7 +32,14 @@ class K8sResourceLoader(private val client: NamespacedKubernetesClient) {
* @return CustomResource from fabric8
*/
private fun loadServiceMonitor(path: String): ServiceMonitorWrapper {
return loadGenericResource(path) { x: String -> ServiceMonitorWrapper(YamlParser().parse(path, HashMap<String, String>()::class.java)!!) }
return loadGenericResource(path) { x: String ->
ServiceMonitorWrapper(
YamlParser().parse(
path,
HashMap<String, String>()::class.java
)!!
)
}
}
/**
......
......@@ -45,7 +45,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
"Topics creation finished with result: ${
result
.values()
.map { it -> it.key + ": " + it.value.isDone }
.map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
......@@ -59,16 +59,17 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{ matchRegex(it, topics) }, kafkaAdmin)
delete(currentTopics.filter { matchRegex(it, topics) }, kafkaAdmin)
kafkaAdmin.close()
}
/**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic`
* This function checks whether one string in `topics` can be used as prefix of a regular expression
* to create the string `existingTopic`.
*
* @param existingTopic string for which should be checked if it could be created
* @param topics list of string which are used as possible prefixes to create `existingTopic`
* @return true, `existingTopics` matches a created regex, else false
* @param existingTopic string for which should be checked if it could be created.
* @param topics list of string which are used as possible prefixes to create `existingTopic`.
* @return true, `existingTopics` matches a created regex, else false.
*/
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) {
......@@ -89,7 +90,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
result.all().get() // wait for the future to be completed
logger.info {
"Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
result.values().map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
}"
}
......
......@@ -12,7 +12,7 @@ import io.fabric8.kubernetes.api.model.apps.StatefulSet
*/
class ImagePatcher(private val k8sResource: KubernetesResource, private val container: String) :
AbstractPatcher(k8sResource, container) {
override fun <String> patch(imagePath: String) {
if (k8sResource is Deployment) {
k8sResource.spec.template.spec.containers.filter { it.name == container }.forEach {
......