Skip to content
Snippets Groups Projects
Commit 41b4c8ff authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Fix TopicManager.kt

Add additional try catch to BenchmarkExecutorImpl.kt
parent 0fc5b0dc
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!127Fix TopicManager,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
...@@ -47,9 +47,9 @@ class KubernetesBenchmarkDeployment( ...@@ -47,9 +47,9 @@ class KubernetesBenchmarkDeployment(
*/ */
override fun teardown() { override fun teardown() {
KafkaLagExporterRemover(client).remove(LABEL) KafkaLagExporterRemover(client).remove(LABEL)
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
resources.forEach { resources.forEach {
kubernetesManager.remove(it) kubernetesManager.remove(it)
} }
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
} }
} }
...@@ -29,7 +29,7 @@ class BenchmarkExecutorImpl( ...@@ -29,7 +29,7 @@ class BenchmarkExecutorImpl(
try { try {
benchmarkDeployment.setup() benchmarkDeployment.setup()
this.waitAndLog() this.waitAndLog()
} catch(e: Exception) { } catch (e: Exception) {
logger.error { "Error while setup experiment." } logger.error { "Error while setup experiment." }
logger.error { "Error is: $e" } logger.error { "Error is: $e" }
this.run.set(false) this.run.set(false)
...@@ -40,10 +40,20 @@ class BenchmarkExecutorImpl( ...@@ -40,10 +40,20 @@ class BenchmarkExecutorImpl(
*/ */
if (this.run.get()) { if (this.run.get()) {
result = result =
AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration) AnalysisExecutor(slo = slo, executionId = executionId).analyze(
load = load,
res = res,
executionDuration = executionDuration
)
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
} }
benchmarkDeployment.teardown()
try {
benchmarkDeployment.teardown()
} catch (e: Exception) {
logger.warn { "Error while teardown of deplyoment" }
logger.debug { "The Teardowm failed cause of: $e " }
}
return result return result
} }
......
...@@ -2,9 +2,12 @@ package theodolite.k8s ...@@ -2,9 +2,12 @@ package theodolite.k8s
import mu.KotlinLogging import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsResult
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import java.lang.Thread.sleep
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private const val RETRY_TIME = 2000L
/** /**
* Manages the topics related tasks * Manages the topics related tasks
...@@ -12,17 +15,30 @@ private val logger = KotlinLogging.logger {} ...@@ -12,17 +15,30 @@ private val logger = KotlinLogging.logger {}
* @constructor Creates a KafkaAdminClient * @constructor Creates a KafkaAdminClient
*/ */
class TopicManager(private val kafkaConfig: HashMap<String, Any>) { class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
/** /**
* Creates topics. * Creates topics.
* @param newTopics List of all Topic that should be created * @param newTopics List of all Topic that should be created
*/ */
fun createTopics(newTopics: Collection<NewTopic>) { fun createTopics(newTopics: Collection<NewTopic>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val result = kafkaAdmin.createTopics(newTopics) lateinit var result: CreateTopicsResult
result.all().get()// wait for the future object
do {
var retryCreation = false
try {
result = kafkaAdmin.createTopics(newTopics)
result.all().get()// wait for the future object
} catch (e: Exception) {
logger.warn { "Error during topic creation." }
logger.warn { "Will retry the topic creation after 2 seconds" }
sleep(RETRY_TIME)
retryCreation = true
}
} while (retryCreation)
logger.info { logger.info {
"Topics created finished with result: ${ "Topics creation finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone } result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",") .joinToString(separator = ",")
} " } "
...@@ -35,19 +51,34 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -35,19 +51,34 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
* @param topics List of names with the topics to remove. * @param topics List of names with the topics to remove.
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
try { var deleted = false
val result = kafkaAdmin.deleteTopics(topics)
result.all().get() // wait for the future object while (!deleted) {
logger.info { try {
"Topics deletion finished with result: ${ val result = kafkaAdmin.deleteTopics(topics)
result.values().map { it -> it.key + ": " + it.value.isDone } result.all().get() // wait for the future object
.joinToString(separator = ",") logger.info {
} " "Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
}"
}
} catch (e: Exception) {
logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
}
val toDelete = topics.filter { topic ->
kafkaAdmin.listTopics().names().get().contains(topic)
}
if (toDelete.isNullOrEmpty()) {
deleted = true
} else {
logger.info { "Deletion of kafka topics failed retrying in 2 seconds" }
sleep(RETRY_TIME)
} }
} catch (e: Exception) {
logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
} }
kafkaAdmin.close() kafkaAdmin.close()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment