diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java index 1f35d592ed9b2b1507eb5c30090d392d37ed7c1e..453bf9e560af5a371fc2c50b50b80fcc0e0f3202 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java @@ -6,10 +6,12 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander; +import rocks.theodolite.benchmarks.uc1.beam.pubsub.PubSubSource; import titan.ccp.model.records.ActivePowerRecord; /** @@ -17,8 +19,13 @@ import titan.ccp.model.records.ActivePowerRecord; */ public class PipelineFactory extends AbstractPipelineFactory { + public static final String SOURCE_TYPE_KEY = "source.type"; public static final String SINK_TYPE_KEY = "sink.type"; - + + public static final String PUBSSUB_SOURCE_PROJECT_KEY = "source.pubsub.project"; + public static final String PUBSSUB_SOURCE_TOPIC_KEY = "source.pubsub.topic"; + public static final String PUBSSUB_SOURCE_SUBSCR_KEY = "source.pubsub.subscription"; + private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); public PipelineFactory(final Configuration configuration) { @@ -41,18 +48,36 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void constructPipeline(final 
Pipeline pipeline) { - final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); + final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); + final String sourceType = this.config.getString(SOURCE_TYPE_KEY); + + final PCollection<ActivePowerRecord> activePowerRecords; + + if ("pubsub".equals(sourceType)) { + final String project = this.config.getString(PUBSSUB_SOURCE_PROJECT_KEY); + final String topic = this.config.getString(PUBSSUB_SOURCE_TOPIC_KEY); + final String subscription = this.config.getString(PUBSSUB_SOURCE_SUBSCR_KEY); + // Read messages from Pub/Sub and decode them into Avro records + if (subscription == null) { + activePowerRecords = pipeline.apply(PubSubSource.forTopic(project, topic)); + } else { + activePowerRecords = pipeline.apply(PubSubSource.forSubscription(project, subscription)); + } + } else { + final KafkaActivePowerTimestampReader kafka = super.buildKafkaReader(); + // Read messages from Kafka as Avro records and drop keys + activePowerRecords = pipeline.apply(kafka).apply(Values.create()); + } - pipeline.apply(kafkaReader) - .apply(Values.create()) - .apply(this.sinkType.create(this.config)); + // Forward Avro records to configured sink + activePowerRecords.apply(sinkType.create(this.config)); } @Override protected void registerCoders(final CoderRegistry registry) { registry.registerCoderForClass( ActivePowerRecord.class, - AvroCoder.of(ActivePowerRecord.SCHEMA$)); + AvroCoder.of(ActivePowerRecord.class, false)); } public static Function<Configuration, AbstractPipelineFactory> factory() { diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubEncoder.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubEncoder.java new file mode 100644 index 0000000000000000000000000000000000000000..c60d644814a780ed0389c8848a7065dfd9304144 --- /dev/null +++ 
b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubEncoder.java @@ -0,0 +1,26 @@ +package rocks.theodolite.benchmarks.uc1.beam.pubsub; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link SimpleFunction}, extracting and decoding {@link ActivePowerRecord}s from + * {@link PubsubMessage} payloads; I/O errors are wrapped in {@link IllegalStateException}s. + */ +public final class PubSubEncoder extends SimpleFunction<PubsubMessage, ActivePowerRecord> { + + private static final long serialVersionUID = -8872981416931508879L; + + @Override + public ActivePowerRecord apply(final PubsubMessage message) { + try { + return ActivePowerRecord.fromByteBuffer(ByteBuffer.wrap(message.getPayload())); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubSource.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubSource.java new file mode 100644 index 0000000000000000000000000000000000000000..7155b4a6d8d89d66278ae4d6241a9db458fe42e9 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubSource.java @@ -0,0 +1,52 @@ +package rocks.theodolite.benchmarks.uc1.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * {@link PTransform} reading {@link ActivePowerRecord}s from Pub/Sub. 
+ */ +public final class PubSubSource extends PTransform<PBegin, PCollection<ActivePowerRecord>> { + + private static final long serialVersionUID = 2603286151183186115L; + + private final Read<PubsubMessage> pubsubRead; + + private PubSubSource(final Read<PubsubMessage> pubsubRead) { + super(); + this.pubsubRead = pubsubRead; + } + + @Override + public PCollection<ActivePowerRecord> expand(final PBegin input) { + // Read messages from Pub/Sub and decode their payloads into ActivePowerRecords + return input.apply(this.pubsubRead).apply(MapElements.via(new PubSubEncoder())); + } + + /** + * Create a new {@link PubSubSource} for the given project and topic. + */ + public static final PubSubSource forTopic(final String projectName, final String topicName) { + return new PubSubSource(PubsubIO + .readMessages() + .fromTopic(PubSubTopicFactory.create(projectName, topicName).asPath())); + } + + /** + * Create a new {@link PubSubSource} for the given project and subscription. + */ + public static final PubSubSource forSubscription(final String projectName, + final String subscriptionName) { + return new PubSubSource(PubsubIO + .readMessages() + .fromSubscription( + PubSubSubscriptionFactory.create(projectName, subscriptionName).asPath())); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubSubscriptionFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubSubscriptionFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..8576c3cbfd3011c7d6f3ba8f6a522a454f2d9a75 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubSubscriptionFactory.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc1.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription; + +/** + * Factory methods for creating {@link PubsubSubscription}s. 
+ */ +public final class PubSubSubscriptionFactory { + + private PubSubSubscriptionFactory() {} + + /** + * Create the {@link PubsubSubscription} {@code projects/<project>/subscriptions/<subscription>}. + */ + public static PubsubSubscription create(final String project, final String subscription) { + return PubsubSubscription.fromPath("projects/" + project + "/subscriptions/" + subscription); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubTopicFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubTopicFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..518c4a570937f59f07f5ffe4ac0b09d80d1e747c --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/pubsub/PubSubTopicFactory.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc1.beam.pubsub; + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic; + +/** + * Factory methods for creating {@link PubsubTopic}s. + */ +public final class PubSubTopicFactory { + + private PubSubTopicFactory() {} + + /** + * Create the {@link PubsubTopic} {@code projects/<projectId>/topics/<topicId>}. + */ + public static PubsubTopic create(final String projectId, final String topicId) { + return PubsubTopic.fromPath("projects/" + projectId + "/topics/" + topicId); + } + +}