Skip to content
Snippets Groups Projects
Commit a7efbcbb authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'beam-pubsub' into simple-uc3

parents 8311a563 7c00045f
No related branches found
No related tags found
No related merge requests found
Pipeline #7244 passed
...@@ -6,10 +6,12 @@ import org.apache.beam.sdk.coders.AvroCoder; ...@@ -6,10 +6,12 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory;
import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader;
import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander; import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander;
import rocks.theodolite.benchmarks.uc1.beam.pubsub.PubSubSource;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
...@@ -17,8 +19,13 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -17,8 +19,13 @@ import titan.ccp.model.records.ActivePowerRecord;
*/ */
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public static final String SOURCE_TYPE_KEY = "source.type";
public static final String SINK_TYPE_KEY = "sink.type"; public static final String SINK_TYPE_KEY = "sink.type";
public static final String PUBSSUB_SOURCE_PROJECT_KEY = "source.pubsub.project";
public static final String PUBSSUB_SOURCE_TOPIC_KEY = "source.pubsub.topic";
public static final String PUBSSUB_SOURCE_SUBSCR_KEY = "source.pubsub.subscription";
private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -41,18 +48,36 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -41,18 +48,36 @@ public class PipelineFactory extends AbstractPipelineFactory {
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) {
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
final String sourceType = this.config.getString(SOURCE_TYPE_KEY);
PCollection<ActivePowerRecord> activePowerRecords;
if ("pubsub".equals(sourceType)) {
final String project = this.config.getString(PUBSSUB_SOURCE_PROJECT_KEY);
final String topic = this.config.getString(PUBSSUB_SOURCE_TOPIC_KEY);
final String subscription = this.config.getString(PUBSSUB_SOURCE_SUBSCR_KEY);
// Read messages from Pub/Sub and encode them as Avro records
if (subscription == null) {
activePowerRecords = pipeline.apply(PubSubSource.forTopic(topic, project));
} else {
activePowerRecords = pipeline.apply(PubSubSource.forSubscription(project, subscription));
}
} else {
final KafkaActivePowerTimestampReader kafka = super.buildKafkaReader();
// Read messages from Kafka as Avro records and drop keys
activePowerRecords = pipeline.apply(kafka).apply(Values.create());
}
pipeline.apply(kafkaReader) // Forward Avro records to configured sink
.apply(Values.create()) activePowerRecords.apply(sinkType.create(this.config));
.apply(this.sinkType.create(this.config));
} }
@Override @Override
protected void registerCoders(final CoderRegistry registry) { protected void registerCoders(final CoderRegistry registry) {
registry.registerCoderForClass( registry.registerCoderForClass(
ActivePowerRecord.class, ActivePowerRecord.class,
AvroCoder.of(ActivePowerRecord.SCHEMA$)); AvroCoder.of(ActivePowerRecord.class, false));
} }
public static Function<Configuration, AbstractPipelineFactory> factory() { public static Function<Configuration, AbstractPipelineFactory> factory() {
......
package rocks.theodolite.benchmarks.uc1.beam.pubsub;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.SimpleFunction;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link SimpleFunction}, extracting and decoding {@link ActivePowerRecord}s from
* {@link PubsubMessage}s.
*/
public final class PubSubEncoder extends SimpleFunction<PubsubMessage, ActivePowerRecord> {
private static final long serialVersionUID = -8872981416931508879L;
@Override
public ActivePowerRecord apply(final PubsubMessage message) {
try {
return ActivePowerRecord.fromByteBuffer(ByteBuffer.wrap(message.getPayload()));
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
}
package rocks.theodolite.benchmarks.uc1.beam.pubsub;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link PTransform} reading {@link ActivePowerRecord}s from Pub/Sub.
*/
public final class PubSubSource extends PTransform<PBegin, PCollection<ActivePowerRecord>> {
private static final long serialVersionUID = 2603286151183186115L;
private final Read<PubsubMessage> pubsubRead;
private PubSubSource(final Read<PubsubMessage> pubsubRead) {
super();
this.pubsubRead = pubsubRead;
}
@Override
public PCollection<ActivePowerRecord> expand(final PBegin input) {
// Read messages from Pub/Sub and encode them as Avro records
return input.apply(this.pubsubRead).apply(MapElements.via(new PubSubEncoder()));
}
/**
* Create a new {@link PubSubSource} for the given project and topic.
*/
public static final PubSubSource forTopic(final String projectName, final String topicName) {
return new PubSubSource(PubsubIO
.readMessages()
.fromTopic(PubSubTopicFactory.create(projectName, topicName).asPath()));
}
/**
* Create a new {@link PubSubSource} for the given project and subscription.
*/
public static final PubSubSource forSubscription(final String projectName,
final String subscriptionName) {
return new PubSubSource(PubsubIO
.readMessages()
.fromSubscription(
PubSubSubscriptionFactory.create(projectName, subscriptionName).asPath()));
}
}
package rocks.theodolite.benchmarks.uc1.beam.pubsub;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription;
/**
* Factory methods for creating {@link PubsubSubscription}s.
*/
public final class PubSubSubscriptionFactory {
private PubSubSubscriptionFactory() {}
/**
* Create a {@link PubsubSubscription} for the given project ID and subscription ID.
*/
public static PubsubSubscription create(final String project, final String subscription) {
return PubsubSubscription.fromPath("projects/" + project + "/subscriptions/" + subscription);
}
}
package rocks.theodolite.benchmarks.uc1.beam.pubsub;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic;
/**
* Factory methods for creating {@link PubsubTopic}s.
*/
public final class PubSubTopicFactory {
private PubSubTopicFactory() {}
/**
* Create a {@link PubsubTopic} for the given project ID and topic ID.
*/
public static PubsubTopic create(final String projectId, final String topicId) {
return PubsubTopic.fromPath("projects/" + projectId + "/topics/" + topicId);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment