Skip to content
Snippets Groups Projects
Commit b1b334e7 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'stu203404/spesb-180-Introduce-Helm-Chart' into start-using-defaults-issue-191

parents c534fb03 42313ec6
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!115Start using new Theodolite defaults,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2649 failed
Showing
with 372 additions and 86 deletions
name: "Theodolite Test Context" name: example-execution
benchmark: "uc1-kstreams" benchmark: "uc1-kstreams"
load: load:
loadType: "NumSensors" loadType: "NumSensors"
...@@ -32,19 +32,19 @@ configOverrides: ...@@ -32,19 +32,19 @@ configOverrides:
resource: "uc1-kstreams-deployment.yaml" resource: "uc1-kstreams-deployment.yaml"
variableName: "env" variableName: "env"
value: "prod" value: "prod"
- patcher: # - patcher:
type: "ResourceLimitPatcher" # type: "ResourceLimitPatcher"
resource: "uc1-kstreams-deployment.yaml" # resource: "uc1-kstreams-deployment.yaml"
container: "uc-application" # container: "uc-application"
variableName: "cpu" # variableName: "cpu"
value: "1000m" # value: "1000m"
- patcher: # - patcher:
type: "ResourceLimitPatcher" # type: "ResourceLimitPatcher"
resource: "uc1-kstreams-deployment.yaml" # resource: "uc1-kstreams-deployment.yaml"
container: "uc-application" # container: "uc-application"
variableName: "memory" # variableName: "memory"
value: "2Gi" # value: "2Gi"
- patcher: # - patcher:
type: "SchedulerNamePatcher" # type: "SchedulerNamePatcher"
resource: "uc1-kstreams-deployment.yaml" # resource: "uc1-kstreams-deployment.yaml"
value: "random-scheduler" # value: "random-scheduler"
\ No newline at end of file
package theodolite.benchmark package theodolite.benchmark
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import theodolite.util.ConfigurationOverride import theodolite.util.ConfigurationOverride
import kotlin.properties.Delegates import kotlin.properties.Delegates
@JsonDeserialize
@RegisterForReflection @RegisterForReflection
class BenchmarkExecution { class BenchmarkExecution : CustomResource(), Namespaced {
lateinit var name: String lateinit var name: String
lateinit var benchmark: String lateinit var benchmark: String
lateinit var load: LoadDefinition lateinit var load: LoadDefinition
...@@ -14,16 +19,18 @@ class BenchmarkExecution { ...@@ -14,16 +19,18 @@ class BenchmarkExecution {
lateinit var execution: Execution lateinit var execution: Execution
lateinit var configOverrides: List<ConfigurationOverride?> lateinit var configOverrides: List<ConfigurationOverride?>
@JsonDeserialize
@RegisterForReflection @RegisterForReflection
class Execution { class Execution : KubernetesResource {
lateinit var strategy: String lateinit var strategy: String
var duration by Delegates.notNull<Long>() var duration by Delegates.notNull<Long>()
var repetitions by Delegates.notNull<Int>() var repetitions by Delegates.notNull<Int>()
lateinit var restrictions: List<String> lateinit var restrictions: List<String>
} }
@JsonDeserialize
@RegisterForReflection @RegisterForReflection
class Slo { class Slo : KubernetesResource {
lateinit var sloType: String lateinit var sloType: String
var threshold by Delegates.notNull<Int>() var threshold by Delegates.notNull<Int>()
lateinit var prometheusUrl: String lateinit var prometheusUrl: String
...@@ -32,14 +39,17 @@ class BenchmarkExecution { ...@@ -32,14 +39,17 @@ class BenchmarkExecution {
var warmup by Delegates.notNull<Int>() var warmup by Delegates.notNull<Int>()
} }
@JsonDeserialize
@RegisterForReflection @RegisterForReflection
class LoadDefinition { class LoadDefinition : KubernetesResource {
lateinit var loadType: String lateinit var loadType: String
lateinit var loadValues: List<Int> lateinit var loadValues: List<Int>
} }
@JsonDeserialize
@RegisterForReflection @RegisterForReflection
class ResourceDefinition { class ResourceDefinition : KubernetesResource {
lateinit var resourceType: String lateinit var resourceType: String
lateinit var resourceValues: List<Int> lateinit var resourceValues: List<Int>
} }
......
package theodolite.benchmark
import io.fabric8.kubernetes.client.CustomResourceList
class BenchmarkExecutionList : CustomResourceList<BenchmarkExecution>()
\ No newline at end of file
package theodolite.benchmark; package theodolite.benchmark
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import mu.KotlinLogging import mu.KotlinLogging
......
package theodolite.benchmark package theodolite.benchmark
import io.fabric8.kubernetes.api.model.KubernetesResource import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.Namespaced
import io.fabric8.kubernetes.client.CustomResource
import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import mu.KotlinLogging import mu.KotlinLogging
...@@ -9,11 +11,10 @@ import theodolite.patcher.PatcherFactory ...@@ -9,11 +11,10 @@ import theodolite.patcher.PatcherFactory
import theodolite.util.* import theodolite.util.*
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private var DEFAULT_NAMESPACE = "default" private var DEFAULT_NAMESPACE = "default"
@RegisterForReflection @RegisterForReflection
class KubernetesBenchmark : Benchmark { class KubernetesBenchmark : Benchmark, CustomResource(), Namespaced {
lateinit var name: String lateinit var name: String
lateinit var appResource: List<String> lateinit var appResource: List<String>
lateinit var loadGenResource: List<String> lateinit var loadGenResource: List<String>
......
package theodolite.benchmark
import io.fabric8.kubernetes.client.CustomResourceList
class KubernetesBenchmarkList : CustomResourceList<KubernetesBenchmark>()
\ No newline at end of file
...@@ -16,7 +16,7 @@ class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) { ...@@ -16,7 +16,7 @@ class AnalysisExecutor(private val slo: BenchmarkExecution.Slo) {
offset = Duration.ofHours(slo.offset.toLong()) offset = Duration.ofHours(slo.offset.toLong())
) )
fun analyse(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean { fun analyze(load: LoadDimension, res: Resource, executionDuration: Duration): Boolean {
var result = false var result = false
try { try {
......
...@@ -8,6 +8,7 @@ import theodolite.util.LoadDimension ...@@ -8,6 +8,7 @@ import theodolite.util.LoadDimension
import theodolite.util.Resource import theodolite.util.Resource
import theodolite.util.Results import theodolite.util.Results
import java.time.Duration import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -27,6 +28,8 @@ abstract class BenchmarkExecutor( ...@@ -27,6 +28,8 @@ abstract class BenchmarkExecutor(
val slo: BenchmarkExecution.Slo val slo: BenchmarkExecution.Slo
) { ) {
var run: AtomicBoolean = AtomicBoolean(true)
/** /**
* Run a experiment for the given parametrization, evaluate the experiment and save the result. * Run a experiment for the given parametrization, evaluate the experiment and save the result.
* *
...@@ -42,12 +45,19 @@ abstract class BenchmarkExecutor( ...@@ -42,12 +45,19 @@ abstract class BenchmarkExecutor(
*/ */
fun waitAndLog() { fun waitAndLog() {
logger.info { "Execution of a new benchmark started." } logger.info { "Execution of a new benchmark started." }
for (i in 1.rangeTo(executionDuration.toSeconds())) {
var secondsRunning = 0L
while (run.get() && secondsRunning < executionDuration.toSeconds()) {
secondsRunning++
Thread.sleep(Duration.ofSeconds(1).toMillis()) Thread.sleep(Duration.ofSeconds(1).toMillis())
if ((i % 60) == 0L) {
logger.info { "Executed: ${i / 60} minutes" } if ((secondsRunning % 60) == 0L) {
logger.info { "Executed: ${secondsRunning / 60} minutes." }
} }
} }
logger.debug { "Executor shutdown gracefully." }
} }
} }
...@@ -22,17 +22,24 @@ class BenchmarkExecutorImpl( ...@@ -22,17 +22,24 @@ class BenchmarkExecutorImpl(
slo: BenchmarkExecution.Slo slo: BenchmarkExecution.Slo
) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) { ) : BenchmarkExecutor(benchmark, results, executionDuration, configurationOverrides, slo) {
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
try {
benchmarkDeployment.setup() benchmarkDeployment.setup()
this.waitAndLog() this.waitAndLog()
} catch(e: Exception) {
logger.error { "Error while setup experiment." }
logger.error { "Error is: $e" }
this.run.set(false)
}
val result = AnalysisExecutor(slo = slo).analyse(load = load, res = res, executionDuration = executionDuration) if (this.run.get()) {
result =
benchmarkDeployment.teardown() AnalysisExecutor(slo = slo).analyze(load = load, res = res, executionDuration = executionDuration)
benchmarkDeployment.teardown()
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
}
benchmarkDeployment.teardown()
return result return result
} }
} }
package theodolite.execution
import io.quarkus.runtime.annotations.QuarkusMain
import mu.KotlinLogging
import theodolite.execution.operator.TheodoliteOperator
import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {}
@QuarkusMain
object Main {
@JvmStatic
fun main(args: Array<String>) {
val mode = System.getenv("MODE") ?: "operator"
logger.info { "Start Theodolite with mode $mode" }
when(mode) {
"yaml-executor" -> TheodoliteYamlExecutor().start()
"operator" -> TheodoliteOperator().start()
else -> {logger.error { "MODE $mode not found" }; exitProcess(1)}
}
}
}
\ No newline at end of file
...@@ -8,10 +8,9 @@ import theodolite.util.Resource ...@@ -8,10 +8,9 @@ import theodolite.util.Resource
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) : class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val benchmark: KubernetesBenchmark) {
Thread() {
override fun run() { fun run() {
// Build Configuration to teardown // Build Configuration to teardown
logger.info { "Received shutdown signal -> Shutting down" } logger.info { "Received shutdown signal -> Shutting down" }
val deployment = val deployment =
...@@ -20,7 +19,7 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b ...@@ -20,7 +19,7 @@ class Shutdown(private val benchmarkExecution: BenchmarkExecution, private val b
res = Resource(0, emptyList()), res = Resource(0, emptyList()),
configurationOverrides = benchmarkExecution.configOverrides configurationOverrides = benchmarkExecution.configOverrides
) )
logger.info { "Teardown the everything deployed" } logger.info { "Teardown everything deployed" }
deployment.teardown() deployment.teardown()
logger.info { "Teardown completed" } logger.info { "Teardown completed" }
} }
......
...@@ -15,6 +15,7 @@ class TheodoliteExecutor( ...@@ -15,6 +15,7 @@ class TheodoliteExecutor(
private val config: BenchmarkExecution, private val config: BenchmarkExecution,
private val kubernetesBenchmark: KubernetesBenchmark private val kubernetesBenchmark: KubernetesBenchmark
) { ) {
lateinit var executor: BenchmarkExecutor
private fun buildConfig(): Config { private fun buildConfig(): Config {
val results = Results() val results = Results()
...@@ -22,14 +23,19 @@ class TheodoliteExecutor( ...@@ -22,14 +23,19 @@ class TheodoliteExecutor(
val executionDuration = Duration.ofSeconds(config.execution.duration) val executionDuration = Duration.ofSeconds(config.execution.duration)
val resourcePatcherDefinition = PatcherDefinitionFactory().createPatcherDefinition( val resourcePatcherDefinition =
PatcherDefinitionFactory().createPatcherDefinition(
config.resources.resourceType, config.resources.resourceType,
this.kubernetesBenchmark.resourceTypes this.kubernetesBenchmark.resourceTypes
) )
val loadDimensionPatcherDefinition = val loadDimensionPatcherDefinition =
PatcherDefinitionFactory().createPatcherDefinition(config.load.loadType, this.kubernetesBenchmark.loadTypes) PatcherDefinitionFactory().createPatcherDefinition(
config.load.loadType,
this.kubernetesBenchmark.loadTypes
)
val executor = executor =
BenchmarkExecutorImpl( BenchmarkExecutorImpl(
kubernetesBenchmark, kubernetesBenchmark,
results, results,
...@@ -57,12 +63,21 @@ class TheodoliteExecutor( ...@@ -57,12 +63,21 @@ class TheodoliteExecutor(
) )
} }
fun getExecution(): BenchmarkExecution {
return this.config
}
fun getBenchmark(): KubernetesBenchmark {
return this.kubernetesBenchmark
}
fun run() { fun run() {
val config = buildConfig() val config = buildConfig()
// execute benchmarks for each load // execute benchmarks for each load
for (load in config.loads) { for (load in config.loads) {
if (executor.run.get()) {
config.compositeStrategy.findSuitableResource(load, config.resources) config.compositeStrategy.findSuitableResource(load, config.resources)
} }
} }
} }
}
package theodolite.execution package theodolite.execution
import io.quarkus.runtime.annotations.QuarkusMain
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.util.YamlParser import theodolite.util.YamlParser
import kotlin.concurrent.thread
import kotlin.system.exitProcess import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@QuarkusMain(name = "TheodoliteYamlExecutor") class TheodoliteYamlExecutor {
object TheodoliteYamlExecutor { private val parser = YamlParser()
@JvmStatic
fun main(args: Array<String>) { fun start() {
logger.info { "Theodolite started" } logger.info { "Theodolite started" }
val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/BenchmarkExecution.yaml" val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/BenchmarkExecution.yaml"
...@@ -23,19 +23,18 @@ object TheodoliteYamlExecutor { ...@@ -23,19 +23,18 @@ object TheodoliteYamlExecutor {
// load the BenchmarkExecution and the BenchmarkType // load the BenchmarkExecution and the BenchmarkType
val parser = YamlParser()
val benchmarkExecution = val benchmarkExecution =
parser.parse(path = executionPath, E = BenchmarkExecution::class.java)!! parser.parse(path = executionPath, E = BenchmarkExecution::class.java)!!
val benchmark = val benchmark =
parser.parse(path = benchmarkPath, E = KubernetesBenchmark::class.java)!! parser.parse(path = benchmarkPath, E = KubernetesBenchmark::class.java)!!
val shutdown = Shutdown(benchmarkExecution, benchmark) val shutdown = Shutdown(benchmarkExecution, benchmark)
Runtime.getRuntime().addShutdownHook(shutdown) Runtime.getRuntime().addShutdownHook(thread { shutdown.run()})
val executor = TheodoliteExecutor(benchmarkExecution, benchmark) val executor = TheodoliteExecutor(benchmarkExecution, benchmark)
executor.run() executor.run()
logger.info { "Theodolite finished" } logger.info { "Theodolite finished" }
Runtime.getRuntime().removeShutdownHook(shutdown) Runtime.getRuntime().removeShutdownHook(thread { shutdown.run()})
exitProcess(0) exitProcess(0)
} }
} }
package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> {
override fun onAdd(benchmark: KubernetesBenchmark) {
benchmark.name = benchmark.metadata.name
logger.info { "Add new benchmark ${benchmark.name}." }
this.controller.benchmarks[benchmark.name] = benchmark
}
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} else {
onAdd(newBenchmark)
}
}
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name)
if ( this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
}
}
}
package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
private val logger = KotlinLogging.logger {}
class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> {
override fun onAdd(execution: BenchmarkExecution) {
execution.name = execution.metadata.name
logger.info { "Add new execution ${execution.metadata.name} to queue." }
this.controller.executionsQueue.add(execution)
}
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
this.controller.executionsQueue.addFirst(newExecution)
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
}
}
override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name} from queue." }
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
}
}
}
package theodolite.execution.operator
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
class K8sContextFactory {
fun create(api: String, scope: String, group: String, plural: String ) : CustomResourceDefinitionContext{
return CustomResourceDefinitionContext.Builder()
.withVersion(api)
.withScope(scope)
.withGroup(group)
.withPlural(plural)
.build()
}
}
\ No newline at end of file
package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark
import theodolite.execution.TheodoliteExecutor
import java.lang.Thread.sleep
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicBoolean
private val logger = KotlinLogging.logger {}
class TheodoliteController(
val client: NamespacedKubernetesClient,
val executionContext: CustomResourceDefinitionContext
) {
lateinit var executor: TheodoliteExecutor
val executionsQueue: ConcurrentLinkedDeque<BenchmarkExecution> = ConcurrentLinkedDeque()
val benchmarks: ConcurrentHashMap<String, KubernetesBenchmark> = ConcurrentHashMap()
var isUpdated = AtomicBoolean(false)
fun run() {
while (true) {
try {
reconcile()
logger.info { "Theodolite is waiting for new matching benchmark and execution." }
logger.info { "Currently available executions: " }
executionsQueue.forEach {
logger.info { "${it.name} : waiting for : ${it.benchmark}" }
}
logger.info { "Currently available benchmarks: " }
benchmarks.forEach {
logger.info { it.key }
}
sleep(2000)
} catch (e: InterruptedException) {
logger.error { "Execution interrupted with error: $e." }
}
}
}
@Synchronized
private fun reconcile() {
while (executionsQueue.isNotEmpty()) {
val execution = executionsQueue.peek()
val benchmark = benchmarks[execution.benchmark]
if (benchmark == null) {
logger.debug { "No benchmark found for execution ${execution.name}." }
sleep(1000)
} else {
runExecution(execution, benchmark)
}
}
}
@Synchronized
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
isUpdated.set(false)
logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}." }
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
executor.run()
if (!isUpdated.get()) {
client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
}
logger.info { "Execution of ${execution.name} is finally stopped." }
}
@Synchronized
fun isInitialized(): Boolean {
return ::executor.isInitialized
}
}
package theodolite.execution.operator
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.internal.KubernetesDeserializer
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.BenchmarkExecutionList
import theodolite.benchmark.KubernetesBenchmark
import theodolite.benchmark.KubernetesBenchmarkList
private const val DEFAULT_NAMESPACE = "default"
private const val SCOPE = "Namespaced"
private const val EXECUTION_SINGULAR = "execution"
private const val EXECUTION_PLURAL = "executions"
private const val BENCHMARK_SINGULAR = "benchmark"
private const val BENCHMARK_PLURAL = "benchmarks"
private const val API_VERSION = "v1alpha1"
private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong()
private const val GROUP = "theodolite.com"
private val logger = KotlinLogging.logger {}
class TheodoliteOperator {
private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
fun start() {
logger.info { "Using $namespace as namespace." }
val client = DefaultKubernetesClient().inNamespace(namespace)
KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION",
EXECUTION_SINGULAR,
BenchmarkExecution::class.java
)
KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION",
BENCHMARK_SINGULAR,
KubernetesBenchmark::class.java
)
val contextFactory = K8sContextFactory()
val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL)
val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL)
val controller = TheodoliteController(client = client, executionContext = executionContext)
val informerFactory = client.informers()
val informerExecution = informerFactory.sharedIndexInformerForCustomResource(
executionContext, BenchmarkExecution::class.java,
BenchmarkExecutionList::class.java, RESYNC_PERIOD
)
val informerBenchmark = informerFactory.sharedIndexInformerForCustomResource(
benchmarkContext, KubernetesBenchmark::class.java,
KubernetesBenchmarkList::class.java, RESYNC_PERIOD
)
informerExecution.addEventHandler(ExecutionHandler(controller))
informerBenchmark.addEventHandler(BenchmarkEventHandler(controller))
informerFactory.startAllRegisteredInformers()
controller.run()
}
}
...@@ -12,52 +12,35 @@ private val logger = KotlinLogging.logger {} ...@@ -12,52 +12,35 @@ private val logger = KotlinLogging.logger {}
* Manages the topics related tasks * Manages the topics related tasks
* @param kafkaConfig Kafka Configuration as HashMap * @param kafkaConfig Kafka Configuration as HashMap
*/ */
class TopicManager(kafkaConfig: HashMap<String, Any>) { class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
private lateinit var kafkaAdmin: AdminClient
init {
try {
kafkaAdmin = AdminClient.create(kafkaConfig)
} catch (e: Exception) {
logger.error { e.toString() }
}
}
/** /**
* Creates topics. * Creates topics.
* @param newTopics List of all Topic which should be created * @param newTopics List of all Topic which should be created
*/ */
fun createTopics(newTopics: Collection<NewTopic>) { fun createTopics(newTopics: Collection<NewTopic>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
kafkaAdmin.createTopics(newTopics) kafkaAdmin.createTopics(newTopics)
kafkaAdmin.close()
logger.info { "Topics created" } logger.info { "Topics created" }
} }
fun createTopics(topics: List<String>, numPartitions: Int, replicationFactor: Short) {
val newTopics = mutableSetOf<NewTopic>()
for (i in topics) {
val tops = NewTopic(i, numPartitions, replicationFactor)
newTopics.add(tops)
}
kafkaAdmin.createTopics(newTopics)
logger.info { "Creation of $topics started" }
}
/** /**
* Removes topics. * Removes topics.
* @param topics * @param topics
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val result = kafkaAdmin.deleteTopics(topics) val result = kafkaAdmin.deleteTopics(topics)
try { try {
result.all().get() result.all().get()
} catch (ex: Exception) { } catch (e: Exception) {
logger.error { ex.toString() } logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
} }
kafkaAdmin.close()
logger.info { "Topics removed" } logger.info { "Topics removed" }
} }
fun getTopics(): ListTopicsResult? {
return kafkaAdmin.listTopics()
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment