diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt index b2cd269e0a6157ea23cb319cb3cfb6cb87a9d4e9..089f40dc6b5ef7d8ac4b063cae68e5e9621d1f50 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt @@ -4,6 +4,7 @@ import theodolite.benchmark.BenchmarkExecution import theodolite.util.InvalidPatcherConfigurationException import javax.enterprise.context.ApplicationScoped +private const val DEFAULT_CONSUMER_LAG_METRIC_BASE = "kafka_consumergroup_lag" private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" @@ -13,9 +14,14 @@ class SloConfigHandler { fun getQueryString(slo: BenchmarkExecution.Slo): String { return when (slo.sloType.lowercase()) { SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected") + SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: + (slo.properties["consumerGroup"]?.let { "{consumergroup='$it'}" } ?: "").let { + "sum by(consumergroup) ($DEFAULT_CONSUMER_LAG_METRIC_BASE$it >= 0)" + } + SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY - else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType") + else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type ${slo.sloType}") } } }