Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Commits on Source (6)
...@@ -45,6 +45,16 @@ rules: ...@@ -45,6 +45,16 @@ rules:
- list - list
- create - create
- get - get
- apiGroups:
- kafka.strimzi.io
resources:
- kafkatopics
verbs:
- delete
- list
- get
- create
- update
{{- if .Values.operator.enabled }} {{- if .Values.operator.enabled }}
- apiGroups: - apiGroups:
- theodolite.com - theodolite.com
......
...@@ -181,7 +181,7 @@ strimzi: ...@@ -181,7 +181,7 @@ strimzi:
enabled: true enabled: true
nodeSelector: {} nodeSelector: {}
topicOperator: topicOperator:
enabled: false enabled: true
### ###
......
...@@ -142,7 +142,7 @@ class EnvVarLoadGeneratorFactory { ...@@ -142,7 +142,7 @@ class EnvVarLoadGeneratorFactory {
LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic); LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic);
recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic); recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic);
} else { } else {
throw new IllegalStateException("Neither an emulator host nor a project was provided."); throw new IllegalStateException("Neither an emulator host nor a project was provided.");
} }
} else { } else {
// Should never happen // Should never happen
......
...@@ -80,7 +80,7 @@ class KubernetesBenchmarkDeployment( ...@@ -80,7 +80,7 @@ class KubernetesBenchmarkDeployment(
labelName = LAG_EXPORTER_POD_LABEL_NAME, labelName = LAG_EXPORTER_POD_LABEL_NAME,
labelValue = LAG_EXPORTER_POD_LABEL_VALUE labelValue = LAG_EXPORTER_POD_LABEL_VALUE
) )
logger.info { "Teardown complete. Wait $afterTeardownDelay ms to let everything come down." } logger.info { "Teardown complete. Wait $afterTeardownDelay seconds to let everything cool down." }
Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis()) Thread.sleep(Duration.ofSeconds(afterTeardownDelay).toMillis())
} }
} }
...@@ -4,17 +4,17 @@ import theodolite.benchmark.BenchmarkExecution ...@@ -4,17 +4,17 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.util.InvalidPatcherConfigurationException import theodolite.util.InvalidPatcherConfigurationException
import javax.enterprise.context.ApplicationScoped import javax.enterprise.context.ApplicationScoped
private const val CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)"
private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
@ApplicationScoped @ApplicationScoped
class SloConfigHandler { class SloConfigHandler {
companion object { companion object {
fun getQueryString(slo: BenchmarkExecution.Slo): String { fun getQueryString(slo: BenchmarkExecution.Slo): String {
return when (slo.sloType.toLowerCase()) { return when (slo.sloType.lowercase()) {
SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected") SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected")
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> CONSUMER_LAG_QUERY SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> DROPPED_RECORDS_QUERY SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY
else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType") else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType")
} }
} }
......
...@@ -18,22 +18,22 @@ class LeaderElector( ...@@ -18,22 +18,22 @@ class LeaderElector(
) { ) {
// TODO(what is the name of the lock? .withName() or LeaseLock(..,name..) ?) // TODO(what is the name of the lock? .withName() or LeaseLock(..,name..) ?)
fun getLeadership(leader: KFunction0<Unit>) { fun getLeadership(leader: () -> Unit) {
val lockIdentity: String = UUID.randomUUID().toString() val lockIdentity: String = UUID.randomUUID().toString()
DefaultKubernetesClient().use { kc -> DefaultKubernetesClient().use { kc ->
kc.leaderElector() kc.leaderElector()
.withConfig( .withConfig(
LeaderElectionConfigBuilder() LeaderElectionConfigBuilder()
.withName("Theodolite") .withName("Theodolite")
.withLeaseDuration(Duration.ofSeconds(15L)) .withLeaseDuration(Duration.ofSeconds(15))
.withLock(LeaseLock(client.namespace, name, lockIdentity)) .withLock(LeaseLock(client.namespace, name, lockIdentity))
.withRenewDeadline(Duration.ofSeconds(10L)) .withRenewDeadline(Duration.ofSeconds(10))
.withRetryPeriod(Duration.ofSeconds(2L)) .withRetryPeriod(Duration.ofSeconds(2))
.withLeaderCallbacks(LeaderCallbacks( .withLeaderCallbacks(LeaderCallbacks(
{ Thread { leader() }.start() }, { Thread { leader() }.start() },
{ logger.info { "STOPPED LEADERSHIP" } } { logger.info { "Stop being the leading operator." } }
) { newLeader: String? -> ) { newLeader: String? ->
logger.info { "New leader elected $newLeader" } logger.info { "New leader elected: $newLeader" }
}) })
.build() .build()
) )
......
...@@ -41,15 +41,14 @@ class TheodoliteOperator { ...@@ -41,15 +41,14 @@ class TheodoliteOperator {
LeaderElector( LeaderElector(
client = client, client = client,
name = Configuration.COMPONENT_NAME name = Configuration.COMPONENT_NAME
) ).getLeadership(::startOperator)
.getLeadership(::startOperator)
} }
/** /**
* Start the operator. * Start the operator.
*/ */
private fun startOperator() { private fun startOperator() {
logger.info { "Using $namespace as namespace." } logger.info { "Becoming the leading operator. Use namespace '$namespace'." }
client.use { client.use {
KubernetesDeserializer.registerCustomKind( KubernetesDeserializer.registerCustomKind(
"$GROUP/$API_VERSION", "$GROUP/$API_VERSION",
......