Skip to content
Snippets Groups Projects
Commit 966a96b3 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'theodolite-kotlin' into benchmark-definitions

parents e566e97c a5691ef9
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!131Add definitions for UC1, UC2, UC3, UC4,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
...@@ -30,7 +30,7 @@ class KubernetesBenchmarkDeployment( ...@@ -30,7 +30,7 @@ class KubernetesBenchmarkDeployment(
private val kafkaController = TopicManager(this.kafkaConfig) private val kafkaController = TopicManager(this.kafkaConfig)
private val kubernetesManager = K8sManager(client) private val kubernetesManager = K8sManager(client)
private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter" private val LAG_EXPORTER_POD_LABEL = "app.kubernetes.io/name=kafka-lag-exporter"
private val SLEEP_AFTER_K8S_DELETION_MS = 2000L private val SLEEP_AFTER_TEARDOWN = 5000L
/** /**
* Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]: * Setup a [KubernetesBenchmark] using the [TopicManager] and the [K8sManager]:
...@@ -41,9 +41,7 @@ class KubernetesBenchmarkDeployment( ...@@ -41,9 +41,7 @@ class KubernetesBenchmarkDeployment(
val kafkaTopics = this.topics.filter { !it.removeOnly } val kafkaTopics = this.topics.filter { !it.removeOnly }
.map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) } .map{ NewTopic(it.name, it.numPartitions, it.replicationFactor) }
kafkaController.createTopics(kafkaTopics) kafkaController.createTopics(kafkaTopics)
resources.forEach { resources.forEach { kubernetesManager.deploy(it) }
kubernetesManager.deploy(it)
}
} }
/** /**
...@@ -56,9 +54,9 @@ class KubernetesBenchmarkDeployment( ...@@ -56,9 +54,9 @@ class KubernetesBenchmarkDeployment(
resources.forEach { resources.forEach {
kubernetesManager.remove(it) kubernetesManager.remove(it)
} }
logger.info { "Kubernetes resources deleted. Allow for short pause before continuing." }
Thread.sleep(SLEEP_AFTER_K8S_DELETION_MS)
kafkaController.removeTopics(this.topics.map { topic -> topic.name }) kafkaController.removeTopics(this.topics.map { topic -> topic.name })
KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL) KafkaLagExporterRemover(client).remove(LAG_EXPORTER_POD_LABEL)
logger.info { "Teardown complete. Wait $SLEEP_AFTER_TEARDOWN ms to let everything come down." }
Thread.sleep(SLEEP_AFTER_TEARDOWN)
} }
} }
...@@ -4,6 +4,7 @@ import mu.KotlinLogging ...@@ -4,6 +4,7 @@ import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsResult import org.apache.kafka.clients.admin.CreateTopicsResult
import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
import java.lang.Thread.sleep import java.lang.Thread.sleep
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
...@@ -29,13 +30,12 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -29,13 +30,12 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
try { try {
result = kafkaAdmin.createTopics(newTopics) result = kafkaAdmin.createTopics(newTopics)
result.all().get() // wait for the future to be completed result.all().get() // wait for the future to be completed
} catch (e: Exception) { // TopicExistsException
} catch (e: Exception) {
logger.warn(e) { "Error during topic creation." } logger.warn(e) { "Error during topic creation." }
logger.debug { e } // TODO remove? logger.debug { e } // TODO remove due to attached exception to warn log?
logger.info { "Remove existing topics." } logger.info { "Remove existing topics." }
delete(newTopics.map { topic -> topic.name() }, kafkaAdmin) delete(newTopics.map { topic -> topic.name() }, kafkaAdmin)
logger.info { "Will retry the topic creation in $RETRY_TIME seconds." } logger.info { "Will retry the topic creation in ${RETRY_TIME/1000} seconds." }
sleep(RETRY_TIME) sleep(RETRY_TIME)
retryCreation = true retryCreation = true
} }
...@@ -43,7 +43,9 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -43,7 +43,9 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
logger.info { logger.info {
"Topics creation finished with result: ${ "Topics creation finished with result: ${
result.values().map { it -> it.key + ": " + it.value.isDone } result
.values()
.map { it -> it.key + ": " + it.value.isDone }
.joinToString(separator = ",") .joinToString(separator = ",")
} " } "
} }
...@@ -93,17 +95,15 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) { ...@@ -93,17 +95,15 @@ class TopicManager(private val kafkaConfig: Map<String, Any>) {
} }
} catch (e: Exception) { } catch (e: Exception) {
logger.error(e) { "Error while removing topics: $e" } logger.error(e) { "Error while removing topics: $e" }
logger.info { "Existing topics are: ${kafkaAdmin.listTopics()}." } logger.info { "Existing topics are: ${kafkaAdmin.listTopics().names().get()}." }
} }
val toDelete = topics.filter { topic -> val toDelete = topics.filter { kafkaAdmin.listTopics().names().get().contains(it) }
kafkaAdmin.listTopics().names().get().contains(topic)
}
if (toDelete.isNullOrEmpty()) { if (toDelete.isNullOrEmpty()) {
deleted = true deleted = true
} else { } else {
logger.info { "Deletion of kafka topics failed, will retry in $RETRY_TIME seconds." } logger.info { "Deletion of Kafka topics failed, will retry in ${RETRY_TIME/1000} seconds." }
sleep(RETRY_TIME) sleep(RETRY_TIME)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment