From 66a33fc3c0b6680c2421ea7e164022632ad2f7d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 18 Feb 2022 18:04:01 +0100
Subject: [PATCH] Add basic firestore implementation

---
 theodolite-benchmarks/uc1-beam/build.gradle   |  3 +-
 .../main/java/application/GenericSink.java    | 31 +++++++++++
 .../main/java/application/SinkFactory.java    | 12 +++++
 .../src/main/java/application/SinkType.java   | 45 ++++++++++++++++
 .../java/application/Uc1BeamPipeline.java     | 24 ++++-----
 .../application/firestore/DocumentMapper.java | 51 +++++++++++++++++++
 .../firestore/FirestoreConfig.java            | 29 +++++++++++
 .../application/firestore/FirestoreSink.java  | 34 +++++++++++++
 .../firestore/UpdateOperationMapper.java      | 18 +++++++
 9 files changed, 231 insertions(+), 16 deletions(-)
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java
 create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java

diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle
index ae80dcd8c..659eb09e6 100644
--- a/theodolite-benchmarks/uc1-beam/build.gradle
+++ b/theodolite-benchmarks/uc1-beam/build.gradle
@@ -3,5 +3,6 @@ plugins {
 }
 
 dependencies {
-    implementation project(':uc1-commons')
+  implementation project(':uc1-commons')
+  implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0'
 }
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java
new file mode 100644
index 000000000..8e3dbb3f9
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java
@@ -0,0 +1,31 @@
+package application;
+
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final DatabaseAdapter<T> databaseAdapter;
+  private final Class<T> type;
+
+  public GenericSink(DatabaseAdapter<T> databaseAdapter, Class<T> type) {
+    this.databaseAdapter = databaseAdapter;
+    this.type = type;
+  }
+
+  @Override
+  public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) {
+    return activePowerRecords
+        .apply(MapElements
+            .via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type)))
+        .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
+
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java
new file mode 100644
index 000000000..b3ee54c7c
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java
@@ -0,0 +1,12 @@
+package application;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.configuration2.Configuration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public interface SinkFactory {
+
+  PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration);
+
+}
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java
new file mode 100644
index 000000000..a195ecb71
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java
@@ -0,0 +1,45 @@
+package application;
+
+import application.firestore.FirestoreSink;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.configuration2.Configuration;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public enum SinkType implements SinkFactory {
+
+  LOGGER("logger") {
+    @Override
+    public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
+        final Configuration config) {
+      return new GenericSink<>(LogWriterFactory.forJson(), String.class);
+    }
+  },
+  FIRESTORE("firestore") {
+    @Override
+    public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
+        final Configuration config) {
+      return FirestoreSink.fromConfig(config);
+    }
+  };
+
+  private final String value;
+
+  SinkType(final String value) {
+    this.value = value;
+  }
+
+  String getValue() {
+    return this.value;
+  }
+
+  public static SinkType from(final String value) {
+    return Stream.of(SinkType.values())
+        .filter(t -> t.value.equals(value))
+        .findFirst()
+        .orElseThrow(() -> new IllegalArgumentException("Sink '" + value + "' does not exist."));
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index dde2bc064..eb894d13b 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -1,15 +1,10 @@
 package application;
 
-import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.commons.configuration2.Configuration;
-import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
-import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import theodolite.commons.beam.AbstractPipeline;
 import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import titan.ccp.model.records.ActivePowerRecord;
@@ -25,30 +20,29 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public final class Uc1BeamPipeline extends AbstractPipeline {
 
-  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+  public static final String SINK_TYPE_KEY = "sink.type";
 
   protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
     super(options, config);
 
+    final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY));
+
     // Set Coders for Classes that will be distributed
     final CoderRegistry cr = this.getCoderRegistry();
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
-    // build KafkaConsumerConfig
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
-
     // Create Pipeline transformations
-    final KafkaActivePowerTimestampReader kafka =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+    final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(
+        this.bootstrapServer,
+        this.inputTopic,
+        this.buildConsumerConfig());
 
     // Apply pipeline transformations
     // Read from Kafka
     this.apply(kafka)
         .apply(Values.create())
-        .apply(MapElements.via(new ConverterAdapter<>(
-            this.databaseAdapter.getRecordConverter(),
-            String.class)))
-        .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
+        .apply(sinkType.create(config));
   }
+
 }
 
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java
new file mode 100644
index 000000000..6c7bfa838
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java
@@ -0,0 +1,51 @@
+package application.firestore;
+
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.Value;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import titan.ccp.model.records.ActivePowerRecord;
+
+final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> {
+
+  private static final long serialVersionUID = -5263671231838353749L; // NOPMD
+
+  private transient FirestoreConfig firestoreConfig;
+
+  private final String collection;
+
+  public DocumentMapper(String collection) {
+    this.collection = collection;
+  }
+
+  @Override
+  public Document apply(final ActivePowerRecord record) {
+    return Document
+        .newBuilder()
+        .setName(this.createDocumentName(record.getIdentifier() + record.getTimestamp()))
+        .putFields("identifier",
+            Value.newBuilder().setStringValue(record.getIdentifier()).build())
+        .putFields("timestamp", Value.newBuilder().setIntegerValue(record.getTimestamp()).build())
+        .putFields("valueInW", Value.newBuilder().setDoubleValue(record.getValueInW()).build())
+        .build();
+  }
+
+  private String createDocumentName(String documentId) {
+    this.initFirestoreConfig();
+    return "projects/" + this.firestoreConfig.getProjectId()
+        + "/databases/" + this.firestoreConfig.getDatabaseDdlRequest()
+        + "/documents/" + this.collection
+        + "/" + documentId;
+  }
+
+  private void initFirestoreConfig() {
+    if (this.firestoreConfig == null) {
+      try {
+        this.firestoreConfig = FirestoreConfig.createFromDefaults();
+      } catch (final IOException e) {
+        throw new IllegalStateException("Cannot create Firestore configuration.", e);
+      }
+    }
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java
new file mode 100644
index 000000000..790bc5533
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java
@@ -0,0 +1,29 @@
+package application.firestore;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.firestore.FirestoreOptions;
+import java.io.IOException;
+
+final class FirestoreConfig {
+
+  final FirestoreOptions firestoreOptions;
+
+  private FirestoreConfig(FirestoreOptions firestoreOptions) {
+    this.firestoreOptions = firestoreOptions;
+  }
+
+  public String getProjectId() {
+    return this.firestoreOptions.getProjectId();
+  }
+
+  public String getDatabaseDdlRequest() {
+    return this.firestoreOptions.getProjectId();
+  }
+
+  public static FirestoreConfig createFromDefaults() throws IOException {
+    return new FirestoreConfig(FirestoreOptions.getDefaultInstance().toBuilder()
+        .setCredentials(GoogleCredentials.getApplicationDefault())
+        .build());
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java
new file mode 100644
index 000000000..2ae56bcfa
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java
@@ -0,0 +1,34 @@
+package application.firestore;
+
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.configuration2.Configuration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
+
+  public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection";
+
+  private static final long serialVersionUID = 1L;
+
+  private final String collectionName;
+
+  public FirestoreSink(String collectionName) {
+    this.collectionName = collectionName;
+  }
+
+  @Override
+  public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) {
+    return activePowerRecords
+        .apply(MapElements.via(new DocumentMapper(this.collectionName)))
+        .apply(MapElements.via(new UpdateOperationMapper()))
+        .apply(FirestoreIO.v1().write().batchWrite().build());
+  }
+
+  public static FirestoreSink fromConfig(Configuration config) {
+    final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY);
+    return new FirestoreSink(collectionName);
+  }
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java
new file mode 100644
index 000000000..d67bed2ae
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java
@@ -0,0 +1,18 @@
+package application.firestore;
+
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.Write;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+
+final class UpdateOperationMapper extends SimpleFunction<Document, Write> {
+
+  private static final long serialVersionUID = -5263671231838353748L; // NOPMD
+
+  @Override
+  public Write apply(final Document document) {
+    return Write.newBuilder()
+        .setUpdate(document)
+        .build();
+  }
+
+}
-- 
GitLab