Skip to content
Snippets Groups Projects
Commit c2ce085d authored by Sören Henning's avatar Sören Henning
Browse files

Create firestore specific pipeline options

parent 15180d87
No related branches found
No related tags found
No related merge requests found
Pipeline #6962 passed
...@@ -9,6 +9,7 @@ import org.apache.beam.sdk.transforms.Values; ...@@ -9,6 +9,7 @@ import org.apache.beam.sdk.transforms.Values;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory;
import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader;
import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
...@@ -17,6 +18,8 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -17,6 +18,8 @@ import titan.ccp.model.records.ActivePowerRecord;
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public static final String SINK_TYPE_KEY = "sink.type"; public static final String SINK_TYPE_KEY = "sink.type";
private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
super(configuration); super(configuration);
...@@ -31,17 +34,18 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -31,17 +34,18 @@ public class PipelineFactory extends AbstractPipelineFactory {
// final PubsubOptions pubSubOptions = options.as(PubsubOptions.class); // final PubsubOptions pubSubOptions = options.as(PubsubOptions.class);
// pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost); // pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost);
// } // }
if (this.sinkType == SinkType.FIRESTORE) {
FirestoreOptionsExpander.expandOptions(options);
}
} }
@Override @Override
protected void constructPipeline(final Pipeline pipeline) { protected void constructPipeline(final Pipeline pipeline) {
final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
pipeline.apply(kafkaReader) pipeline.apply(kafkaReader)
.apply(Values.create()) .apply(Values.create())
.apply(sinkType.create(this.config)); .apply(this.sinkType.create(this.config));
} }
@Override @Override
......
package rocks.theodolite.benchmarks.uc1.beam.firestore;
import java.io.IOException;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
* Provides a method to expand {@link PipelineOptions} for Firestore.
*/
public final class FirestoreOptionsExpander {
private FirestoreOptionsExpander() {}
/**
* Expand {@link PipelineOptions} by special options required for Firestore derived from a default
* configuration.
*
* @param options {@link PipelineOptions} to be expanded.
*/
public static void expandOptions(final PipelineOptions options) {
final GcpOptions firestoreOptions = options.as(GcpOptions.class);
final FirestoreConfig firestoreConfig = getFirestoreConfig();
firestoreOptions.setProject(firestoreConfig.getProjectId());
}
private static FirestoreConfig getFirestoreConfig() {
try {
return FirestoreConfig.createFromDefaults();
} catch (final IOException e) {
throw new IllegalStateException("Cannot create Firestore configuration.", e);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment