Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Commits on Source (6)
......@@ -45,6 +45,16 @@ rules:
- list
- create
- get
- apiGroups:
- kafka.strimzi.io
resources:
- kafkatopics
verbs:
- delete
- list
- get
- create
- update
{{- if .Values.operator.enabled }}
- apiGroups:
- theodolite.com
......
......@@ -181,7 +181,7 @@ strimzi:
enabled: true
nodeSelector: {}
topicOperator:
enabled: false
enabled: true
###
......
......@@ -142,7 +142,7 @@ class EnvVarLoadGeneratorFactory {
LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic);
recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic);
} else {
throw new IllegalStateException("Neither an emulator host nor a project was provided.");
throw new IllegalStateException("Neither an emulator host nor a project was provided.");
}
} else {
// Should never happen
......
......@@ -80,7 +80,7 @@ class KubernetesBenchmarkDeployment(
labelName = LAG_EXPORTER_POD_LABEL_NAME,
labelValue = LAG_EXPORTER_POD_LABEL_VALUE
)
logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." }
logger.info { "Teardown complete. Wait $afterTeardownDelay seconds to let everything cool down." }
Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis())
}
}
......@@ -4,17 +4,17 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.util.InvalidPatcherConfigurationException
import javax.enterprise.context.ApplicationScoped
private const val CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)"
private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)"
private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
@ApplicationScoped
class SloConfigHandler {
companion object {
fun getQueryString(slo: BenchmarkExecution.Slo): String {
return when (slo.sloType.toLowerCase()) {
return when (slo.sloType.lowercase()) {
SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected")
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> CONSUMER_LAG_QUERY
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> DROPPED_RECORDS_QUERY
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY
else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType")
}
}
......
......@@ -18,22 +18,22 @@ class LeaderElector(
) {
// TODO(what is the name of the lock? .withName() or LeaseLock(..,name..) ?)
fun getLeadership(leader: KFunction0<Unit>) {
fun getLeadership(leader: () -> Unit) {
val lockIdentity: String = UUID.randomUUID().toString()
DefaultKubernetesClient().use { kc ->
kc.leaderElector()
.withConfig(
LeaderElectionConfigBuilder()
.withName("Theodolite")
.withLeaseDuration(Duration.ofSeconds(15L))
.withLeaseDuration(Duration.ofSeconds(15))
.withLock(LeaseLock(client.namespace, name, lockIdentity))
.withRenewDeadline(Duration.ofSeconds(10L))
.withRetryPeriod(Duration.ofSeconds(2L))
.withRenewDeadline(Duration.ofSeconds(10))
.withRetryPeriod(Duration.ofSeconds(2))
.withLeaderCallbacks(LeaderCallbacks(
{ Thread { leader() }.start() },
{ logger.info { "STOPPED LEADERSHIP" } }
{ logger.info { "Stop being the leading operator." } }
) { newLeader: String? ->
logger.info { "New leader elected $newLeader" }
logger.info { "New leader elected: $newLeader" }
})
.build()
)
......
......@@ -41,15 +41,14 @@ class TheodoliteOperator {
LeaderElector(
client = client,
name = Configuration.COMPONENT_NAME
)
.getLeadership(::startOperator)
).getLeadership(::startOperator)
}
/**
* Start the operator.
*/
private fun startOperator() {
logger.info { "Using $namespace as namespace." }
logger.info { "Becoming the leading operator. Use namespace '$namespace'." }
client.use {
KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION",
......