Skip to content
Snippets Groups Projects
Commit 16ad9b58 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'hotfix-topic-manager' into 'theodolite-kotlin'

Fix TopicManager

See merge request !127
parents 86f311b0 b63d2daf
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!127Fix TopicManager,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #2790 passed
...@@ -47,9 +47,9 @@ class KubernetesBenchmarkDeployment( ...@@ -47,9 +47,9 @@ class KubernetesBenchmarkDeployment(
*/ */
override fun teardown() { override fun teardown() {
KafkaLagExporterRemover(client).remove(LABEL) KafkaLagExporterRemover(client).remove(LABEL)
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
resources.forEach { resources.forEach {
kubernetesManager.remove(it) kubernetesManager.remove(it)
} }
kafkaController.removeTopics(this.topics.map { topic -> topic.name() })
} }
} }
...@@ -29,7 +29,7 @@ class BenchmarkExecutorImpl( ...@@ -29,7 +29,7 @@ class BenchmarkExecutorImpl(
try { try {
benchmarkDeployment.setup() benchmarkDeployment.setup()
this.waitAndLog() this.waitAndLog()
} catch(e: Exception) { } catch (e: Exception) {
logger.error { "Error while setup experiment." } logger.error { "Error while setup experiment." }
logger.error { "Error is: $e" } logger.error { "Error is: $e" }
this.run.set(false) this.run.set(false)
...@@ -40,10 +40,20 @@ class BenchmarkExecutorImpl( ...@@ -40,10 +40,20 @@ class BenchmarkExecutorImpl(
*/ */
if (this.run.get()) { if (this.run.get()) {
result = result =
AnalysisExecutor(slo = slo, executionId = executionId).analyze(load = load, res = res, executionDuration = executionDuration) AnalysisExecutor(slo = slo, executionId = executionId).analyze(
load = load,
res = res,
executionDuration = executionDuration
)
this.results.setResult(Pair(load, res), result) this.results.setResult(Pair(load, res), result)
} }
benchmarkDeployment.teardown()
try {
benchmarkDeployment.teardown()
} catch (e: Exception) {
logger.warn { "Error while teardown of deplyoment" }
logger.debug { "The Teardowm failed cause of: $e " }
}
return result return result
} }
......
...@@ -2,9 +2,12 @@ package theodolite.k8s ...@@ -2,9 +2,12 @@ package theodolite.k8s
import mu.KotlinLogging import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsResult
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import java.lang.Thread.sleep
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private const val RETRY_TIME = 2000L
/** /**
* Manages the topics related tasks * Manages the topics related tasks
...@@ -12,17 +15,30 @@ private val logger = KotlinLogging.logger {} ...@@ -12,17 +15,30 @@ private val logger = KotlinLogging.logger {}
* @constructor Creates a KafkaAdminClient * @constructor Creates a KafkaAdminClient
*/ */
class TopicManager(private val kafkaConfig: HashMap<String, Any>) { class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
/** /**
* Creates topics. * Creates topics.
* @param newTopics List of all Topic that should be created * @param newTopics List of all Topic that should be created
*/ */
fun createTopics(newTopics: Collection<NewTopic>) { fun createTopics(newTopics: Collection<NewTopic>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
val result = kafkaAdmin.createTopics(newTopics) lateinit var result: CreateTopicsResult
result.all().get()// wait for the future object
do {
var retryCreation = false
try {
result = kafkaAdmin.createTopics(newTopics)
result.all().get()// wait for the future object
} catch (e: Exception) {
logger.warn { "Error during topic creation." }
logger.warn { "Will retry the topic creation after 2 seconds" }
sleep(RETRY_TIME)
retryCreation = true
}
} while (retryCreation)
logger.info { logger.info {
"Topics created finished with result: ${ "Topics creation finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone } result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",") .joinToString(separator = ",")
} " } "
...@@ -35,19 +51,34 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) { ...@@ -35,19 +51,34 @@ class TopicManager(private val kafkaConfig: HashMap<String, Any>) {
* @param topics List of names with the topics to remove. * @param topics List of names with the topics to remove.
*/ */
fun removeTopics(topics: List<String>) { fun removeTopics(topics: List<String>) {
var kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig) val kafkaAdmin: AdminClient = AdminClient.create(this.kafkaConfig)
try { var deleted = false
val result = kafkaAdmin.deleteTopics(topics)
result.all().get() // wait for the future object while (!deleted) {
logger.info { try {
"Topics deletion finished with result: ${ val result = kafkaAdmin.deleteTopics(topics)
result.values().map { it -> it.key + ": " + it.value.isDone } result.all().get() // wait for the future object
.joinToString(separator = ",") logger.info {
} " "Topics deletion finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",")
}"
}
} catch (e: Exception) {
logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
}
val toDelete = topics.filter { topic ->
kafkaAdmin.listTopics().names().get().contains(topic)
}
if (toDelete.isNullOrEmpty()) {
deleted = true
} else {
logger.info { "Deletion of kafka topics failed retrying in 2 seconds" }
sleep(RETRY_TIME)
} }
} catch (e: Exception) {
logger.error { "Error while removing topics: $e" }
logger.debug { "Existing topics are: ${kafkaAdmin.listTopics()}." }
} }
kafkaAdmin.close() kafkaAdmin.close()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment