diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7d14e478aed45186eb71aa78ca65d8b54ade5856..fc590f029433f64a0f655dc28c3898c7847e4ab5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -131,7 +131,7 @@ lint-helm: script: helm lint helm/ rules: - changes: - - helm/* + - helm/**/* - when: manual allow_failure: true diff --git a/helm/templates/kafka/kafka-cluster.yaml b/helm/templates/kafka/kafka-cluster.yaml index 29cf038f12aa6ee38b21697b8d79b5aea378c7d8..1ff89396513a134e553bc4b97f771822f52ac2ed 100644 --- a/helm/templates/kafka/kafka-cluster.yaml +++ b/helm/templates/kafka/kafka-cluster.yaml @@ -30,6 +30,15 @@ spec: configMapKeyRef: name: {{ template "theodolite.fullname" . }}-kafka-metrics key: kafka-metrics-config.yml + {{- with .Values.strimzi.kafka.nodeSelectorTerms}} + template: + pod: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + {{- toYaml . | nindent 16 }} + {{- end}} zookeeper: {{- with .Values.strimzi.zookeeper.replicas }} @@ -37,7 +46,16 @@ spec: {{- toYaml . | nindent 6 }} {{- end }} storage: - type: ephemeral + type: ephemeral + {{- with .Values.strimzi.zookeeper.nodeSelectorTerms}} + template: + pod: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + {{- toYaml . | nindent 16 }} + {{- end}} kafkaExporter: {} diff --git a/helm/templates/theodolite/role.yaml b/helm/templates/theodolite/role.yaml index 8b3961a33bd90f81af29b5adde9da449c6a462d8..b8d4d2d005d5a969c2c72cdca145f829d748e419 100644 --- a/helm/templates/theodolite/role.yaml +++ b/helm/templates/theodolite/role.yaml @@ -55,6 +55,9 @@ rules: - get - create - update + {{- with .Values.rbac.additionalRules }} +{{ toYaml . | indent 2 }} + {{- end }} {{- if .Values.operator.enabled }} - apiGroups: - theodolite.com diff --git a/helm/values.yaml b/helm/values.yaml index 188332ef148e3e0e5a8b995fde3c8921581f718b..765f8e4e6bd0a0f9d59dc812d4b7a01d134e10b0 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -173,6 +173,8 @@ strimzi: jvmOptions: "-Xmx": "512M" "-Xms": "512M" + nodeSelectorTerms: [] + zookeeper: replicas: 3 zooEntrance: @@ -180,6 +182,8 @@ strimzi: zookeeperClient: enabled: true nodeSelector: {} + nodeSelectorTerms: [] + topicOperator: enabled: true @@ -341,6 +345,7 @@ serviceAccount: rbac: create: true + additionalRules: [] randomScheduler: enabled: true diff --git a/slo-checker/record-lag/app/main.py b/slo-checker/record-lag/app/main.py index 2e38354d45df57087a94e57d5c9ca412ed5534d3..bb68580a638a40bc7ae975594b859d10784adc67 100644 --- a/slo-checker/record-lag/app/main.py +++ b/slo-checker/record-lag/app/main.py @@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG': def calculate_slope_trend(results, warmup): d = [] for result in results: - group = result['metric']['consumergroup'] + group = result['metric'].get('consumergroup', "default") for value in result['values']: d.append({'group': group, 'timestamp': int( value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java index a4a8f69d74f32697d8e43d58bc5765631fea63de..607f591ffea766041c0472f9995e971f075c31b6 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java @@ -1,7 +1,9 @@ package rocks.theodolite.benchmarks.commons.beam; +import java.io.IOException; import java.util.function.Function; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -23,6 +25,7 @@ public class BeamService { private final AbstractPipelineFactory pipelineFactory; private final PipelineOptions pipelineOptions; + private PipelineResult pipelineResult; /** * Create a new {@link BeamService}. @@ -43,14 +46,43 @@ public class BeamService { } /** - * Start this microservice, by running the underlying Beam pipeline. + * Start this microservice by running the underlying Beam pipeline. */ public void run() { LOGGER.info("Constructing Beam pipeline with pipeline options: {}", this.pipelineOptions.toString()); final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); LOGGER.info("Starting BeamService {}.", this.applicationName); - pipeline.run().waitUntilFinish(); + this.pipelineResult = pipeline.run(); + } + + /** + * Start this microservice by running the underlying Beam pipeline and block until this process is + * terminated. + */ + public void runStandalone() { + this.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop())); + this.pipelineResult.waitUntilFinish(); + } + + /** + * Stop this microservice by canceling the underlying Beam pipeline. + */ + public void stop() { + LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName); + if (this.pipelineResult == null) { + throw new IllegalStateException("Cannot stop service since it has never been started."); + } + LOGGER.info("Stoping Beam pipeline."); + try { + this.pipelineResult.cancel(); + this.pipelineResult = null; // NOPMD use null to indicate absence + } catch (final IOException e) { + throw new IllegalStateException( + "Stoping the service failed due to failed stop of Beam pipeline.", e); + } + LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName); } } diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.dataflow.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.dataflow.gradle new file mode 100644 index 0000000000000000000000000000000000000000..3499ba449dbb699038427b622003d5bcf145e034 --- /dev/null +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.dataflow.gradle @@ -0,0 +1,7 @@ +plugins { + id 'theodolite.beam' +} + +dependencies { + implementation 'org.apache.beam:beam-runners-google-cloud-dataflow-java:2.35.0' +} \ No newline at end of file diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java index 5801b850a70cafbc0a97f9da4f57099203cfd695..ae9a6d4220ceaec091a0a2fb49fb82f16fdbb42e 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java @@ -142,7 +142,7 @@ class EnvVarLoadGeneratorFactory { LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic); recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic); } else { - throw new IllegalStateException("Neither an emulator host nor a project was provided."); + throw new IllegalStateException("Neither an emulator host nor a project was provided."); } } else { // Should never happen diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle index 776e7d8e4fe132839b6e27c70c368720415721ea..42651f5970d1744ccb94866f19e8742a60ce8e31 100644 --- a/theodolite-benchmarks/settings.gradle +++ b/theodolite-benchmarks/settings.gradle @@ -10,6 +10,7 @@ include 'uc1-commons' include 'uc1-kstreams' include 'uc1-flink' include 'uc1-beam' +include 'uc1-beam-dataflow' include 'uc1-beam-flink' include 'uc1-beam-samza' @@ -24,6 +25,7 @@ include 'uc3-load-generator' include 'uc3-kstreams' include 'uc3-flink' include 'uc3-beam' +include 'uc3-beam-dataflow' include 'uc3-beam-flink' include 'uc3-beam-samza' @@ -35,3 +37,4 @@ include 'uc4-beam-flink' include 'uc4-beam-samza' include 'http-bridge' + diff --git a/theodolite-benchmarks/uc1-beam-dataflow/.gitignore b/theodolite-benchmarks/uc1-beam-dataflow/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..7bf05dd280fcc888467656ce1fbdeb65322c7ba8 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam-dataflow/.gitignore @@ -0,0 +1 @@ +state \ No newline at end of file diff --git a/theodolite-benchmarks/uc1-beam-dataflow/Dockerfile b/theodolite-benchmarks/uc1-beam-dataflow/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..cf6ef6675464e3c9d37db492b39fd8a71ec60e63 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam-dataflow/Dockerfile @@ -0,0 +1,9 @@ +FROM openjdk:11-slim + +ENV MAX_SOURCE_PARALLELISM=1024 + +ADD build/distributions/uc1-beam-samza.tar / +ADD samza-standalone.properties / + +CMD /uc1-beam-samza/bin/uc1-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=false --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}" + diff --git a/theodolite-benchmarks/uc1-beam-dataflow/build.gradle b/theodolite-benchmarks/uc1-beam-dataflow/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..21ffd94500cf2945f04fbe59c511ec370b891a37 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam-dataflow/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'theodolite.beam.dataflow' +} + +dependencies { + implementation project(':uc1-beam') +} + +sourceSets { + main { + resources { + srcDirs += [ + project(':uc1-beam').sourceSets.main.resources + ] + } + } +} + +mainClassName = "rocks.theodolite.benchmarks.uc1.beam.dataflow.Uc1BeamDataflow" diff --git a/theodolite-benchmarks/uc1-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dataflow/Uc1BeamDataflow.java b/theodolite-benchmarks/uc1-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dataflow/Uc1BeamDataflow.java new file mode 100644 index 0000000000000000000000000000000000000000..24af2f24d175509a43b669f5cddf5eaad0bd81ba --- /dev/null +++ b/theodolite-benchmarks/uc1-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dataflow/Uc1BeamDataflow.java @@ -0,0 +1,21 @@ +package rocks.theodolite.benchmarks.uc1.beam.dataflow; + +import org.apache.beam.runners.dataflow.DataflowRunner; +import rocks.theodolite.benchmarks.commons.beam.BeamService; +import rocks.theodolite.benchmarks.uc1.beam.PipelineFactory; + +/** + * Implementation of the use case Database Storage using Apache Beam with the Google Cloud Dataflow + * runner. + */ +public final class Uc1BeamDataflow { + + private Uc1BeamDataflow() {} + + /** + * Main method. + */ + public static void main(final String[] args) { + new BeamService(PipelineFactory.factory(), DataflowRunner.class, args).runStandalone(); + } +} diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java index e1317219fedf24bc4b0eb4a3f9668da7de196cca..7f39500433a77612fe5ab010372a24ca46035135 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java @@ -17,7 +17,7 @@ public final class Uc1BeamFlink { private Uc1BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java index d3455db71bc3520bfa11c4da3a58c32da46337f9..9c3f650b7ddbe5e3c08139cdec2e590f5d55f3b3 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java @@ -21,6 +21,6 @@ public final class Uc1BeamSamza { * Main method. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java index 4519515cf7d74abb0c447c56df4bbe313133c6a7..c1dc2f7305d01b47de644e4f8d391955540f530c 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java @@ -6,7 +6,7 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; /** * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam. - * + * * @param <T> type the {@link DatabaseWriter} is associated with. */ public class WriterAdapter<T> extends DoFn<T, Void> { diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java index ab6a9992a5dfca11a182235b467d5be76488ed55..2772d76fa26f504827ab74acb8fccc45f117365c 100644 --- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java +++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc2BeamFlink { private Uc2BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java index 80981818d401b48ed61ee56987764684df9dd31f..1b3f4ac8a2d052f0d34051e6b17b62100feb129d 100644 --- a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java +++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java @@ -19,7 +19,7 @@ public final class Uc2BeamSamza { private Uc2BeamSamza() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam-dataflow/.gitignore b/theodolite-benchmarks/uc3-beam-dataflow/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..7bf05dd280fcc888467656ce1fbdeb65322c7ba8 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/.gitignore @@ -0,0 +1 @@ +state \ No newline at end of file diff --git a/theodolite-benchmarks/uc3-beam-dataflow/Dockerfile b/theodolite-benchmarks/uc3-beam-dataflow/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..54979b8e1fa8aa9ac7d073302301bd10cbff5f34 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/Dockerfile @@ -0,0 +1,8 @@ +FROM openjdk:11-slim + +ENV MAX_SOURCE_PARALLELISM=1024 + +ADD build/distributions/uc3-beam-samza.tar / +ADD samza-standalone.properties / + +CMD /uc3-beam-samza/bin/uc3-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=false --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}" diff --git a/theodolite-benchmarks/uc3-beam-dataflow/build.gradle b/theodolite-benchmarks/uc3-beam-dataflow/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..dabf19000f884c44657eadd15670c2eda1a4802e --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'theodolite.beam.dataflow' +} + +dependencies { + implementation project(':uc3-beam') +} + +sourceSets { + main { + resources { + srcDirs += [ + project(':uc3-beam').sourceSets.main.resources + ] + } + } +} + +mainClassName = "rocks.theodolite.benchmarks.uc3.beam.dataflow.Uc3BeamDataflow" diff --git a/theodolite-benchmarks/uc3-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc3/beam/dataflow/Uc3BeamDataflow.java b/theodolite-benchmarks/uc3-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc3/beam/dataflow/Uc3BeamDataflow.java new file mode 100644 index 0000000000000000000000000000000000000000..5c8bc8c423a9f6fd64bc7ca663588dab19e10e74 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc3/beam/dataflow/Uc3BeamDataflow.java @@ -0,0 +1,23 @@ +package rocks.theodolite.benchmarks.uc3.beam.dataflow; + +import org.apache.beam.runners.dataflow.DataflowRunner; +import rocks.theodolite.benchmarks.commons.beam.BeamService; +import rocks.theodolite.benchmarks.uc3.beam.PipelineFactory; + +/** + * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the + * Google Cloud Dataflow runner. + */ +public final class Uc3BeamDataflow { + + private Uc3BeamDataflow() {} + + /** + * Start running this microservice. + */ + public static void main(final String[] args) { + new BeamService(PipelineFactory.factory(), DataflowRunner.class, args).runStandalone(); + } + +} + diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java index 8782559fea6a08ad2c5a92b355149e3a2ee02ea2..f4f4563925ede4d61edcaab29c3d6e7aed0b5e9c 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java @@ -21,7 +21,7 @@ public final class Uc3BeamFlink { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java index 84e705f6f52f41f5c553a1ef3fb2ebd7ce95e20a..247cd99becff8a200185c8fa40efb49bf31a6806 100644 --- a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java +++ b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java @@ -21,7 +21,7 @@ public final class Uc3BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java index 5d398d610a12890e3fb9c85804a4b59a69163b4f..f5f9af3fc14b57476975708a139788e7f0386953 100644 --- a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java +++ b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc4BeamFlink { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java index 044b3dc4b647dffa02a62d17c9fcdaf15b0a0869..585e3ff9589c0262c12b6fa33023cd69b58c53f1 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java @@ -22,7 +22,7 @@ public final class Uc4BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt b/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt index 7fb5417e200f64b0db74a8bebe69a751c5d484b8..7587e8326df98f3c45c016bfd3b2d7db8077e6d1 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/ExternalSloChecker.kt @@ -40,7 +40,7 @@ class ExternalSloChecker( val result = post(externalSlopeURL, data = data, timeout = TIMEOUT) if (result.statusCode != 200) { counter++ - logger.error { "Could not reach external SLO checker." } + logger.error { "Could not reach external SLO checker at $externalSlopeURL." } } else { val booleanResult = result.text.toBoolean() logger.info { "SLO checker result is: $booleanResult." } @@ -48,6 +48,6 @@ class ExternalSloChecker( } } - throw ConnectException("Could not reach external SLO checker") + throw ConnectException("Could not reach external SLO checker at $externalSlopeURL.") } } diff --git a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt index 924305660798e6dbed06662ef4e393c63f5f2bfa..b2cd269e0a6157ea23cb319cb3cfb6cb87a9d4e9 100644 --- a/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt +++ b/theodolite/src/main/kotlin/theodolite/evaluation/SloConfigHandler.kt @@ -4,17 +4,17 @@ import theodolite.benchmark.BenchmarkExecution import theodolite.util.InvalidPatcherConfigurationException import javax.enterprise.context.ApplicationScoped -private const val CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" -private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" +private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" +private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" @ApplicationScoped class SloConfigHandler { companion object { fun getQueryString(slo: BenchmarkExecution.Slo): String { - return when (slo.sloType.toLowerCase()) { + return when (slo.sloType.lowercase()) { SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected") - SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> CONSUMER_LAG_QUERY - SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> DROPPED_RECORDS_QUERY + SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY + SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType") } }