diff --git a/theodolite-benchmarks/uc3-beam/build.gradle b/theodolite-benchmarks/uc3-beam/build.gradle index 502e94fa737fb2ae1bab861407b27575cd8766ca..1a9a69f49631ab7efd12600b08586dc5baddbc77 100644 --- a/theodolite-benchmarks/uc3-beam/build.gradle +++ b/theodolite-benchmarks/uc3-beam/build.gradle @@ -2,4 +2,6 @@ plugins { id 'theodolite.beam' } - +dependencies { + implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java index 3c0d7acdbeccfaf03aac70df478e3db6dd1378e4..cb75b65e796a085cf96323f69433dd848f67076a 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java @@ -11,17 +11,17 @@ import titan.ccp.model.records.ActivePowerRecord; * Changes the time format to us Europe/Paris time. 
*/ public class MapTimeFormat - extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>> { + extends SimpleFunction<ActivePowerRecord, KV<HourOfDayKey, ActivePowerRecord>> { private static final long serialVersionUID = -6597391279968647035L; private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); private final ZoneId zone = ZoneId.of("Europe/Paris"); @Override - public KV<HourOfDayKey, ActivePowerRecord> apply(final KV<String, ActivePowerRecord> kv) { - final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); + public KV<HourOfDayKey, ActivePowerRecord> apply(final ActivePowerRecord record) { + final Instant instant = Instant.ofEpochMilli(record.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); return KV.of( - this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), - kv.getValue()); + this.keyFactory.createKey(record.getIdentifier(), dateTime), + record); } } diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index de960d3d8466f9f420f002667df04d8a2fc64873..d734b02c61a91ab63010cddae6e0f993c14f4a50 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -11,6 +11,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import 
org.apache.beam.sdk.transforms.windowing.SlidingWindows; @@ -43,19 +44,19 @@ public class PipelineFactory extends AbstractPipelineFactory { protected void constructPipeline(final Pipeline pipeline) { final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + // TODO make seconds final Duration duration = Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); + // TODO make seconds final Duration aggregationAdvanceDuration = Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + // TODO not needed final Duration triggerDelay = Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); // Read from Kafka final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); - // Map the time format - final MapTimeFormat mapTimeFormat = new MapTimeFormat(); - // Get the stats per HourOfDay final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); @@ -65,12 +66,15 @@ public class PipelineFactory extends AbstractPipelineFactory { new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); pipeline.apply(kafkaReader) + .apply(Values.create()) // TODO drop keys // Map to correct time format - .apply(MapElements.via(mapTimeFormat)) + // TODO optional + .apply(MapElements.via(new MapTimeFormat())) // Apply a sliding window .apply(Window .<KV<HourOfDayKey, ActivePowerRecord>>into( SlidingWindows.of(duration).every(aggregationAdvanceDuration)) + // TODO remove trigger .triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings( AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay))) diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java index 
b7643c2c4f439d14472b6ea3bbbfab32b1ecd4c9..b2783fe4ed92f945bc023adf50b3ce20bb3436d3 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/SimplePipelineFactory.java @@ -12,14 +12,17 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; import org.joda.time.Duration; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; +import rocks.theodolite.benchmarks.uc3.beam.pubsub.PubSubSource; import titan.ccp.model.records.ActivePowerRecord; /** @@ -27,6 +30,12 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class SimplePipelineFactory extends AbstractPipelineFactory { + public static final String SOURCE_TYPE_KEY = "source.type"; + + public static final String PUBSSUB_SOURCE_PROJECT_KEY = "source.pubsub.project"; + public static final String PUBSSUB_SOURCE_TOPIC_KEY = "source.pubsub.topic"; + public static final String PUBSSUB_SOURCE_SUBSCR_KEY = "source.pubsub.subscription"; + public SimplePipelineFactory(final Configuration configuration) { super(configuration); } @@ -44,9 +53,25 @@ public class SimplePipelineFactory extends AbstractPipelineFactory { final Duration aggregationAdvanceDuration = Duration.standardSeconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_SECONDS)); - // 
Read from Kafka - // TODO allow for pubsub - final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); + final String sourceType = this.config.getString(SOURCE_TYPE_KEY); + + PCollection<ActivePowerRecord> activePowerRecords; + + if ("pubsub".equals(sourceType)) { + final String project = this.config.getString(PUBSSUB_SOURCE_PROJECT_KEY); + final String topic = this.config.getString(PUBSSUB_SOURCE_TOPIC_KEY); + final String subscription = this.config.getString(PUBSSUB_SOURCE_SUBSCR_KEY); + // Read messages from Pub/Sub and decode them into Avro records + if (subscription == null) { + activePowerRecords = pipeline.apply(PubSubSource.forTopic(project, topic)); + } else { + activePowerRecords = pipeline.apply(PubSubSource.forSubscription(project, subscription)); + } + } else { + final KafkaActivePowerTimestampReader kafka = super.buildKafkaReader(); + // Read messages from Kafka as Avro records and drop keys + activePowerRecords = pipeline.apply(kafka).apply(Values.create()); + } // Map the time format final MapTimeFormat mapTimeFormat = new MapTimeFormat(); @@ -54,7 +79,7 @@ public class SimplePipelineFactory extends AbstractPipelineFactory { // Get the stats per HourOfDay final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); - pipeline.apply(kafkaReader) + activePowerRecords // Map to correct time format // TODO optional .apply(MapElements.via(mapTimeFormat)) diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubEncoder.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubEncoder.java new file mode 100644 index 0000000000000000000000000000000000000000..c14090f58ca7095327423a38910876aa7ad37eac --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubEncoder.java @@ -0,0 +1,26 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import java.io.IOException; +import 
java.nio.ByteBuffer; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link SimpleFunction}, extracting and decoding {@link ActivePowerRecord}s from + * {@link PubsubMessage}s. + */ +public final class PubSubEncoder extends SimpleFunction<PubsubMessage, ActivePowerRecord> { + + private static final long serialVersionUID = -8872981416931508879L; + + @Override + public ActivePowerRecord apply(final PubsubMessage message) { + try { + return ActivePowerRecord.fromByteBuffer(ByteBuffer.wrap(message.getPayload())); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSource.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSource.java new file mode 100644 index 0000000000000000000000000000000000000000..46769fa9a35e73d2c1cd7e5c8316c48f0e481166 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSource.java @@ -0,0 +1,52 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link PTransform} reading {@link ActivePowerRecord}s from Pub/Sub. 
+ */ +public final class PubSubSource extends PTransform<PBegin, PCollection<ActivePowerRecord>> { + + private static final long serialVersionUID = 2603286151183186115L; + + private final Read<PubsubMessage> pubsubRead; + + private PubSubSource(final Read<PubsubMessage> pubsubRead) { + super(); + this.pubsubRead = pubsubRead; + } + + @Override + public PCollection<ActivePowerRecord> expand(final PBegin input) { + // Read messages from Pub/Sub and decode them into Avro records + return input.apply(this.pubsubRead).apply(MapElements.via(new PubSubEncoder())); + } + + /** + * Create a new {@link PubSubSource} for the given project and topic. + */ + public static final PubSubSource forTopic(final String projectName, final String topicName) { + return new PubSubSource(PubsubIO + .readMessages() + .fromTopic(PubSubTopicFactory.create(projectName, topicName).asPath())); + } + + /** + * Create a new {@link PubSubSource} for the given project and subscription. + */ + public static final PubSubSource forSubscription(final String projectName, + final String subscriptionName) { + return new PubSubSource(PubsubIO + .readMessages() + .fromSubscription( + PubSubSubscriptionFactory.create(projectName, subscriptionName).asPath())); + } + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSubscriptionFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSubscriptionFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..330082a4ba624fd00b581dcca4703b53de00af40 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubSubscriptionFactory.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription; + +/** + * Factory methods for creating {@link PubsubSubscription}s. 
+ */ +public final class PubSubSubscriptionFactory { + + private PubSubSubscriptionFactory() {} + + /** + * Create a {@link PubsubSubscription} for the given project ID and subscription ID. + */ + public static PubsubSubscription create(final String project, final String subscription) { + return PubsubSubscription.fromPath("projects/" + project + "/subscriptions/" + subscription); + } + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubTopicFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubTopicFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..bb4492eeb0f3ca24b0f8949b2a8080f030ee89b7 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/pubsub/PubSubTopicFactory.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc3.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; + +/** + * Factory methods for creating {@link PubsubTopic}s. + */ +public final class PubSubTopicFactory { + + private PubSubTopicFactory() {} + + /** + * Create a {@link PubsubTopic} for the given project ID and topic ID. + */ + public static PubsubTopic create(final String projectId, final String topicId) { + return PubsubTopic.fromPath("projects/" + projectId + "/topics/" + topicId); + } + +}