Skip to content
Snippets Groups Projects
Commit 3ac0ae79 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch '211-add-presentation-features' into 'theodolite-kotlin'

Introduce hot fixes from the presentation branch

See merge request !121
parents 84204cf0 d172691c
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!121Introduce hot fixes from the presentation branch,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2742 canceled
......@@ -3,6 +3,7 @@ package theodolite.execution.operator
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import mu.KotlinLogging
import theodolite.benchmark.BenchmarkExecution
import java.lang.NullPointerException
private val logger = KotlinLogging.logger {}
......@@ -16,7 +17,11 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
override fun onUpdate(oldExecution: BenchmarkExecution, newExecution: BenchmarkExecution) {
logger.info { "Add updated execution to queue." }
newExecution.name = newExecution.metadata.name
try {
this.controller.executionsQueue.removeIf { e -> e.name == newExecution.metadata.name }
} catch(e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
this.controller.executionsQueue.addFirst(newExecution)
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == newExecution.metadata.name) {
this.controller.isUpdated.set(true)
......@@ -25,8 +30,12 @@ class ExecutionHandler(private val controller: TheodoliteController): ResourceEv
}
override fun onDelete(execution: BenchmarkExecution, b: Boolean) {
logger.info { "Delete execution ${execution.metadata.name} from queue." }
try {
this.controller.executionsQueue.removeIf { e -> e.name == execution.metadata.name }
logger.info { "Delete execution ${execution.metadata.name} from queue." }
} catch(e: NullPointerException) {
logger.warn { "No execution found for deletion" }
}
if (this.controller.isInitialized() && this.controller.executor.getExecution().name == execution.metadata.name) {
this.controller.isUpdated.set(true)
this.controller.executor.executor.run.compareAndSet(true, false)
......
......@@ -70,9 +70,15 @@ class TheodoliteController(
executor = TheodoliteExecutor(config = execution, kubernetesBenchmark = benchmark)
executor.run()
try {
if (!isUpdated.get()) {
this.executionsQueue.removeIf { e -> e.name == execution.name }
client.customResource(executionContext).delete(client.namespace, execution.metadata.name)
}
} catch (e: Exception) {
logger.warn { "Deletion skipped." }
}
logger.info { "Execution of ${execution.name} is finally stopped." }
}
......
......@@ -2,7 +2,6 @@ package theodolite.k8s
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.clients.admin.NewTopic
import java.util.*
......@@ -20,9 +19,15 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/
fun createTopics(newTopics: Collection<NewTopic>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
kafkaAdmin.createTopics(newTopics)
val result = kafkaAdmin.createTopics(newTopics)
result.all().get()// wait for the future object
logger.info {
"Topics created finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
kafkaAdmin.close()
logger.info { "Topics created" }
}
......@@ -32,15 +37,19 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
*/
fun removeTopics(topics: List<String>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val result = kafkaAdmin.deleteTopics(topics)
try {
result.all().get()
val result = kafkaAdmin.deleteTopics(topics)
result.all().get() // wait for the future object
logger.info {
"\"Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
} "
}
} catch (e: Exception) {
logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
}
kafkaAdmin.close()
logger.info { "Topics removed" }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment