Skip to content
Snippets Groups Projects
Commit 29dbaa24 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'beam-dataflow' into simple-uc3

parents 72c6d6a8 a0026af6
No related branches found
No related tags found
No related merge requests found
Showing
with 138 additions and 11 deletions
......@@ -131,7 +131,7 @@ lint-helm:
script: helm lint helm/
rules:
- changes:
- helm/*
- helm/**/*
- when: manual
allow_failure: true
......
......@@ -30,6 +30,15 @@ spec:
configMapKeyRef:
name: {{ template "theodolite.fullname" . }}-kafka-metrics
key: kafka-metrics-config.yml
{{- with .Values.strimzi.kafka.nodeSelectorTerms}}
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
{{- toYaml . | nindent 16 }}
{{- end}}
zookeeper:
{{- with .Values.strimzi.zookeeper.replicas }}
......@@ -38,6 +47,15 @@ spec:
{{- end }}
storage:
type: ephemeral
{{- with .Values.strimzi.zookeeper.nodeSelectorTerms}}
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
{{- toYaml . | nindent 16 }}
{{- end}}
kafkaExporter: {}
......
......@@ -55,6 +55,9 @@ rules:
- get
- create
- update
{{- with .Values.rbac.additionalRules }}
{{ toYaml . | indent 2 }}
{{- end }}
{{- if .Values.operator.enabled }}
- apiGroups:
- theodolite.com
......
......@@ -173,6 +173,8 @@ strimzi:
jvmOptions:
"-Xmx": "512M"
"-Xms": "512M"
nodeSelectorTerms: []
zookeeper:
replicas: 3
zooEntrance:
......@@ -180,6 +182,8 @@ strimzi:
zookeeperClient:
enabled: true
nodeSelector: {}
nodeSelectorTerms: []
topicOperator:
enabled: true
......@@ -341,6 +345,7 @@ serviceAccount:
rbac:
create: true
additionalRules: []
randomScheduler:
enabled: true
......
......@@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG':
def calculate_slope_trend(results, warmup):
d = []
for result in results:
group = result['metric']['consumergroup']
group = result['metric'].get('consumergroup', "default")
for value in result['values']:
d.append({'group': group, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
......
package rocks.theodolite.benchmarks.commons.beam;
import java.io.IOException;
import java.util.function.Function;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
......@@ -23,6 +25,7 @@ public class BeamService {
private final AbstractPipelineFactory pipelineFactory;
private final PipelineOptions pipelineOptions;
private PipelineResult pipelineResult;
/**
* Create a new {@link BeamService}.
......@@ -43,14 +46,43 @@ public class BeamService {
}
/**
* Start this microservice, by running the underlying Beam pipeline.
* Start this microservice by running the underlying Beam pipeline.
*/
public void run() {
LOGGER.info("Constructing Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString());
final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions);
LOGGER.info("Starting BeamService {}.", this.applicationName);
pipeline.run().waitUntilFinish();
this.pipelineResult = pipeline.run();
}
/**
* Start this microservice by running the underlying Beam pipeline and block until this process is
* terminated.
*/
public void runStandalone() {
this.run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop()));
this.pipelineResult.waitUntilFinish();
}
/**
* Stop this microservice by canceling the underlying Beam pipeline.
*/
public void stop() {
LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName);
if (this.pipelineResult == null) {
throw new IllegalStateException("Cannot stop service since it has never been started.");
}
LOGGER.info("Stoping Beam pipeline.");
try {
this.pipelineResult.cancel();
this.pipelineResult = null; // NOPMD use null to indicate absence
} catch (final IOException e) {
throw new IllegalStateException(
"Stoping the service failed due to failed stop of Beam pipeline.", e);
}
LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName);
}
}
plugins {
id 'theodolite.beam'
}
dependencies {
implementation 'org.apache.beam:beam-runners-google-cloud-dataflow-java:2.35.0'
}
\ No newline at end of file
......@@ -10,6 +10,7 @@ include 'uc1-commons'
include 'uc1-kstreams'
include 'uc1-flink'
include 'uc1-beam'
include 'uc1-beam-dataflow'
include 'uc1-beam-flink'
include 'uc1-beam-samza'
......@@ -24,6 +25,7 @@ include 'uc3-load-generator'
include 'uc3-kstreams'
include 'uc3-flink'
include 'uc3-beam'
include 'uc3-beam-dataflow'
include 'uc3-beam-flink'
include 'uc3-beam-samza'
......@@ -35,3 +37,4 @@ include 'uc4-beam-flink'
include 'uc4-beam-samza'
include 'http-bridge'
state
\ No newline at end of file
FROM openjdk:11-slim
ENV MAX_SOURCE_PARALLELISM=1024
ADD build/distributions/uc1-beam-samza.tar /
ADD samza-standalone.properties /
CMD /uc1-beam-samza/bin/uc1-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=false --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
plugins {
id 'theodolite.beam.dataflow'
}
dependencies {
implementation project(':uc1-beam')
}
sourceSets {
main {
resources {
srcDirs += [
project(':uc1-beam').sourceSets.main.resources
]
}
}
}
mainClassName = "rocks.theodolite.benchmarks.uc1.beam.dataflow.Uc1BeamDataflow"
package rocks.theodolite.benchmarks.uc1.beam.dataflow;
import org.apache.beam.runners.dataflow.DataflowRunner;
import rocks.theodolite.benchmarks.commons.beam.BeamService;
import rocks.theodolite.benchmarks.uc1.beam.PipelineFactory;
/**
* Implementation of the use case Database Storage using Apache Beam with the Google Cloud Dataflow
* runner.
*/
public final class Uc1BeamDataflow {
private Uc1BeamDataflow() {}
/**
* Main method.
*/
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), DataflowRunner.class, args).runStandalone();
}
}
......@@ -17,7 +17,7 @@ public final class Uc1BeamFlink {
private Uc1BeamFlink() {}
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
}
}
......
......@@ -21,6 +21,6 @@ public final class Uc1BeamSamza {
* Main method.
*/
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
}
}
......@@ -15,7 +15,7 @@ public final class Uc2BeamFlink {
private Uc2BeamFlink() {}
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
}
}
......@@ -19,7 +19,7 @@ public final class Uc2BeamSamza {
private Uc2BeamSamza() {}
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
}
}
......
state
\ No newline at end of file
FROM openjdk:11-slim
ENV MAX_SOURCE_PARALLELISM=1024
ADD build/distributions/uc3-beam-samza.tar /
ADD samza-standalone.properties /
CMD /uc3-beam-samza/bin/uc3-beam-samza --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=false --configOverride="{\"job.coordinator.zk.connect\":\"$SAMZA_JOB_COORDINATOR_ZK_CONNECT\"}"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment