diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java index 607f591ffea766041c0472f9995e971f075c31b6..0165fa644e1853353e73caeaf0b9d2df0f8e9aea 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java @@ -74,13 +74,13 @@ public class BeamService { if (this.pipelineResult == null) { throw new IllegalStateException("Cannot stop service since it has never been started."); } - LOGGER.info("Stoping Beam pipeline."); + LOGGER.info("Stopping Beam pipeline."); try { this.pipelineResult.cancel(); this.pipelineResult = null; // NOPMD use null to indicate absence } catch (final IOException e) { throw new IllegalStateException( - "Stoping the service failed due to failed stop of Beam pipeline.", e); + "Stopping the service failed due to failed stop of Beam pipeline.", e); } LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName); } diff --git a/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml index 620e9d89fb7aba54de9c3a7874dd804050c36191..f1c56b3a51ec884dca25a31ffafea195919a02e2 100644 --- a/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc3-flink/resources/jobmanager-deployment.yaml @@ -46,7 +46,7 @@ spec: limits: memory: 4Gi cpu: 1000m - args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc3.flinks.uc3.flink.HistoryServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] + args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc3.flink.HistoryServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] #command: ['sleep', '60m'] ports: - containerPort: 6123 diff --git a/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml b/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml index d1ebf745537bd233ba6e702b2cc4cd919103e7b7..d7037e2c579d82485bb31c53f132d7938f424b38 100644 --- a/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc4-flink/resources/jobmanager-deployment.yaml @@ -46,7 +46,7 @@ spec: limits: memory: 4Gi cpu: 1000m - args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc4.flinks.uc4.flink.AggregationServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] + args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc4.flink.AggregationServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] #command: ['sleep', '60m'] ports: - containerPort: 6123 diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt index b2cd269e0a6157ea23cb319cb3cfb6cb87a9d4e9..089f40dc6b5ef7d8ac4b063cae68e5e9621d1f50 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt @@ -4,6 +4,7 @@ import theodolite.benchmark.BenchmarkExecution import theodolite.util.InvalidPatcherConfigurationException import javax.enterprise.context.ApplicationScoped +private const val DEFAULT_CONSUMER_LAG_METRIC_BASE = "kafka_consumergroup_lag" private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" @@ -13,9 +14,14 @@ class SloConfigHandler { fun getQueryString(slo: BenchmarkExecution.Slo): String { return when (slo.sloType.lowercase()) { SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected") + SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: + (slo.properties["consumerGroup"]?.let { "{consumergroup='$it'}" } ?: "").let { + "sum by(consumergroup) ($DEFAULT_CONSUMER_LAG_METRIC_BASE$it >= 0)" + } + SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY - else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType") + else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type ${slo.sloType}") } } }