Skip to content
Snippets Groups Projects
Commit 66ff2b6b authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into firestore-test

parents b5cce95e 856b7521
Branches
No related tags found
No related merge requests found
Pipeline #7312 passed
...@@ -74,13 +74,13 @@ public class BeamService { ...@@ -74,13 +74,13 @@ public class BeamService {
if (this.pipelineResult == null) { if (this.pipelineResult == null) {
throw new IllegalStateException("Cannot stop service since it has never been started."); throw new IllegalStateException("Cannot stop service since it has never been started.");
} }
LOGGER.info("Stoping Beam pipeline."); LOGGER.info("Stopping Beam pipeline.");
try { try {
this.pipelineResult.cancel(); this.pipelineResult.cancel();
this.pipelineResult = null; // NOPMD use null to indicate absence this.pipelineResult = null; // NOPMD use null to indicate absence
} catch (final IOException e) { } catch (final IOException e) {
throw new IllegalStateException( throw new IllegalStateException(
"Stoping the service failed due to failed stop of Beam pipeline.", e); "Stopping the service failed due to failed stop of Beam pipeline.", e);
} }
LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName); LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName);
} }
......
...@@ -46,7 +46,7 @@ spec: ...@@ -46,7 +46,7 @@ spec:
limits: limits:
memory: 4Gi memory: 4Gi
cpu: 1000m cpu: 1000m
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc3.flinks.uc3.flink.HistoryServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc3.flink.HistoryServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
#command: ['sleep', '60m'] #command: ['sleep', '60m']
ports: ports:
- containerPort: 6123 - containerPort: 6123
......
...@@ -46,7 +46,7 @@ spec: ...@@ -46,7 +46,7 @@ spec:
limits: limits:
memory: 4Gi memory: 4Gi
cpu: 1000m cpu: 1000m
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc4.flinks.uc4.flink.AggregationServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc4.flink.AggregationServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
#command: ['sleep', '60m'] #command: ['sleep', '60m']
ports: ports:
- containerPort: 6123 - containerPort: 6123
......
...@@ -4,6 +4,7 @@ import theodolite.benchmark.BenchmarkExecution ...@@ -4,6 +4,7 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.util.InvalidPatcherConfigurationException import theodolite.util.InvalidPatcherConfigurationException
import javax.enterprise.context.ApplicationScoped import javax.enterprise.context.ApplicationScoped
private const val DEFAULT_CONSUMER_LAG_METRIC_BASE = "kafka_consumergroup_lag"
private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)"
private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
...@@ -13,9 +14,14 @@ class SloConfigHandler { ...@@ -13,9 +14,14 @@ class SloConfigHandler {
fun getQueryString(slo: BenchmarkExecution.Slo): String { fun getQueryString(slo: BenchmarkExecution.Slo): String {
return when (slo.sloType.lowercase()) { return when (slo.sloType.lowercase()) {
SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected") SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected")
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?:
(slo.properties["consumerGroup"]?.let { "{consumergroup='$it'}" } ?: "").let {
"sum by(consumergroup) ($DEFAULT_CONSUMER_LAG_METRIC_BASE$it >= 0)"
}
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY
else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType") else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type ${slo.sloType}")
} }
} }
} }
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment