diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java index 32658a21b8b80fddb5baf58002a701e8e35b542e..1f35d592ed9b2b1507eb5c30090d392d37ed7c1e 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java @@ -9,6 +9,7 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.commons.configuration2.Configuration; import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory; import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader; +import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander; import titan.ccp.model.records.ActivePowerRecord; /** @@ -17,6 +18,8 @@ import titan.ccp.model.records.ActivePowerRecord; public class PipelineFactory extends AbstractPipelineFactory { public static final String SINK_TYPE_KEY = "sink.type"; + + private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); public PipelineFactory(final Configuration configuration) { super(configuration); @@ -31,17 +34,18 @@ public class PipelineFactory extends AbstractPipelineFactory { // final PubsubOptions pubSubOptions = options.as(PubsubOptions.class); // pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost); // } + if (this.sinkType == SinkType.FIRESTORE) { + FirestoreOptionsExpander.expandOptions(options); + } } @Override protected void constructPipeline(final Pipeline pipeline) { - final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); - final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); pipeline.apply(kafkaReader) .apply(Values.create()) - .apply(sinkType.create(this.config)); + .apply(this.sinkType.create(this.config)); } @Override diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java new file mode 100644 index 0000000000000000000000000000000000000000..0447450b45b971f96e2f2cbb7ce91f78604d5a23 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java @@ -0,0 +1,34 @@ +package rocks.theodolite.benchmarks.uc1.beam.firestore; + +import java.io.IOException; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Provides a method to expand {@link PipelineOptions} for Firestore. + */ +public final class FirestoreOptionsExpander { + + private FirestoreOptionsExpander() {} + + /** + * Expand {@link PipelineOptions} by special options required for Firestore derived from a default + * configuration. + * + * @param options {@link PipelineOptions} to be expanded. + */ + public static void expandOptions(final PipelineOptions options) { + final GcpOptions firestoreOptions = options.as(GcpOptions.class); + final FirestoreConfig firestoreConfig = getFirestoreConfig(); + firestoreOptions.setProject(firestoreConfig.getProjectId()); + } + + private static FirestoreConfig getFirestoreConfig() { + try { + return FirestoreConfig.createFromDefaults(); + } catch (final IOException e) { + throw new IllegalStateException("Cannot create Firestore configuration.", e); + } + } + +}