Skip to content
Snippets Groups Projects
Commit f6c3da1e authored by Benedikt Wetzel's avatar Benedikt Wetzel
Browse files

Refactoring and clean up

Split Theodolite operator for more decoupling
clean up
Enhande handling of concurrency
Enhance error handling
parent 10bf8211
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!106Introduce a Theodolite operator,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 277 additions and 35 deletions
package theodolite.execution package theodolite.execution
import io.quarkus.runtime.annotations.RegisterForReflection import io.quarkus.runtime.annotations.RegisterForReflection
import mu.KotlinLogging
import theodolite.benchmark.Benchmark import theodolite.benchmark.Benchmark
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.evaluation.AnalysisExecutor import theodolite.evaluation.AnalysisExecutor
...@@ -10,6 +11,8 @@ import theodolite.util.Resource ...@@ -10,6 +11,8 @@ import theodolite.util.Resource
import theodolite.util.Results import theodolite.util.Results
import java.time.Duration import java.time.Duration
private val logger = KotlinLogging.logger {}
@RegisterForReflection @RegisterForReflection
class BenchmarkExecutorImpl( class BenchmarkExecutorImpl(
benchmark: Benchmark, benchmark: Benchmark,
...@@ -21,8 +24,15 @@ class BenchmarkExecutorImpl( ...@@ -21,8 +24,15 @@ class BenchmarkExecutorImpl(
override fun runExperiment(load: LoadDimension, res: Resource): Boolean { override fun runExperiment(load: LoadDimension, res: Resource): Boolean {
var result = false var result = false
val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides) val benchmarkDeployment = benchmark.buildDeployment(load, res, this.configurationOverrides)
try {
benchmarkDeployment.setup() benchmarkDeployment.setup()
this.waitAndLog() this.waitAndLog()
} catch(e: Exception) {
logger.error { "Error while setup experiment." }
logger.error { "Error is: $e" }
this.run.set(false)
}
if (this.run.get()) { if (this.run.get()) {
result = result =
... ...
......
package theodolite.execution
import io.quarkus.runtime.annotations.QuarkusMain
import mu.KotlinLogging
import theodolite.execution.operator.TheodoliteOperator
import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {}
@QuarkusMain
object Main {
@JvmStatic
fun main(args: Array<String>) {
val mode = System.getenv("MODE") ?: "operator"
logger.info { "Start Theodolite with mode $mode" }
when(mode) {
"yaml-executor" -> TheodoliteYamlExecutor().start()
"operator" -> TheodoliteOperator().start()
else -> {logger.error { "MODE $mode not found" }; exitProcess(1)}
}
}
}
\ No newline at end of file
package theodolite.execution package theodolite.execution
import io.quarkus.runtime.annotations.QuarkusMain
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
...@@ -10,10 +9,10 @@ import kotlin.system.exitProcess ...@@ -10,10 +9,10 @@ import kotlin.system.exitProcess
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@QuarkusMain(name = "TheodoliteYamlExecutor") class TheodoliteYamlExecutor {
object TheodoliteYamlExecutor { private val parser = YamlParser()
@JvmStatic
fun main(args: Array<String>) { fun start() {
logger.info { "Theodolite started" } logger.info { "Theodolite started" }
val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/BenchmarkExecution.yaml" val executionPath = System.getenv("THEODOLITE_EXECUTION") ?: "./config/BenchmarkExecution.yaml"
...@@ -26,7 +25,6 @@ object TheodoliteYamlExecutor { ...@@ -26,7 +25,6 @@ object TheodoliteYamlExecutor {
// load the BenchmarkExecution and the BenchmarkType // load the BenchmarkExecution and the BenchmarkType
val parser = YamlParser()
val benchmarkExecution = val benchmarkExecution =
parser.parse(path = executionPath, E = BenchmarkExecution::class.java)!! parser.parse(path = executionPath, E = BenchmarkExecution::class.java)!!
val benchmark = val benchmark =
... ...
......
package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.KubernetesBenchmark
private val logger = KotlinLogging.logger {}
class BenchmarkEventHandler(private val controller: TheodoliteController): ResourceEventHandler<KubernetesBenchmark> {
override fun onAdd(benchmark: KubernetesBenchmark) {
benchmark.name = benchmark.metadata.name
logger.info { "Add new benchmark ${benchmark.name}." }
this.controller.benchmarks[benchmark.name] = benchmark
}
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}." }
newBenchmark.name = newBenchmark.metadata.name
if (this.controller.isInitialized() && this.controller.executor.getBenchmark().name == oldBenchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
} else {
onAdd(newBenchmark)
}
}
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}." }
this.controller.benchmarks.remove(benchmark.metadata.name)
if ( this.controller.isInitialized() && this.controller.executor.getBenchmark().name == benchmark.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
}
}
}
package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
private val logger = KotlinLogging.logger {}
class ExecutionHandler(private val controller: TheodoliteController): ResourceEventHandler<BenchmarkExecution> {
override fun onAdd(execution: BenchmarkExecution) {
execution.name = execution.metadata.name
logger.info { "Add new execution ${execution.metadata.name} to queue." }
this.controller.executionsQueue.add(execution)
}
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
this.controller.executionsQueue.addFirst(newExecution)
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
}
}
override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name} from queue." }
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped." }
}
}
}
package theodolite.execution.operator
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
class K8sContextFactory {
fun create(api: String, scope: String, group: String, plural: String ) : CustomResourceDefinitionContext{
return CustomResourceDefinitionContext.Builder()
.withVersion(api)
.withScope(scope)
.withGroup(group)
.withPlural(plural)
.build()
}
}
\ No newline at end of file
package theodolite.execution package theodolite.execution.operator
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import io.fabric8.kubernetes.client.informers.SharedInformer
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.KubernetesBenchmark import theodolite.benchmark.KubernetesBenchmark
import theodolite.execution.TheodoliteExecutor
import java.lang.Thread.sleep import java.lang.Thread.sleep
import java.util.* import java.util.concurrent.ConcurrentHashMap
import kotlin.collections.HashMap import java.util.concurrent.ConcurrentLinkedDeque
import kotlin.collections.set import java.util.concurrent.atomic.AtomicBoolean
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
class TheodoliteController( class TheodoliteController(
val client: NamespacedKubernetesClient, val client: NamespacedKubernetesClient,
val informerBenchmarkExecution: SharedInformer<BenchmarkExecution>,
val informerBenchmarkType: SharedInformer<KubernetesBenchmark>,
val executionContext: CustomResourceDefinitionContext val executionContext: CustomResourceDefinitionContext
) { ) {
lateinit var executor: TheodoliteExecutor lateinit var executor: TheodoliteExecutor
val executionsQueue: Deque<BenchmarkExecution> = LinkedList<BenchmarkExecution>() val executionsQueue: ConcurrentLinkedDeque<BenchmarkExecution> = ConcurrentLinkedDeque()
val benchmarks: MutableMap<String, KubernetesBenchmark> = HashMap() val benchmarks: ConcurrentHashMap<String, KubernetesBenchmark> = ConcurrentHashMap()
var isUpdated = false var isUpdated = AtomicBoolean(false)
/**
* Adds the EventHandler to kubernetes
*/
fun create() {
informerBenchmarkExecution.addEventHandler(object : ResourceEventHandler<BenchmarkExecution> {
override fun onAdd(execution: BenchmarkExecution) {
execution.name = execution.metadata.name
logger.info { "Add new execution ${execution.metadata.name} to queue" }
executionsQueue.add(execution)
}
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue" }
newExecution.name = newExecution.metadata.name
executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
executionsQueue.addFirst(newExecution)
if (::executor.isInitialized && executor.getExecution().name == newExecution.metadata.name) {
isUpdated = true
executor.executor.run.compareAndSet(true, false)
}
}
override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name} from queue" }
executionsQueue.removeIf { e -> e.name == execution.metadata.name }
if (::executor.isInitialized && executor.getExecution().name == execution.metadata.name) {
isUpdated = true
executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped" }
}
}
})
informerBenchmarkType.addEventHandler(object : ResourceEventHandler<KubernetesBenchmark> {
override fun onAdd(benchmark: KubernetesBenchmark) {
benchmark.name = benchmark.metadata.name
logger.info { "Add new benchmark ${benchmark.name}" }
benchmarks[benchmark.name] = benchmark
}
override fun onUpdate(oldBenchmark: KubernetesBenchmark, newBenchmark: KubernetesBenchmark) {
logger.info { "Update benchmark ${newBenchmark.metadata.name}" }
newBenchmark.name = newBenchmark.metadata.name
if (::executor.isInitialized && executor.getBenchmark().name == oldBenchmark.metadata.name) {
isUpdated = true
executor.executor.run.compareAndSet(true, false)
} else {
onAdd(newBenchmark)
}
}
override fun onDelete(benchmark: KubernetesBenchmark, b: Boolean) {
logger.info { "Delete benchmark ${benchmark.metadata.name}" }
benchmarks.remove(benchmark.metadata.name)
if (::executor.isInitialized && executor.getBenchmark().name == benchmark.metadata.name) {
isUpdated = true
executor.executor.run.compareAndSet(true, false)
logger.info { "Current benchmark stopped" }
}
}
})
}
fun run() { fun run() {
while (true) { while (true) {
try { try {
reconcile() reconcile()
logger.info { "Theodolite is waiting for new matching benchmark and execution" } logger.info { "Theodolite is waiting for new matching benchmark and execution." }
logger.info { "Currently available executions: " } logger.info { "Currently available executions: " }
executionsQueue.forEach { executionsQueue.forEach {
logger.info { "${it.name} : waiting for : ${it.benchmark}" } logger.info { "${it.name} : waiting for : ${it.benchmark}" }
...@@ -107,7 +37,7 @@ class TheodoliteController( ...@@ -107,7 +37,7 @@ class TheodoliteController(
} }
sleep(2000) sleep(2000)
} catch (e: InterruptedException) { } catch (e: InterruptedException) {
logger.error { "Execution interrupted with error: $e" } logger.error { "Execution interrupted with error: $e." }
} }
} }
} }
...@@ -115,13 +45,11 @@ class TheodoliteController( ...@@ -115,13 +45,11 @@ class TheodoliteController(
@Synchronized @Synchronized
private fun reconcile() { private fun reconcile() {
while (executionsQueue.isNotEmpty()) { while (executionsQueue.isNotEmpty()) {
val execution = executionsQueue.peek() val execution = executionsQueue.peek()
val benchmark = benchmarks[execution.benchmark] val benchmark = benchmarks[execution.benchmark]
if (benchmark == null) { if (benchmark == null) {
logger.debug { "No benchmark found for execution ${execution.name}" } logger.debug { "No benchmark found for execution ${execution.name}." }
sleep(1000) sleep(1000)
} else { } else {
runExecution(execution, benchmark) runExecution(execution, benchmark)
...@@ -131,15 +59,19 @@ class TheodoliteController( ...@@ -131,15 +59,19 @@ class TheodoliteController(
@Synchronized @Synchronized
fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) { fun runExecution(execution: BenchmarkExecution, benchmark: KubernetesBenchmark) {
isUpdated = false isUpdated.set(false)
logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}" } logger.info { "Start execution ${execution.name} with benchmark ${benchmark.name}." }
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark) executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
executor.run() executor.run()
if (!isUpdated) { if (!isUpdated.get()) {
client.customResource(executionContext).delete(client.namespace, execution.metadata.name) client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
} }
logger.info { "Execution of ${execution.name} is finally stopped." }
}
logger.info { "Execution of ${execution.name} is finally stopped" } @Synchronized
fun isInitialized(): Boolean {
return ::executor.isInitialized
} }
} }
package theodolite.execution package theodolite.execution.operator
import io.fabric8.kubernetes.client.DefaultKubernetesClient import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import io.fabric8.kubernetes.internal.KubernetesDeserializer import io.fabric8.kubernetes.internal.KubernetesDeserializer
import io.quarkus.runtime.annotations.QuarkusMain
import mu.KotlinLogging import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution import theodolite.benchmark.BenchmarkExecution
import theodolite.benchmark.BenchmarkExecutionList import theodolite.benchmark.BenchmarkExecutionList
...@@ -11,66 +9,56 @@ import theodolite.benchmark.KubernetesBenchmark ...@@ -11,66 +9,56 @@ import theodolite.benchmark.KubernetesBenchmark
import theodolite.benchmark.KubernetesBenchmarkList import theodolite.benchmark.KubernetesBenchmarkList
private var DEFAULT_NAMESPACE = "default" private const val DEFAULT_NAMESPACE = "default"
private const val SCOPE = "Namespaced"
private const val EXECUTION_SINGULAR = "execution"
private const val EXECUTION_PLURAL = "executions"
private const val BENCHMARK_SINGULAR = "benchmark"
private const val BENCHMARK_PLURAL = "benchmarks"
private const val API_VERSION = "v1alpha1"
private const val RESYNC_PERIOD = 10 * 60 * 1000.toLong()
private const val GROUP = "theodolite.com"
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@QuarkusMain(name = "TheodoliteOperator") class TheodoliteOperator {
object TheodoliteOperator { private val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE
@JvmStatic
fun main(args: Array<String>) {
val namespace = System.getenv("NAMESPACE") ?: DEFAULT_NAMESPACE fun start() {
logger.info { "Using $namespace as namespace." } logger.info { "Using $namespace as namespace." }
val client = DefaultKubernetesClient().inNamespace(namespace) val client = DefaultKubernetesClient().inNamespace(namespace)
KubernetesDeserializer.registerCustomKind( KubernetesDeserializer.registerCustomKind(
"theodolite.com/v1alpha1", "$GROUP/$API_VERSION",
"execution", EXECUTION_SINGULAR,
BenchmarkExecution::class.java BenchmarkExecution::class.java
) )
KubernetesDeserializer.registerCustomKind( KubernetesDeserializer.registerCustomKind(
"theodolite.com/v1alpha1", "$GROUP/$API_VERSION",
"benchmark", BENCHMARK_SINGULAR,
KubernetesBenchmark::class.java KubernetesBenchmark::class.java
) )
val executionContext = CustomResourceDefinitionContext.Builder() val contextFactory = K8sContextFactory()
.withVersion("v1alpha1") val executionContext = contextFactory.create(API_VERSION, SCOPE, GROUP, EXECUTION_PLURAL)
.withScope("Namespaced") val benchmarkContext = contextFactory.create(API_VERSION, SCOPE, GROUP, BENCHMARK_PLURAL)
.withGroup("theodolite.com")
.withPlural("executions")
.build()
val typeContext = CustomResourceDefinitionContext.Builder() val controller = TheodoliteController(client = client, executionContext = executionContext)
.withVersion("v1alpha1")
.withScope("Namespaced")
.withGroup("theodolite.com")
.withPlural("benchmarks")
.build()
val informerFactory = client.informers() val informerFactory = client.informers()
val informerExecution = informerFactory.sharedIndexInformerForCustomResource(
val informerBenchmarkExecution = informerFactory.sharedIndexInformerForCustomResource(
executionContext, BenchmarkExecution::class.java, executionContext, BenchmarkExecution::class.java,
BenchmarkExecutionList::class.java, 10 * 60 * 1000.toLong() BenchmarkExecutionList::class.java, RESYNC_PERIOD
) )
val informerBenchmark = informerFactory.sharedIndexInformerForCustomResource(
val informerBenchmarkType = informerFactory.sharedIndexInformerForCustomResource( benchmarkContext, KubernetesBenchmark::class.java,
typeContext, KubernetesBenchmark::class.java, KubernetesBenchmarkList::class.java, RESYNC_PERIOD
KubernetesBenchmarkList::class.java, 10 * 60 * 1000.toLong()
)
val controller = TheodoliteController(
client = client,
informerBenchmarkExecution = informerBenchmarkExecution,
informerBenchmarkType = informerBenchmarkType,
executionContext = executionContext
) )
controller.create() informerExecution.addEventHandler(ExecutionHandler(controller))
informerBenchmark.addEventHandler(BenchmarkEventHandler(controller))
informerFactory.startAllRegisteredInformers() informerFactory.startAllRegisteredInformers()
controller.run() controller.run()
} }
} }
...@@ -12,52 +12,35 @@ private val logger = KotlinLogging.logger {} ...@@ -12,52 +12,35 @@ private val logger = KotlinLogging.logger {}
* Manages the topics related tasks * Manages the topics related tasks
* @param kafkaConfig Kafka Configuration as HashMap * @param kafkaConfig Kafka Configuration as HashMap
*/ */
class TopicManager(kafkaConfig: HashMap<String, Any>) { class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
private lateinit var kafkaAdmin: AdminClient
init {
try {
kafkaAdmin = AdminClient.create(kafkaConfig)
} catch (e: Exception) {
logger.error { e.toString() }
}
}
/** /**
* Creates topics. * Creates topics.
* @param newTopics List of all Topic which should be created * @param newTopics List of all Topic which should be created
*/ */
fun createTopics(newTopics: Collection<NewTopic>) { fun createTopics(newTopics: Collection<NewTopic>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
kafkaAdmin.createTopics(newTopics) kafkaAdmin.createTopics(newTopics)
kafkaAdmin.close()
logger.info { "Topics created" } logger.info { "Topics created" }
} }
fun createTopics(topics: List<String>, numPartitions: Int, replicationFactor: Short) {
val newTopics = mutableSetOf<NewTopic>()
for (i in topics) {
val tops = NewTopic(i, numPartitions, replicationFactor)
newTopics.add(tops)
}
kafkaAdmin.createTopics(newTopics)
logger.info { "Creation of $topics started" }
}
/** /**
* Removes topics. * Removes topics.
* @param topics * @param topics
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val result = kafkaAdmin.deleteTopics(topics) val result = kafkaAdmin.deleteTopics(topics)
try { try {
result.all().get() result.all().get()
} catch (ex: Exception) { } catch (e: Exception) {
logger.error { ex.toString() } logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
} }
kafkaAdmin.close()
logger.info { "Topics removed" } logger.info { "Topics removed" }
} }
fun getTopics(): ListTopicsResult? {
return kafkaAdmin.listTopics()
}
} }
quarkus.package.main-class=TheodoliteOperator
quarkus.native.additional-build-args=\ quarkus.native.additional-build-args=\
--initialize-at-run-time=io.fabric8.kubernetes.client.internal.CertUtils,\ --initialize-at-run-time=io.fabric8.kubernetes.client.internal.CertUtils,\
--report-unsupported-elements-at-runtime --report-unsupported-elements-at-runtime
\ No newline at end of file
...@@ -22,7 +22,7 @@ loadTypes: ...@@ -22,7 +22,7 @@ loadTypes:
container: "workload-generator" container: "workload-generator"
variableName: "NUM_SENSORS" variableName: "NUM_SENSORS"
kafkaConfig: kafkaConfig:
bootstrapServer: "my-confluent-cp-kafka:9092" bootstrapServer: "localhost:31290"
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment