Skip to content
Snippets Groups Projects
Commit 3bc5cd4e authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/spesb...

Merge branch 'theodolite-kotlin' of git.se.informatik.uni-kiel.de:she/spesb into feature/214-add-yaml-docs
parents ad49e7dd a10e11de
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!125Add dokumentation for benchmark and execution,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 81 additions and 54 deletions
...@@ -22,7 +22,7 @@ loadTypes: ...@@ -22,7 +22,7 @@ loadTypes:
- type: "NumSensorsLoadGeneratorReplicaPatcher" - type: "NumSensorsLoadGeneratorReplicaPatcher"
resource: "uc1-load-generator-deployment.yaml" resource: "uc1-load-generator-deployment.yaml"
kafkaConfig: kafkaConfig:
bootstrapServer: "theodolite-cp-kafka:9092" bootstrapServer: "localhost:31290"
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
......
...@@ -17,6 +17,7 @@ execution: ...@@ -17,6 +17,7 @@ execution:
strategy: "LinearSearch" strategy: "LinearSearch"
duration: 300 # in seconds duration: 300 # in seconds
repetitions: 1 repetitions: 1
loadGenerationDelay: 30 # in seconds, optional field, default is 0 seconds
restrictions: restrictions:
- "LowerBound" - "LowerBound"
configOverrides: [] configOverrides: []
......
...@@ -21,6 +21,7 @@ execution: ...@@ -21,6 +21,7 @@ execution:
strategy: "LinearSearch" strategy: "LinearSearch"
duration: 300 # in seconds duration: 300 # in seconds
repetitions: 1 repetitions: 1
delay: 30 # in seconds
restrictions: restrictions:
- "LowerBound" - "LowerBound"
configOverrides: [] configOverrides: []
......
...@@ -21,6 +21,7 @@ interface Benchmark { ...@@ -21,6 +21,7 @@ interface Benchmark {
fun buildDeployment( fun buildDeployment(
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?> configurationOverrides: List<ConfigurationOverride?>,
delay: Long
): BenchmarkDeployment ): BenchmarkDeployment
} }
...@@ -47,6 +47,7 @@ class BenchmarkExecution : CustomResource(), Namespaced { ...@@ -47,6 +47,7 @@ class BenchmarkExecution : CustomResource(), Namespaced {
var duration by Delegates.notNull<Long>() var duration by Delegates.notNull<Long>()
var repetitions by Delegates.notNull<Int>() var repetitions by Delegates.notNull<Int>()
lateinit var restrictions: List<String> lateinit var restrictions: List<String>
var loadGenerationDelay = 0L
} }
/** /**
......
...@@ -22,7 +22,7 @@ private var DEFAULT_NAMESPACE = "default" ...@@ -22,7 +22,7 @@ private var DEFAULT_NAMESPACE = "default"
* - [loadGenResource] resource that generates the load, * - [loadGenResource] resource that generates the load,
* - [resourceTypes] types of scaling resources, * - [resourceTypes] types of scaling resources,
* - [loadTypes] types of loads that can be scaled for the benchmark, * - [loadTypes] types of loads that can be scaled for the benchmark,
* - [kafkaConfig] for the [TopicManager], * - [kafkaConfig] for the [theodolite.k8s.TopicManager],
* - [namespace] for the client, * - [namespace] for the client,
* - [path] under which the resource yamls can be found. * - [path] under which the resource yamls can be found.
* *
...@@ -63,38 +63,43 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced { ...@@ -63,38 +63,43 @@ class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
* First loads all required resources and then patches them to the concrete load and resources for the experiment. * First loads all required resources and then patches them to the concrete load and resources for the experiment.
* Afterwards patches additional configurations(cluster depending) into the resources. * Afterwards patches additional configurations(cluster depending) into the resources.
* @param load concrete load that will be benchmarked in this experiment. * @param load concrete load that will be benchmarked in this experiment.
* @param res concrete resoruce that will be scaled for this experiment. * @param res concrete resource that will be scaled for this experiment.
* @param configurationOverrides * @param configurationOverrides
* @return a [BenchmarkDeployment] * @return a [BenchmarkDeployment]
*/ */
override fun buildDeployment( override fun buildDeployment(
load: LoadDimension, load: LoadDimension,
res: Resource, res: Resource,
configurationOverrides: List<ConfigurationOverride?> configurationOverrides: List<ConfigurationOverride?>,
loadGenerationDelay: Long
): BenchmarkDeployment { ): BenchmarkDeployment {
logger.info { "Using $namespace as namespace." } logger.info { "Using $namespace as namespace." }
logger.info { "Using $path as resource path." } logger.info { "Using $path as resource path." }
val resources = loadKubernetesResources(this.appResource + this.loadGenResource) val appResources = loadKubernetesResources(this.appResource)
val loadGenResources = loadKubernetesResources(this.loadGenResource)
val patcherFactory = PatcherFactory() val patcherFactory = PatcherFactory()
// patch the load dimension the resources // patch the load dimension the resources
load.getType().forEach { patcherDefinition -> load.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(load.get().toString()) patcherFactory.createPatcher(patcherDefinition, loadGenResources).patch(load.get().toString())
} }
res.getType().forEach { patcherDefinition -> res.getType().forEach { patcherDefinition ->
patcherFactory.createPatcher(patcherDefinition, resources).patch(res.get().toString()) patcherFactory.createPatcher(patcherDefinition, appResources).patch(res.get().toString())
} }
// Patch the given overrides // Patch the given overrides
configurationOverrides.forEach { override -> configurationOverrides.forEach { override ->
override?.let { override?.let {
patcherFactory.createPatcher(it.patcher, resources).patch(override.value) patcherFactory.createPatcher(it.patcher, appResources + loadGenResources).patch(override.value)
} }
} }
return KubernetesBenchmarkDeployment( return KubernetesBenchmarkDeployment(
namespace = namespace, namespace = namespace,
resources = resources.map { r -> r.second }, appResources = appResources.map { it.second },
loadGenResources = loadGenResources.map { it.second },
loadGenerationDelay = loadGenerationDelay,
kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer), kafkaConfig = hashMapOf("bootstrap.servers" to kafkaConfig.bootstrapServer),
topics = kafkaConfig.topics, topics = kafkaConfig.topics,
client = DefaultKubernetesClient().inNamespace(namespace) client = DefaultKubernetesClient().inNamespace(namespace)
......
...@@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic ...@@ -8,6 +8,7 @@ import org.apache.kafka.clients.admin.NewTopic
import theodolite.k8s.K8sManager import theodolite.k8s.K8sManager
import theodolite.k8s.TopicManager import theodolite.k8s.TopicManager
import theodolite.util.KafkaConfig import theodolite.util.KafkaConfig
import java.time.Duration
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -22,7 +23,9 @@ private val logger = KotlinLogging.logger {} ...@@ -22,7 +23,9 @@ private val logger = KotlinLogging.logger {}
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmarkDeployment( class KubernetesBenchmarkDeployment(
val namespace: String, val namespace: String,
val resources: List<KubernetesResource>, val appResources: List<KubernetesResource>,
val loadGenResources: List<KubernetesResource>,
private val loadGenerationDelay: Long,
private val kafkaConfig: HashMap<String, Any>, private val kafkaConfig: HashMap<String, Any>,
private val topics: List<KafkaConfig.TopicWrapper>, private val topics: List<KafkaConfig.TopicWrapper>,
private val client: NamespacedKubernetesClient private val client: NamespacedKubernetesClient
...@@ -39,9 +42,12 @@ class KubernetesBenchmarkDeployment( ...@@ -39,9 +42,12 @@ class KubernetesBenchmarkDeployment(
*/ */
override fun setup() { override fun setup() {
val kafkaTopics = this.topics.filter { !it.removeOnly } val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) } .map { NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics) kafkaController.createTopics(kafkaTopics)
resources.forEach { kubernetesManager.deploy(it) } appResources.forEach { kubernetesManager.deploy(it) }
logger.info { "Wait ${this.loadGenerationDelay} seconds before starting the load generator." }
Thread.sleep(Duration.ofSeconds(this.loadGenerationDelay).toMillis())
loadGenResources.forEach { kubernetesManager.deploy(it) }
} }
/** /**
...@@ -51,9 +57,8 @@ class KubernetesBenchmarkDeployment( ...@@ -51,9 +57,8 @@ class KubernetesBenchmarkDeployment(
* - Remove the [KubernetesResource]s. * - Remove the [KubernetesResource]s.
*/ */
override fun teardown() { override fun teardown() {
resources.forEach { loadGenResources.forEach { kubernetesManager.remove(it) }
kubernetesManager.remove(it) appResources.forEach { kubernetesManager.remove(it) }
}
kafkaController.removeTopics(this.topics.map { topic -> topic.name }) kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL) KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." } logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
......
...@@ -24,9 +24,9 @@ class CsvExporter { ...@@ -24,9 +24,9 @@ class CsvExporter {
val csvOutputFile = File("$name.csv") val csvOutputFile = File("$name.csv")
PrintWriter(csvOutputFile).use { pw -> PrintWriter(csvOutputFile).use { pw ->
pw.println(listOf("group", "timestamp", "value").joinToString()) pw.println(listOf("group", "timestamp", "value").joinToString(separator=","))
responseArray.forEach { responseArray.forEach {
pw.println(it.joinToString()) pw.println(it.joinToString(separator=","))
} }
} }
logger.info { "Wrote CSV file: $name to ${csvOutputFile.absolutePath}." } logger.info { "Wrote CSV file: $name to ${csvOutputFile.absolutePath}." }
......
...@@ -4,7 +4,7 @@ import theodolite.util.PrometheusResponse ...@@ -4,7 +4,7 @@ import theodolite.util.PrometheusResponse
import java.time.Instant import java.time.Instant
/** /**
* A SloChecker can be used to evaluate data from Promehteus. * A SloChecker can be used to evaluate data from Prometheus.
* @constructor Creates an empty SloChecker * @constructor Creates an empty SloChecker
*/ */
interface SloChecker { interface SloChecker {
......
...@@ -24,9 +24,10 @@ abstract class BenchmarkExecutor( ...@@ -24,9 +24,10 @@ abstract class BenchmarkExecutor(
val benchmark: Benchmark, val benchmark: Benchmark,
val results: Results, val results: Results,
val executionDuration: Duration, val executionDuration: Duration,
configurationOverrides: List<ConfigurationOverride?>, val configurationOverrides: List<ConfigurationOverride?>,
val slo: BenchmarkExecution.Slo, val slo: BenchmarkExecution.Slo,
val executionId: Int val executionId: Int,
val loadGenerationDelay: Long
) { ) {
var run: AtomicBoolean = AtomicBoolean(true) var run: AtomicBoolean = AtomicBoolean(true)
...@@ -41,7 +42,7 @@ abstract class BenchmarkExecutor( ...@@ -41,7 +42,7 @@ abstract class BenchmarkExecutor(
* given load, false otherwise. * given load, false otherwise.
*/ */
abstract fun runExperiment(load: LoadDimension, res: Resource): Boolean abstract fun runExperiment(load: LoadDimension, res: Resource): Boolean
/** /**
* Wait while the benchmark is running and log the number of minutes executed every 1 minute. * Wait while the benchmark is running and log the number of minutes executed every 1 minute.
* *
......
...@@ -18,13 +18,14 @@ class BenchmarkExecutorImpl( ...@@ -18,13 +18,14 @@ class BenchmarkExecutorImpl(
benchmark: Benchmark, benchmark: Benchmark,
results: Results, results: Results,
executionDuration: Duration, executionDuration: Duration,
private val configurationOverrides: List<ConfigurationOverride?>, configurationOverrides: List<ConfigurationOverride?>,
slo: BenchmarkExecution.Slo, slo: BenchmarkExecution.Slo,
executionId: Int executionId: Int,
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId) { loadGenerationDelay: Long
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo, executionId, loadGenerationDelay) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) val benchmarkDeployment = benchmark.buildDeployment(load, res, configurationOverrides, loadGenerationDelay)
try { try {
benchmarkDeployment.setup() benchmarkDeployment.setup()
......
...@@ -16,14 +16,14 @@ object Main { ...@@ -16,14 +16,14 @@ object Main {
val mode = System.getenv("MODE") ?: "standalone" val mode = System.getenv("MODE") ?: "standalone"
logger.info { "Start Theodolite with mode $mode" } logger.info { "Start Theodolite with mode $mode" }
when(mode) { when (mode) {
"standalone" -> TheodoliteYamlExecutor().start() "standalone" -> TheodoliteYamlExecutor().start()
"yaml-executor" -> TheodoliteYamlExecutor().start() // TODO remove (#209) "yaml-executor" -> TheodoliteYamlExecutor().start() // TODO remove (#209)
"operator" -> TheodoliteOperator().start() "operator" -> TheodoliteOperator().start()
else -> { else -> {
logger.error { "MODE $mode not found" } logger.error { "MODE $mode not found" }
exitProcess(1) exitProcess(1)
} }
} }
} }
} }
\ No newline at end of file
...@@ -30,7 +30,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b ...@@ -30,7 +30,8 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
benchmark.buildDeployment( benchmark.buildDeployment(
load = LoadDimension(0, emptyList()), load = LoadDimension(0, emptyList()),
res = Resource(0, emptyList()), res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides configurationOverrides = benchmarkExecution.configOverrides,
loadGenerationDelay = 0L
) )
deployment.teardown() deployment.teardown()
} catch (e: Exception) { } catch (e: Exception) {
......
...@@ -71,7 +71,8 @@ class TheodoliteExecutor( ...@@ -71,7 +71,8 @@ class TheodoliteExecutor(
executionDuration = executionDuration, executionDuration = executionDuration,
configurationOverrides = config.configOverrides, configurationOverrides = config.configOverrides,
slo = config.slos[0], slo = config.slos[0],
executionId = config.executionId executionId = config.executionId,
loadGenerationDelay = config.execution.loadGenerationDelay
) )
return Config( return Config(
...@@ -128,7 +129,7 @@ class TheodoliteExecutor( ...@@ -128,7 +129,7 @@ class TheodoliteExecutor(
fun run() { fun run() {
val resultsFolder = getResultFolderString() val resultsFolder = getResultFolderString()
storeAsFile(this.config, "$resultsFolder${this.config.executionId}-execution-configuration") storeAsFile(this.config, "$resultsFolder${this.config.executionId}-execution-configuration")
storeAsFile(kubernetesBenchmark, "$resultsFolder/${this.config.executionId}-benchmark-configuration") storeAsFile(kubernetesBenchmark, "$resultsFolder${this.config.executionId}-benchmark-configuration")
val config = buildConfig() val config = buildConfig()
// execute benchmarks for each load // execute benchmarks for each load
......
...@@ -3,6 +3,7 @@ package theodolite.execution.operator ...@@ -3,6 +3,7 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
/** /**
...@@ -13,7 +14,7 @@ private val logger = KotlinLogging.logger {} ...@@ -13,7 +14,7 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController * @see TheodoliteController
* @see KubernetesBenchmark * @see KubernetesBenchmark
*/ */
class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> { class BenchmarkEventHandler(private val controller: TheodoliteController) : ResourceEventHandler<KubernetesBenchmark> {
/** /**
* Add a KubernetesBenchmark. * Add a KubernetesBenchmark.
...@@ -39,7 +40,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou ...@@ -39,7 +40,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) { override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." } logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name newBenchmark.name = newBenchmark.metadata.name
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) { if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
this.controller.isUpdated.set(true) this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false) this.controller.executor.executor.run.compareAndSet(true, false)
} else { } else {
...@@ -57,7 +58,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou ...@@ -57,7 +58,7 @@ class BenchmarkEventHandler(private val controller: TheodoliteController): Resou
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) { override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." } logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name) this.controller.benchmarks.remove(benchmark.metadata.name)
if ( this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) { if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
this.controller.isUpdated.set(true) this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false) this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." } logger.info { "Current benchmark stopped." }
......
...@@ -3,7 +3,6 @@ package theodolite.execution.operator ...@@ -3,7 +3,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import java.lang.NullPointerException
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -15,7 +14,7 @@ private val logger = KotlinLogging.logger {} ...@@ -15,7 +14,7 @@ private val logger = KotlinLogging.logger {}
* @see TheodoliteController * @see TheodoliteController
* @see BenchmarkExecution * @see BenchmarkExecution
*/ */
class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> { class ExecutionHandler(private val controller: TheodoliteController) : ResourceEventHandler<BenchmarkExecution> {
/** /**
* Add an execution to the end of the queue of the TheodoliteController. * Add an execution to the end of the queue of the TheodoliteController.
...@@ -29,17 +28,19 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv ...@@ -29,17 +28,19 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
} }
/** /**
* Update an execution. If this execution is running at the time this function is called, it is stopped and added to * Updates an execution. If this execution is running at the time this function is called, it is stopped and
* the beginning of the queue of the TheodoliteController. Otherwise, it is just added to the beginning of the queue. * added to the beginning of the queue of the TheodoliteController.
* Otherwise, it is just added to the beginning of the queue.
* *
* @param execution the execution to update * @param oldExecution the old execution
* @param newExecution the new execution
*/ */
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) { override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." } logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name newExecution.name = newExecution.metadata.name
try { try {
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name } this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
} catch(e: NullPointerException) { } catch (e: NullPointerException) {
logger.warn { "No execution found for deletion" } logger.warn { "No execution found for deletion" }
} }
this.controller.executionsQueue.addFirst(newExecution) this.controller.executionsQueue.addFirst(newExecution)
...@@ -58,7 +59,7 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv ...@@ -58,7 +59,7 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
try { try {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name } this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
logger.info { "Delete execution ${execution.metadata.name} from queue." } logger.info { "Delete execution ${execution.metadata.name} from queue." }
} catch(e: NullPointerException) { } catch (e: NullPointerException) {
logger.warn { "No execution found for deletion" } logger.warn { "No execution found for deletion" }
} }
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) { if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
......
...@@ -2,7 +2,6 @@ package theodolite.execution.operator ...@@ -2,7 +2,6 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import khttp.patch
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
......
...@@ -21,12 +21,12 @@ class K8sContextFactory { ...@@ -21,12 +21,12 @@ class K8sContextFactory {
* *
* @see CustomResourceDefinitionContext * @see CustomResourceDefinitionContext
*/ */
fun create(api: String, scope: String, group: String, plural: String ) : CustomResourceDefinitionContext { fun create(api: String, scope: String, group: String, plural: String): CustomResourceDefinitionContext {
return CustomResourceDefinitionContext.Builder() return CustomResourceDefinitionContext.Builder()
.withVersion(api) .withVersion(api)
.withScope(scope) .withScope(scope)
.withGroup(group) .withGroup(group)
.withPlural(plural) .withPlural(plural)
.build() .build()
} }
} }
\ No newline at end of file
...@@ -32,7 +32,14 @@ class K8sResourceLoader(private val client: NamespacedKubernetesClient) { ...@@ -32,7 +32,14 @@ class K8sResourceLoader(private val client: NamespacedKubernetesClient) {
* @return CustomResource from fabric8 * @return CustomResource from fabric8
*/ */
private fun loadServiceMonitor(path: String): ServiceMonitorWrapper { private fun loadServiceMonitor(path: String): ServiceMonitorWrapper {
return loadGenericResource(path) { x: String -> ServiceMonitorWrapper(YamlParser().parse(path, HashMap<String, String>()::class.java)!!) } return loadGenericResource(path) { x: String ->
ServiceMonitorWrapper(
YamlParser().parse(
path,
HashMap<String, String>()::class.java
)!!
)
}
} }
/** /**
......
...@@ -45,7 +45,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -45,7 +45,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
"Topics creation finished with result: ${ "Topics creation finished with result: ${
result result
.values() .values()
.map { it -> it.key + ": " + it.value.isDone } .map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",") .joinToString(separator = ",")
} " } "
} }
...@@ -59,16 +59,17 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -59,16 +59,17 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val currentTopics = kafkaAdmin.listTopics().names().get() val currentTopics = kafkaAdmin.listTopics().names().get()
delete(currentTopics.filter{ matchRegex(it, topics) }, kafkaAdmin) delete(currentTopics.filter { matchRegex(it, topics) }, kafkaAdmin)
kafkaAdmin.close() kafkaAdmin.close()
} }
/** /**
* This function checks whether one string in `topics` can be used as prefix of a regular expression to create the string `existingTopic` * This function checks whether one string in `topics` can be used as prefix of a regular expression
* to create the string `existingTopic`.
* *
* @param existingTopic string for which should be checked if it could be created * @param existingTopic string for which should be checked if it could be created.
* @param topics list of string which are used as possible prefixes to create `existingTopic` * @param topics list of string which are used as possible prefixes to create `existingTopic`.
* @return true, `existingTopics` matches a created regex, else false * @return true, `existingTopics` matches a created regex, else false.
*/ */
private fun matchRegex(existingTopic: String, topics: List<String>): Boolean { private fun matchRegex(existingTopic: String, topics: List<String>): Boolean {
for (t in topics) { for (t in topics) {
...@@ -89,7 +90,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -89,7 +90,7 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
result.all().get() // wait for the future to be completed result.all().get() // wait for the future to be completed
logger.info { logger.info {
"Topics deletion finished with result: ${ "Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone } result.values().map { it.key + ": " + it.value.isDone }
.joinToString(separator = ",") .joinToString(separator = ",")
}" }"
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment