diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7d14e478aed45186eb71aa78ca65d8b54ade5856..fc590f029433f64a0f655dc28c3898c7847e4ab5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -131,7 +131,7 @@ lint-helm: script: helm lint helm/ rules: - changes: - - helm/* + - helm/**/* - when: manual allow_failure: true diff --git a/helm/templates/kafka/kafka-cluster.yaml b/helm/templates/kafka/kafka-cluster.yaml index 29cf038f12aa6ee38b21697b8d79b5aea378c7d8..1ff89396513a134e553bc4b97f771822f52ac2ed 100644 --- a/helm/templates/kafka/kafka-cluster.yaml +++ b/helm/templates/kafka/kafka-cluster.yaml @@ -30,6 +30,15 @@ spec: configMapKeyRef: name: {{ template "theodolite.fullname" . }}-kafka-metrics key: kafka-metrics-config.yml + {{- with .Values.strimzi.kafka.nodeSelectorTerms}} + template: + pod: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + {{- toYaml . | nindent 16 }} + {{- end}} zookeeper: {{- with .Values.strimzi.zookeeper.replicas }} @@ -37,7 +46,16 @@ spec: {{- toYaml . | nindent 6 }} {{- end }} storage: - type: ephemeral + type: ephemeral + {{- with .Values.strimzi.zookeeper.nodeSelectorTerms}} + template: + pod: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + {{- toYaml . 
| nindent 16 }} + {{- end}} kafkaExporter: {} diff --git a/helm/values.yaml b/helm/values.yaml index 007e069e07db2bf06612ac40e6811a99e571bbfa..765f8e4e6bd0a0f9d59dc812d4b7a01d134e10b0 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -173,6 +173,8 @@ strimzi: jvmOptions: "-Xmx": "512M" "-Xms": "512M" + nodeSelectorTerms: [] + zookeeper: replicas: 3 zooEntrance: @@ -180,6 +182,8 @@ strimzi: zookeeperClient: enabled: true nodeSelector: {} + nodeSelectorTerms: [] + topicOperator: enabled: true diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java index a4a8f69d74f32697d8e43d58bc5765631fea63de..607f591ffea766041c0472f9995e971f075c31b6 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java @@ -1,7 +1,9 @@ package rocks.theodolite.benchmarks.commons.beam; +import java.io.IOException; import java.util.function.Function; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -23,6 +25,7 @@ public class BeamService { private final AbstractPipelineFactory pipelineFactory; private final PipelineOptions pipelineOptions; + private PipelineResult pipelineResult; /** * Create a new {@link BeamService}. @@ -43,14 +46,43 @@ public class BeamService { } /** - * Start this microservice, by running the underlying Beam pipeline. + * Start this microservice by running the underlying Beam pipeline. 
*/ public void run() { LOGGER.info("Constructing Beam pipeline with pipeline options: {}", this.pipelineOptions.toString()); final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); LOGGER.info("Starting BeamService {}.", this.applicationName); - pipeline.run().waitUntilFinish(); + this.pipelineResult = pipeline.run(); + } + + /** + * Start this microservice by running the underlying Beam pipeline and block until this process is + * terminated. + */ + public void runStandalone() { + this.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop())); + this.pipelineResult.waitUntilFinish(); + } + + /** + * Stop this microservice by canceling the underlying Beam pipeline. + */ + public void stop() { + LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName); + if (this.pipelineResult == null) { + throw new IllegalStateException("Cannot stop service since it has never been started."); + } + LOGGER.info("Stopping Beam pipeline."); + try { + this.pipelineResult.cancel(); + this.pipelineResult = null; // NOPMD use null to indicate absence + } catch (final IOException e) { + throw new IllegalStateException( + "Stopping the service failed due to failed stop of Beam pipeline.", e); + } + LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName); } } diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.dataflow.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.dataflow.gradle new file mode 100644 index 0000000000000000000000000000000000000000..3499ba449dbb699038427b622003d5bcf145e034 --- /dev/null +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.beam.dataflow.gradle @@ -0,0 +1,7 @@ +plugins { + id 'theodolite.beam' +} + +dependencies { + implementation 'org.apache.beam:beam-runners-google-cloud-dataflow-java:2.35.0' +} \ No newline at end of file diff --git a/theodolite-benchmarks/settings.gradle b/theodolite-benchmarks/settings.gradle index 
776e7d8e4fe132839b6e27c70c368720415721ea..42651f5970d1744ccb94866f19e8742a60ce8e31 100644 --- a/theodolite-benchmarks/settings.gradle +++ b/theodolite-benchmarks/settings.gradle @@ -10,6 +10,7 @@ include 'uc1-commons' include 'uc1-kstreams' include 'uc1-flink' include 'uc1-beam' +include 'uc1-beam-dataflow' include 'uc1-beam-flink' include 'uc1-beam-samza' @@ -24,6 +25,7 @@ include 'uc3-load-generator' include 'uc3-kstreams' include 'uc3-flink' include 'uc3-beam' +include 'uc3-beam-dataflow' include 'uc3-beam-flink' include 'uc3-beam-samza' @@ -35,3 +37,4 @@ include 'uc4-beam-flink' include 'uc4-beam-samza' include 'http-bridge' + diff --git a/theodolite-benchmarks/uc1-beam-dataflow/.gitignore b/theodolite-benchmarks/uc1-beam-dataflow/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..7bf05dd280fcc888467656ce1fbdeb65322c7ba8 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam-dataflow/.gitignore @@ -0,0 +1 @@ +state \ No newline at end of file diff --git a/theodolite-benchmarks/uc1-beam-dataflow/Dockerfile b/theodolite-benchmarks/uc1-beam-dataflow/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..cf6ef6675464e3c9d37db492b39fd8a71ec60e63 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam-dataflow/Dockerfile @@ -0,0 +1,6 @@ +FROM openjdk:11-slim + +ADD build/distributions/uc1-beam-dataflow.tar / + +CMD /uc1-beam-dataflow/bin/uc1-beam-dataflow + diff --git a/theodolite-benchmarks/uc1-beam-dataflow/build.gradle b/theodolite-benchmarks/uc1-beam-dataflow/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..21ffd94500cf2945f04fbe59c511ec370b891a37 --- /dev/null +++ 
b/theodolite-benchmarks/uc1-beam-dataflow/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'theodolite.beam.dataflow' +} + +dependencies { + implementation project(':uc1-beam') +} + +sourceSets { + main { + resources { + srcDirs += [ + project(':uc1-beam').sourceSets.main.resources + ] + } + } +} + +mainClassName = "rocks.theodolite.benchmarks.uc1.beam.dataflow.Uc1BeamDataflow" diff --git a/theodolite-benchmarks/uc1-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dataflow/Uc1BeamDataflow.java b/theodolite-benchmarks/uc1-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dataflow/Uc1BeamDataflow.java new file mode 100644 index 0000000000000000000000000000000000000000..24af2f24d175509a43b669f5cddf5eaad0bd81ba --- /dev/null +++ b/theodolite-benchmarks/uc1-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dataflow/Uc1BeamDataflow.java @@ -0,0 +1,21 @@ +package rocks.theodolite.benchmarks.uc1.beam.dataflow; + +import org.apache.beam.runners.dataflow.DataflowRunner; +import rocks.theodolite.benchmarks.commons.beam.BeamService; +import rocks.theodolite.benchmarks.uc1.beam.PipelineFactory; + +/** + * Implementation of the use case Database Storage using Apache Beam with the Google Cloud Dataflow + * runner. + */ +public final class Uc1BeamDataflow { + + private Uc1BeamDataflow() {} + + /** + * Main method. 
+ */ + public static void main(final String[] args) { + new BeamService(PipelineFactory.factory(), DataflowRunner.class, args).runStandalone(); + } +} diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java index e1317219fedf24bc4b0eb4a3f9668da7de196cca..7f39500433a77612fe5ab010372a24ca46035135 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java @@ -17,7 +17,7 @@ public final class Uc1BeamFlink { private Uc1BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java index d3455db71bc3520bfa11c4da3a58c32da46337f9..9c3f650b7ddbe5e3c08139cdec2e590f5d55f3b3 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java @@ -21,6 +21,6 @@ public final class Uc1BeamSamza { * Main method. 
*/ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java index 4519515cf7d74abb0c447c56df4bbe313133c6a7..c1dc2f7305d01b47de644e4f8d391955540f530c 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java @@ -6,7 +6,7 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; /** * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam. - * + * * @param <T> type the {@link DatabaseWriter} is associated with. */ public class WriterAdapter<T> extends DoFn<T, Void> { diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java index ab6a9992a5dfca11a182235b467d5be76488ed55..2772d76fa26f504827ab74acb8fccc45f117365c 100644 --- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java +++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc2BeamFlink { private Uc2BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git 
a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java index 80981818d401b48ed61ee56987764684df9dd31f..1b3f4ac8a2d052f0d34051e6b17b62100feb129d 100644 --- a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java +++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java @@ -19,7 +19,7 @@ public final class Uc2BeamSamza { private Uc2BeamSamza() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam-dataflow/.gitignore b/theodolite-benchmarks/uc3-beam-dataflow/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..7bf05dd280fcc888467656ce1fbdeb65322c7ba8 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/.gitignore @@ -0,0 +1 @@ +state \ No newline at end of file diff --git a/theodolite-benchmarks/uc3-beam-dataflow/Dockerfile b/theodolite-benchmarks/uc3-beam-dataflow/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..54979b8e1fa8aa9ac7d073302301bd10cbff5f34 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-slim + +ADD build/distributions/uc3-beam-dataflow.tar / + +CMD /uc3-beam-dataflow/bin/uc3-beam-dataflow diff --git 
a/theodolite-benchmarks/uc3-beam-dataflow/build.gradle b/theodolite-benchmarks/uc3-beam-dataflow/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..dabf19000f884c44657eadd15670c2eda1a4802e --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'theodolite.beam.dataflow' +} + +dependencies { + implementation project(':uc3-beam') +} + +sourceSets { + main { + resources { + srcDirs += [ + project(':uc3-beam').sourceSets.main.resources + ] + } + } +} + +mainClassName = "rocks.theodolite.benchmarks.uc3.beam.dataflow.Uc3BeamDataflow" diff --git a/theodolite-benchmarks/uc3-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc3/beam/dataflow/Uc3BeamDataflow.java b/theodolite-benchmarks/uc3-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc3/beam/dataflow/Uc3BeamDataflow.java new file mode 100644 index 0000000000000000000000000000000000000000..d708ad4d9406329ccdb2f4fd0a3311e3e64eb963 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-dataflow/src/main/java/rocks/theodolite/benchmarks/uc3/beam/dataflow/Uc3BeamDataflow.java @@ -0,0 +1,23 @@ +package rocks.theodolite.benchmarks.uc3.beam.dataflow; + +import org.apache.beam.runners.dataflow.DataflowRunner; +import rocks.theodolite.benchmarks.commons.beam.BeamService; +import rocks.theodolite.benchmarks.uc3.beam.SimplePipelineFactory; + +/** + * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the + * Google Cloud Dataflow runner. + */ +public final class Uc3BeamDataflow { + + private Uc3BeamDataflow() {} + + /** + * Start running this microservice. 
+ */ + public static void main(final String[] args) { + new BeamService(SimplePipelineFactory.factory(), DataflowRunner.class, args).runStandalone(); + } + +} + diff --git a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java index 84e705f6f52f41f5c553a1ef3fb2ebd7ce95e20a..247cd99becff8a200185c8fa40efb49bf31a6806 100644 --- a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java +++ b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java @@ -21,7 +21,7 @@ public final class Uc3BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam/build.gradle b/theodolite-benchmarks/uc3-beam/build.gradle index 502e94fa737fb2ae1bab861407b27575cd8766ca..1a9a69f49631ab7efd12600b08586dc5baddbc77 100644 --- a/theodolite-benchmarks/uc3-beam/build.gradle +++ b/theodolite-benchmarks/uc3-beam/build.gradle @@ -2,4 +2,6 @@ plugins { id 'theodolite.beam' } - +dependencies { + implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java index 3c0d7acdbeccfaf03aac70df478e3db6dd1378e4..cb75b65e796a085cf96323f69433dd848f67076a 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java +++ 
b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java @@ -11,17 +11,17 @@ import titan.ccp.model.records.ActivePowerRecord; * Changes the time format to us Europe/Paris time. */ public class MapTimeFormat - extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>> { + extends SimpleFunction<ActivePowerRecord, KV<HourOfDayKey, ActivePowerRecord>> { private static final long serialVersionUID = -6597391279968647035L; private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); private final ZoneId zone = ZoneId.of("Europe/Paris"); @Override - public KV<HourOfDayKey, ActivePowerRecord> apply(final KV<String, ActivePowerRecord> kv) { - final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); + public KV<HourOfDayKey, ActivePowerRecord> apply(final ActivePowerRecord record) { + final Instant instant = Instant.ofEpochMilli(record.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); return KV.of( - this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), - kv.getValue()); + this.keyFactory.createKey(record.getIdentifier(), dateTime), + record); } } diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index de960d3d8466f9f420f002667df04d8a2fc64873..d734b02c61a91ab63010cddae6e0f993c14f4a50 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -11,6 +11,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; +import 
org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; @@ -43,19 +44,19 @@ public class PipelineFactory extends AbstractPipelineFactory { protected void constructPipeline(final Pipeline pipeline) { final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + // TODO make seconds final Duration duration = Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); + // TODO make seconds final Duration aggregationAdvanceDuration = Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + // TODO not needed final Duration triggerDelay = Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); // Read from Kafka final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); - // Map the time format - final MapTimeFormat mapTimeFormat = new MapTimeFormat(); - // Get the stats per HourOfDay final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); @@ -65,12 +66,15 @@ public class PipelineFactory extends AbstractPipelineFactory { new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); pipeline.apply(kafkaReader) + .apply(Values.create()) // TODO drop keys // Map to correct time format - .apply(MapElements.via(mapTimeFormat)) + // TODO optional + .apply(MapElements.via(new MapTimeFormat())) // Apply a sliding window .apply(Window .<KV<HourOfDayKey, ActivePowerRecord>>into( SlidingWindows.of(duration).every(aggregationAdvanceDuration)) + // TODO remove trigger .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings( AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay))) diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java 
b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java index b7643c2c4f439d14472b6ea3bbbfab32b1ecd4c9..b2783fe4ed92f945bc023adf50b3ce20bb3436d3 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java @@ -12,14 +12,17 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; import org.joda.time.Duration; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; +import rocks.theodolite.benchmarks.uc3.beam.pubsub.PubSubSource; import titan.ccp.model.records.ActivePowerRecord; /** @@ -27,6 +30,12 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class SimplePipelineFactory extends AbstractPipelineFactory { + public static final String SOURCE_TYPE_KEY = "source.type"; + + public static final String PUBSSUB_SOURCE_PROJECT_KEY = "source.pubsub.project"; + public static final String PUBSSUB_SOURCE_TOPIC_KEY = "source.pubsub.topic"; + public static final String PUBSSUB_SOURCE_SUBSCR_KEY = "source.pubsub.subscription"; + public SimplePipelineFactory(final Configuration configuration) { super(configuration); } @@ -44,9 +53,25 @@ public class SimplePipelineFactory extends AbstractPipelineFactory { final Duration 
aggregationAdvanceDuration = Duration.standardSeconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_SECONDS)); - // Read from Kafka - // TODO allow for pubsub - final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); + final String sourceType = this.config.getString(SOURCE_TYPE_KEY); + + PCollection<ActivePowerRecord> activePowerRecords; + + if ("pubsub".equals(sourceType)) { + final String project = this.config.getString(PUBSSUB_SOURCE_PROJECT_KEY); + final String topic = this.config.getString(PUBSSUB_SOURCE_TOPIC_KEY); + final String subscription = this.config.getString(PUBSSUB_SOURCE_SUBSCR_KEY); + // Read messages from Pub/Sub and decode them into Avro records + if (subscription == null) { + activePowerRecords = pipeline.apply(PubSubSource.forTopic(project, topic)); + } else { + activePowerRecords = pipeline.apply(PubSubSource.forSubscription(project, subscription)); + } + } else { + final KafkaActivePowerTimestampReader kafka = super.buildKafkaReader(); + // Read messages from Kafka as Avro records and drop keys + activePowerRecords = pipeline.apply(kafka).apply(Values.create()); + } // Map the time format final MapTimeFormat mapTimeFormat = new MapTimeFormat(); @@ -54,7 +79,7 @@ public class SimplePipelineFactory extends AbstractPipelineFactory { // Get the stats per HourOfDay final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); - pipeline.apply(kafkaReader) + activePowerRecords // Map to correct time format // TODO optional .apply(MapElements.via(mapTimeFormat)) diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubEncoder.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubEncoder.java new file mode 100644 index 0000000000000000000000000000000000000000..c14090f58ca7095327423a38910876aa7ad37eac --- /dev/null +++ 
b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubEncoder.java @@ -0,0 +1,26 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link SimpleFunction}, extracting and decoding {@link ActivePowerRecord}s from + * {@link PubsubMessage}s. + */ +public final class PubSubEncoder extends SimpleFunction<PubsubMessage, ActivePowerRecord> { + + private static final long serialVersionUID = -8872981416931508879L; + + @Override + public ActivePowerRecord apply(final PubsubMessage message) { + try { + return ActivePowerRecord.fromByteBuffer(ByteBuffer.wrap(message.getPayload())); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSource.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSource.java new file mode 100644 index 0000000000000000000000000000000000000000..46769fa9a35e73d2c1cd7e5c8316c48f0e481166 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSource.java @@ -0,0 +1,52 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link PTransform} reading {@link ActivePowerRecord}s from Pub/Sub. 
+ */ +public final class PubSubSource extends PTransform<PBegin, PCollection<ActivePowerRecord>> { + + private static final long serialVersionUID = 2603286151183186115L; + + private final Read<PubsubMessage> pubsubRead; + + private PubSubSource(final Read<PubsubMessage> pubsubRead) { + super(); + this.pubsubRead = pubsubRead; + } + + @Override + public PCollection<ActivePowerRecord> expand(final PBegin input) { + // Read messages from Pub/Sub and decode them into Avro records + return input.apply(this.pubsubRead).apply(MapElements.via(new PubSubEncoder())); + } + + /** + * Create a new {@link PubSubSource} for the given project and topic. + */ + public static final PubSubSource forTopic(final String projectName, final String topicName) { + return new PubSubSource(PubsubIO + .readMessages() + .fromTopic(PubSubTopicFactory.create(projectName, topicName).asPath())); + } + + /** + * Create a new {@link PubSubSource} for the given project and subscription. + */ + public static final PubSubSource forSubscription(final String projectName, + final String subscriptionName) { + return new PubSubSource(PubsubIO + .readMessages() + .fromSubscription( + PubSubSubscriptionFactory.create(projectName, subscriptionName).asPath())); + } + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSubscriptionFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSubscriptionFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..330082a4ba624fd00b581dcca4703b53de00af40 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSubscriptionFactory.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription; + +/** + * Factory methods for creating {@link PubsubSubscription}s. 
+ */ +public final class PubSubSubscriptionFactory { + + private PubSubSubscriptionFactory() {} + + /** + * Create a {@link PubsubSubscription} for the given project ID and subscription ID. + */ + public static PubsubSubscription create(final String project, final String subscription) { + return PubsubSubscription.fromPath("projects/" + project + "/subscriptions/" + subscription); + } + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubTopicFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubTopicFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..bb4492eeb0f3ca24b0f8949b2a8080f030ee89b7 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubTopicFactory.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; + +/** + * Factory methods for creating {@link PubsubTopic}s. + */ +public final class PubSubTopicFactory { + + private PubSubTopicFactory() {} + + /** + * Create a {@link PubsubTopic} for the given project ID and topic ID. 
+ */ + public static PubsubTopic create(final String projectId, final String topicId) { + return PubsubTopic.fromPath("projects/" + projectId + "/topics/" + topicId); + } + +} diff --git a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java index 5d398d610a12890e3fb9c85804a4b59a69163b4f..f5f9af3fc14b57476975708a139788e7f0386953 100644 --- a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java +++ b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc4BeamFlink { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java index 044b3dc4b647dffa02a62d17c9fcdaf15b0a0869..585e3ff9589c0262c12b6fa33023cd69b58c53f1 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java @@ -22,7 +22,7 @@ public final class Uc4BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } }