From c2ce085dc43b9a2a99122f92166e7b39dd1e8d0f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Tue, 8 Mar 2022 13:55:19 +0100
Subject: [PATCH] Create firestore specific pipeline options

---
 .../benchmarks/uc1/beam/PipelineFactory.java  | 10 ++++--
 .../firestore/FirestoreOptionsExpander.java   | 34 +++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java

diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java
index 32658a21b..1f35d592e 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/PipelineFactory.java
@@ -9,6 +9,7 @@ import org.apache.beam.sdk.transforms.Values;
 import org.apache.commons.configuration2.Configuration;
 import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory;
 import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader;
+import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
@@ -17,6 +18,8 @@ import titan.ccp.model.records.ActivePowerRecord;
 public class PipelineFactory extends AbstractPipelineFactory {
 
   public static final String SINK_TYPE_KEY = "sink.type";
+  
+  private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
 
   public PipelineFactory(final Configuration configuration) {
     super(configuration);
@@ -31,17 +34,18 @@ public class PipelineFactory extends AbstractPipelineFactory {
     // final PubsubOptions pubSubOptions = options.as(PubsubOptions.class);
     // pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost);
     // }
+    if (this.sinkType == SinkType.FIRESTORE) {
+      FirestoreOptionsExpander.expandOptions(options);
+    }
   }
 
   @Override
   protected void constructPipeline(final Pipeline pipeline) {
-    final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
-
     final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
 
     pipeline.apply(kafkaReader)
         .apply(Values.create())
-        .apply(sinkType.create(this.config));
+        .apply(this.sinkType.create(this.config));
   }
 
   @Override
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java
new file mode 100644
index 000000000..0447450b4
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/firestore/FirestoreOptionsExpander.java
@@ -0,0 +1,34 @@
+package rocks.theodolite.benchmarks.uc1.beam.firestore;
+
+import java.io.IOException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Provides a method to expand {@link PipelineOptions} for Firestore.
+ */
+public final class FirestoreOptionsExpander {
+
+  private FirestoreOptionsExpander() {}
+
+  /**
+   * Expand {@link PipelineOptions} by special options required for Firestore derived from a default
+   * configuration.
+   *
+   * @param options {@link PipelineOptions} to be expanded.
+   */
+  public static void expandOptions(final PipelineOptions options) {
+    final GcpOptions firestoreOptions = options.as(GcpOptions.class);
+    final FirestoreConfig firestoreConfig = getFirestoreConfig();
+    firestoreOptions.setProject(firestoreConfig.getProjectId());
+  }
+
+  private static FirestoreConfig getFirestoreConfig() {
+    try {
+      return FirestoreConfig.createFromDefaults();
+    } catch (final IOException e) {
+      throw new IllegalStateException("Cannot create Firestore configuration.", e);
+    }
+  }
+
+}
-- 
GitLab