Skip to content
Snippets Groups Projects
Commit faaeb8d3 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge remote-tracking branch 'upstream/master' into hazelcastjet

parents 408d1e5d 856b7521
No related branches found
No related tags found
1 merge request!208Add benchmark implementations for Hazelcast Jet
Pipeline #7314 passed
Showing
with 88 additions and 24 deletions
...@@ -131,7 +131,7 @@ lint-helm: ...@@ -131,7 +131,7 @@ lint-helm:
script: helm lint helm/ script: helm lint helm/
rules: rules:
- changes: - changes:
- helm/* - helm/**/*
- when: manual - when: manual
allow_failure: true allow_failure: true
......
...@@ -30,6 +30,15 @@ spec: ...@@ -30,6 +30,15 @@ spec:
configMapKeyRef: configMapKeyRef:
name: {{ template "theodolite.fullname" . }}-kafka-metrics name: {{ template "theodolite.fullname" . }}-kafka-metrics
key: kafka-metrics-config.yml key: kafka-metrics-config.yml
{{- with .Values.strimzi.kafka.nodeSelectorTerms}}
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
{{- toYaml . | nindent 16 }}
{{- end}}
zookeeper: zookeeper:
{{- with .Values.strimzi.zookeeper.replicas }} {{- with .Values.strimzi.zookeeper.replicas }}
...@@ -37,7 +46,16 @@ spec: ...@@ -37,7 +46,16 @@ spec:
{{- toYaml . | nindent 6 }} {{- toYaml . | nindent 6 }}
{{- end }} {{- end }}
storage: storage:
type: ephemeral type: ephemeral
{{- with .Values.strimzi.zookeeper.nodeSelectorTerms}}
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
{{- toYaml . | nindent 16 }}
{{- end}}
kafkaExporter: {} kafkaExporter: {}
......
...@@ -55,6 +55,9 @@ rules: ...@@ -55,6 +55,9 @@ rules:
- get - get
- create - create
- update - update
{{- with .Values.rbac.additionalRules }}
{{ toYaml . | indent 2 }}
{{- end }}
{{- if .Values.operator.enabled }} {{- if .Values.operator.enabled }}
- apiGroups: - apiGroups:
- theodolite.com - theodolite.com
......
...@@ -173,6 +173,8 @@ strimzi: ...@@ -173,6 +173,8 @@ strimzi:
jvmOptions: jvmOptions:
"-Xmx": "512M" "-Xmx": "512M"
"-Xms": "512M" "-Xms": "512M"
nodeSelectorTerms: []
zookeeper: zookeeper:
replicas: 3 replicas: 3
zooEntrance: zooEntrance:
...@@ -180,6 +182,8 @@ strimzi: ...@@ -180,6 +182,8 @@ strimzi:
zookeeperClient: zookeeperClient:
enabled: true enabled: true
nodeSelector: {} nodeSelector: {}
nodeSelectorTerms: []
topicOperator: topicOperator:
enabled: true enabled: true
...@@ -341,6 +345,7 @@ serviceAccount: ...@@ -341,6 +345,7 @@ serviceAccount:
rbac: rbac:
create: true create: true
additionalRules: []
randomScheduler: randomScheduler:
enabled: true enabled: true
......
...@@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG': ...@@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG':
def calculate_slope_trend(results, warmup): def calculate_slope_trend(results, warmup):
d = [] d = []
for result in results: for result in results:
group = result['metric']['consumergroup'] group = result['metric'].get('consumergroup', "default")
for value in result['values']: for value in result['values']:
d.append({'group': group, 'timestamp': int( d.append({'group': group, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
......
package rocks.theodolite.benchmarks.commons.beam; package rocks.theodolite.benchmarks.commons.beam;
import java.io.IOException;
import java.util.function.Function; import java.util.function.Function;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
...@@ -23,6 +25,7 @@ public class BeamService { ...@@ -23,6 +25,7 @@ public class BeamService {
private final AbstractPipelineFactory pipelineFactory; private final AbstractPipelineFactory pipelineFactory;
private final PipelineOptions pipelineOptions; private final PipelineOptions pipelineOptions;
private PipelineResult pipelineResult;
/** /**
* Create a new {@link BeamService}. * Create a new {@link BeamService}.
...@@ -43,14 +46,43 @@ public class BeamService { ...@@ -43,14 +46,43 @@ public class BeamService {
} }
/** /**
* Start this microservice, by running the underlying Beam pipeline. * Start this microservice by running the underlying Beam pipeline.
*/ */
public void run() { public void run() {
LOGGER.info("Constructing Beam pipeline with pipeline options: {}", LOGGER.info("Constructing Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString()); this.pipelineOptions.toString());
final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions);
LOGGER.info("Starting BeamService {}.", this.applicationName); LOGGER.info("Starting BeamService {}.", this.applicationName);
pipeline.run().waitUntilFinish(); this.pipelineResult = pipeline.run();
}
/**
* Start this microservice by running the underlying Beam pipeline and block until this process is
* terminated.
*/
public void runStandalone() {
this.run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop()));
this.pipelineResult.waitUntilFinish();
}
/**
* Stop this microservice by canceling the underlying Beam pipeline.
*/
public void stop() {
LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName);
if (this.pipelineResult == null) {
throw new IllegalStateException("Cannot stop service since it has never been started.");
}
LOGGER.info("Stopping Beam pipeline.");
try {
this.pipelineResult.cancel();
this.pipelineResult = null; // NOPMD use null to indicate absence
} catch (final IOException e) {
throw new IllegalStateException(
"Stopping the service failed due to failed stop of Beam pipeline.", e);
}
LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName);
} }
} }
...@@ -46,7 +46,7 @@ spec: ...@@ -46,7 +46,7 @@ spec:
limits: limits:
memory: 4Gi memory: 4Gi
cpu: 1000m cpu: 1000m
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc3.flinks.uc3.flink.HistoryServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc3.flink.HistoryServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
#command: ['sleep', '60m'] #command: ['sleep', '60m']
ports: ports:
- containerPort: 6123 - containerPort: 6123
......
...@@ -46,7 +46,7 @@ spec: ...@@ -46,7 +46,7 @@ spec:
limits: limits:
memory: 4Gi memory: 4Gi
cpu: 1000m cpu: 1000m
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc4.flinks.uc4.flink.AggregationServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc4.flink.AggregationServiceFlinkJob"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
#command: ['sleep', '60m'] #command: ['sleep', '60m']
ports: ports:
- containerPort: 6123 - containerPort: 6123
......
...@@ -17,7 +17,7 @@ public final class Uc1BeamFlink { ...@@ -17,7 +17,7 @@ public final class Uc1BeamFlink {
private Uc1BeamFlink() {} private Uc1BeamFlink() {}
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
} }
} }
......
...@@ -21,6 +21,6 @@ public final class Uc1BeamSamza { ...@@ -21,6 +21,6 @@ public final class Uc1BeamSamza {
* Main method. * Main method.
*/ */
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
} }
} }
...@@ -6,7 +6,7 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; ...@@ -6,7 +6,7 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
/** /**
* {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam. * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam.
* *
* @param <T> type the {@link DatabaseWriter} is associated with. * @param <T> type the {@link DatabaseWriter} is associated with.
*/ */
public class WriterAdapter<T> extends DoFn<T, Void> { public class WriterAdapter<T> extends DoFn<T, Void> {
......
...@@ -15,7 +15,7 @@ public final class Uc2BeamFlink { ...@@ -15,7 +15,7 @@ public final class Uc2BeamFlink {
private Uc2BeamFlink() {} private Uc2BeamFlink() {}
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
} }
} }
...@@ -19,7 +19,7 @@ public final class Uc2BeamSamza { ...@@ -19,7 +19,7 @@ public final class Uc2BeamSamza {
private Uc2BeamSamza() {} private Uc2BeamSamza() {}
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
} }
} }
......
...@@ -21,7 +21,7 @@ public final class Uc3BeamFlink { ...@@ -21,7 +21,7 @@ public final class Uc3BeamFlink {
* Start running this microservice. * Start running this microservice.
*/ */
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
} }
} }
...@@ -21,7 +21,7 @@ public final class Uc3BeamSamza { ...@@ -21,7 +21,7 @@ public final class Uc3BeamSamza {
* Start running this microservice. * Start running this microservice.
*/ */
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
} }
} }
......
...@@ -15,7 +15,7 @@ public final class Uc4BeamFlink { ...@@ -15,7 +15,7 @@ public final class Uc4BeamFlink {
* Start running this microservice. * Start running this microservice.
*/ */
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
} }
} }
...@@ -22,7 +22,7 @@ public final class Uc4BeamSamza { ...@@ -22,7 +22,7 @@ public final class Uc4BeamSamza {
* Start running this microservice. * Start running this microservice.
*/ */
public static void main(final String[] args) { public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
} }
} }
...@@ -40,7 +40,7 @@ class ExternalSloChecker( ...@@ -40,7 +40,7 @@ class ExternalSloChecker(
val result = post(externalSlopeURL, data = data, timeout = TIMEOUT) val result = post(externalSlopeURL, data = data, timeout = TIMEOUT)
if (result.statusCode != 200) { if (result.statusCode != 200) {
counter++ counter++
logger.error { "Could not reach external SLO checker." } logger.error { "Could not reach external SLO checker at $externalSlopeURL." }
} else { } else {
val booleanResult = result.text.toBoolean() val booleanResult = result.text.toBoolean()
logger.info { "SLO checker result is: $booleanResult." } logger.info { "SLO checker result is: $booleanResult." }
...@@ -48,6 +48,6 @@ class ExternalSloChecker( ...@@ -48,6 +48,6 @@ class ExternalSloChecker(
} }
} }
throw ConnectException("Could not reach external SLO checker") throw ConnectException("Could not reach external SLO checker at $externalSlopeURL.")
} }
} }
...@@ -4,18 +4,24 @@ import theodolite.benchmark.BenchmarkExecution ...@@ -4,18 +4,24 @@ import theodolite.benchmark.BenchmarkExecution
import theodolite.util.InvalidPatcherConfigurationException import theodolite.util.InvalidPatcherConfigurationException
import javax.enterprise.context.ApplicationScoped import javax.enterprise.context.ApplicationScoped
private const val CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)" private const val DEFAULT_CONSUMER_LAG_METRIC_BASE = "kafka_consumergroup_lag"
private const val DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)" private const val DEFAULT_CONSUMER_LAG_QUERY = "sum by(consumergroup) (kafka_consumergroup_lag >= 0)"
private const val DEFAULT_DROPPED_RECORDS_QUERY = "sum by(job) (kafka_streams_stream_task_metrics_dropped_records_total>=0)"
@ApplicationScoped @ApplicationScoped
class SloConfigHandler { class SloConfigHandler {
companion object { companion object {
fun getQueryString(slo: BenchmarkExecution.Slo): String { fun getQueryString(slo: BenchmarkExecution.Slo): String {
return when (slo.sloType.toLowerCase()) { return when (slo.sloType.lowercase()) {
SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected") SloTypes.GENERIC.value -> slo.properties["promQLQuery"] ?: throw IllegalArgumentException("promQLQuery expected")
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> CONSUMER_LAG_QUERY SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?:
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> DROPPED_RECORDS_QUERY (slo.properties["consumerGroup"]?.let { "{consumergroup='$it'}" } ?: "").let {
else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type $slo.sloType") "sum by(consumergroup) ($DEFAULT_CONSUMER_LAG_METRIC_BASE$it >= 0)"
}
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY
SloTypes.LAG_TREND.value, SloTypes.LAG_TREND_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_CONSUMER_LAG_QUERY
SloTypes.DROPPED_RECORDS.value, SloTypes.DROPPED_RECORDS_RATIO.value -> slo.properties["promQLQuery"] ?: DEFAULT_DROPPED_RECORDS_QUERY
else -> throw InvalidPatcherConfigurationException("Could not find Prometheus query string for slo type ${slo.sloType}")
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment