Skip to content
Snippets Groups Projects
Commit 72c6d6a8 authored by Sören Henning's avatar Sören Henning
Browse files

Add support for PubSub

parent 6c564cb2
No related branches found
No related tags found
No related merge requests found
Pipeline #7240 passed
Showing
with 161 additions and 14 deletions
...@@ -2,4 +2,6 @@ plugins { ...@@ -2,4 +2,6 @@ plugins {
id 'theodolite.beam' id 'theodolite.beam'
} }
dependencies {
implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0'
}
...@@ -11,17 +11,17 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -11,17 +11,17 @@ import titan.ccp.model.records.ActivePowerRecord;
* Changes the time format to us Europe/Paris time. * Changes the time format to us Europe/Paris time.
*/ */
public class MapTimeFormat public class MapTimeFormat
extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>> { extends SimpleFunction<ActivePowerRecord, KV<HourOfDayKey, ActivePowerRecord>> {
private static final long serialVersionUID = -6597391279968647035L; private static final long serialVersionUID = -6597391279968647035L;
private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
private final ZoneId zone = ZoneId.of("Europe/Paris"); private final ZoneId zone = ZoneId.of("Europe/Paris");
@Override @Override
public KV<HourOfDayKey, ActivePowerRecord> apply(final KV<String, ActivePowerRecord> kv) { public KV<HourOfDayKey, ActivePowerRecord> apply(final ActivePowerRecord record) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of( return KV.of(
this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), this.keyFactory.createKey(record.getIdentifier(), dateTime),
kv.getValue()); record);
} }
} }
...@@ -11,6 +11,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; ...@@ -11,6 +11,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
...@@ -43,19 +44,19 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -43,19 +44,19 @@ public class PipelineFactory extends AbstractPipelineFactory {
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) {
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
// TODO make seconds
final Duration duration = final Duration duration =
Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
// TODO make seconds
final Duration aggregationAdvanceDuration = final Duration aggregationAdvanceDuration =
Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
// TODO not needed
final Duration triggerDelay = final Duration triggerDelay =
Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
// Read from Kafka // Read from Kafka
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
// Map the time format
final MapTimeFormat mapTimeFormat = new MapTimeFormat();
// Get the stats per HourOfDay // Get the stats per HourOfDay
final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();
...@@ -65,12 +66,15 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -65,12 +66,15 @@ public class PipelineFactory extends AbstractPipelineFactory {
new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
pipeline.apply(kafkaReader) pipeline.apply(kafkaReader)
.apply(Values.create()) // TODO drop keys
// Map to correct time format // Map to correct time format
.apply(MapElements.via(mapTimeFormat)) // TODO optional
.apply(MapElements.via(new MapTimeFormat()))
// Apply a sliding window // Apply a sliding window
.apply(Window .apply(Window
.<KV<HourOfDayKey, ActivePowerRecord>>into( .<KV<HourOfDayKey, ActivePowerRecord>>into(
SlidingWindows.of(duration).every(aggregationAdvanceDuration)) SlidingWindows.of(duration).every(aggregationAdvanceDuration))
// TODO remove trigger
.triggering(AfterWatermark.pastEndOfWindow() .triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings( .withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay))) AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
......
...@@ -12,14 +12,17 @@ import org.apache.beam.sdk.options.PipelineOptions; ...@@ -12,14 +12,17 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.joda.time.Duration; import org.joda.time.Duration;
import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory;
import rocks.theodolite.benchmarks.commons.beam.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.beam.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader;
import rocks.theodolite.benchmarks.uc3.beam.pubsub.PubSubSource;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
...@@ -27,6 +30,12 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -27,6 +30,12 @@ import titan.ccp.model.records.ActivePowerRecord;
*/ */
public class SimplePipelineFactory extends AbstractPipelineFactory { public class SimplePipelineFactory extends AbstractPipelineFactory {
public static final String SOURCE_TYPE_KEY = "source.type";
public static final String PUBSSUB_SOURCE_PROJECT_KEY = "source.pubsub.project";
public static final String PUBSSUB_SOURCE_TOPIC_KEY = "source.pubsub.topic";
public static final String PUBSSUB_SOURCE_SUBSCR_KEY = "source.pubsub.subscription";
public SimplePipelineFactory(final Configuration configuration) { public SimplePipelineFactory(final Configuration configuration) {
super(configuration); super(configuration);
} }
...@@ -44,9 +53,25 @@ public class SimplePipelineFactory extends AbstractPipelineFactory { ...@@ -44,9 +53,25 @@ public class SimplePipelineFactory extends AbstractPipelineFactory {
final Duration aggregationAdvanceDuration = final Duration aggregationAdvanceDuration =
Duration.standardSeconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_SECONDS)); Duration.standardSeconds(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_SECONDS));
// Read from Kafka final String sourceType = this.config.getString(SOURCE_TYPE_KEY);
// TODO allow for pubsub
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); PCollection<ActivePowerRecord> activePowerRecords;
if ("pubsub".equals(sourceType)) {
final String project = this.config.getString(PUBSSUB_SOURCE_PROJECT_KEY);
final String topic = this.config.getString(PUBSSUB_SOURCE_TOPIC_KEY);
final String subscription = this.config.getString(PUBSSUB_SOURCE_SUBSCR_KEY);
// Read messages from Pub/Sub and encode them as Avro records
if (subscription == null) {
activePowerRecords = pipeline.apply(PubSubSource.forTopic(topic, project));
} else {
activePowerRecords = pipeline.apply(PubSubSource.forSubscription(project, subscription));
}
} else {
final KafkaActivePowerTimestampReader kafka = super.buildKafkaReader();
// Read messages from Kafka as Avro records and drop keys
activePowerRecords = pipeline.apply(kafka).apply(Values.create());
}
// Map the time format // Map the time format
final MapTimeFormat mapTimeFormat = new MapTimeFormat(); final MapTimeFormat mapTimeFormat = new MapTimeFormat();
...@@ -54,7 +79,7 @@ public class SimplePipelineFactory extends AbstractPipelineFactory { ...@@ -54,7 +79,7 @@ public class SimplePipelineFactory extends AbstractPipelineFactory {
// Get the stats per HourOfDay // Get the stats per HourOfDay
final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();
pipeline.apply(kafkaReader) activePowerRecords
// Map to correct time format // Map to correct time format
// TODO optional // TODO optional
.apply(MapElements.via(mapTimeFormat)) .apply(MapElements.via(mapTimeFormat))
......
package rocks.theodolite.benchmarks.uc3.beam.pubsub;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.SimpleFunction;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link SimpleFunction}, extracting and decoding {@link ActivePowerRecord}s from
* {@link PubsubMessage}s.
*/
public final class PubSubEncoder extends SimpleFunction<PubsubMessage, ActivePowerRecord> {
private static final long serialVersionUID = -8872981416931508879L;
@Override
public ActivePowerRecord apply(final PubsubMessage message) {
try {
return ActivePowerRecord.fromByteBuffer(ByteBuffer.wrap(message.getPayload()));
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
}
package rocks.theodolite.benchmarks.uc3.beam.pubsub;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link PTransform} reading {@link ActivePowerRecord}s from Pub/Sub.
*/
public final class PubSubSource extends PTransform<PBegin, PCollection<ActivePowerRecord>> {
private static final long serialVersionUID = 2603286151183186115L;
private final Read<PubsubMessage> pubsubRead;
private PubSubSource(final Read<PubsubMessage> pubsubRead) {
super();
this.pubsubRead = pubsubRead;
}
@Override
public PCollection<ActivePowerRecord> expand(final PBegin input) {
// Read messages from Pub/Sub and encode them as Avro records
return input.apply(this.pubsubRead).apply(MapElements.via(new PubSubEncoder()));
}
/**
* Create a new {@link PubSubSource} for the given project and topic.
*/
public static final PubSubSource forTopic(final String projectName, final String topicName) {
return new PubSubSource(PubsubIO
.readMessages()
.fromTopic(PubSubTopicFactory.create(projectName, topicName).asPath()));
}
/**
* Create a new {@link PubSubSource} for the given project and subscription.
*/
public static final PubSubSource forSubscription(final String projectName,
final String subscriptionName) {
return new PubSubSource(PubsubIO
.readMessages()
.fromSubscription(
PubSubSubscriptionFactory.create(projectName, subscriptionName).asPath()));
}
}
package rocks.theodolite.benchmarks.uc3.beam.pubsub;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubSubscription;
/**
* Factory methods for creating {@link PubsubSubscription}s.
*/
public final class PubSubSubscriptionFactory {
private PubSubSubscriptionFactory() {}
/**
* Create a {@link PubsubSubscription} for the given project ID and subscription ID.
*/
public static PubsubSubscription create(final String project, final String subscription) {
return PubsubSubscription.fromPath("projects/" + project + "/subscriptions/" + subscription);
}
}
package rocks.theodolite.benchmarks.uc3.beam.pubsub;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PubsubTopic;
/**
* Factory methods for creating {@link PubsubTopic}s.
*/
public final class PubSubTopicFactory {
private PubSubTopicFactory() {}
/**
* Create a {@link PubsubTopic} for the given project ID and topic ID.
*/
public static PubsubTopic create(final String projectId, final String topicId) {
return PubsubTopic.fromPath("projects/" + projectId + "/topics/" + topicId);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment