diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
index 03c5ca1daa7ffab71a4d08c04f677d7412e3a2be..3e94fb4c878401183f45ff384e39dd6bc0291a27 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java
@@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 
 /**
- * Abstraction of a Beam microservice.
- * Encapsulates the corresponding {@link PipelineOptions} and the beam Runner.
+ * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and
+ * the beam Runner.
  */
 public class AbstractBeamService {
 
@@ -20,26 +20,24 @@ public class AbstractBeamService {
 
   // Application Configurations
   private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final String applicationName =
-      config.getString(ConfigurationKeys.APPLICATION_NAME);
-
+  private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
 
   /**
    * Creates AbstractBeamService with options.
    */
-  public AbstractBeamService(final String[] args) { //NOPMD
+  public AbstractBeamService(final String[] args) { // NOPMD
     super();
     LOGGER.info("Pipeline options:");
     for (final String s : args) {
       LOGGER.info("{}", s);
     }
-    options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName(applicationName);
-    LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString());
+    this.options = PipelineOptionsFactory.fromArgs(args).create();
+    this.options.setJobName(this.applicationName);
+    LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString());
   }
 
   public Configuration getConfig() {
-    return config;
+    return this.config;
   }
 
 }
diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java
index 83336b5a4c2451ef4bffefbd60ad9d52fccd9c17..e513c3a0e3dffcb9881f389af5ee9f05c52a2b63 100644
--- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java
+++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java
@@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.common.serialization.Deserializer;
 
 /**
  * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
@@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection;
  * @param <K> Type of the Key.
  * @param <V> Type of the Value.
  */
-public class KafkaGenericReader<K, V> extends
-    PTransform<PBegin, PCollection<KV<K, V>>> {
+public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
 
   private static final long serialVersionUID = 2603286150183186115L;
   private final PTransform<PBegin, PCollection<KV<K, V>>> reader;
@@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends
   /**
    * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
    */
-  public KafkaGenericReader(final String bootstrapServer, final String inputTopic,
-                            final Map<String, Object> consumerConfig,
-                            final Class<? extends
-                                org.apache.kafka.common.serialization.Deserializer<K>>
-                                  keyDeserializer,
-                            final Class<? extends
-                                org.apache.kafka.common.serialization.Deserializer<V>>
-                                  valueDeserializer) {
+  public KafkaGenericReader(
+      final String bootstrapServer,
+      final String inputTopic,
+      final Map<String, Object> consumerConfig,
+      final Class<? extends Deserializer<K>> keyDeserializer,
+      final Class<? extends Deserializer<V>> valueDeserializer) {
     super();
 
     // Check if boostrap server and inputTopic are defined
@@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends
       throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
     }
 
-    reader =
+    this.reader =
         KafkaIO.<K, V>read()
             .withBootstrapServers(bootstrapServer)
             .withTopic(inputTopic)
diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties
index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644
--- a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties
@@ -1,6 +1,8 @@
 application.name=theodolite-uc1-application
 application.version=0.0.1
 
+sink.type=logger
+
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
@@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1
 
 specific.avro.reader=True
 enable.auto.commit.config=True
-auto.offset.reset.config=earliest
\ No newline at end of file
+auto.offset.reset.config=earliest
diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties
index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644
--- a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties
+++ b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties
@@ -1,6 +1,8 @@
 application.name=theodolite-uc1-application
 application.version=0.0.1
 
+sink.type=logger
+
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
@@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1
 
 specific.avro.reader=True
 enable.auto.commit.config=True
-auto.offset.reset.config=earliest
\ No newline at end of file
+auto.offset.reset.config=earliest
diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle
index ae80dcd8c5b263892f4539098a9a29b25c133819..659eb09e67487132bca2b3ecb82298690dbf33c6 100644
--- a/theodolite-benchmarks/uc1-beam/build.gradle
+++ b/theodolite-benchmarks/uc1-beam/build.gradle
@@ -3,5 +3,6 @@ plugins {
 }
 
 dependencies {
-    implementation project(':uc1-commons')
+  implementation project(':uc1-commons')
+  implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0'
 }
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..04b47cd8c4c6a976fc602fa2fbf93dcaaa36680e
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java
@@ -0,0 +1,41 @@
+package application;
+
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * A {@link PTransform} for a generic {@link DatabaseAdapter}.
+ *
+ * @param <T> Type parameter of {@link DatabaseAdapter}.
+ */
+public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
+
+  private static final long serialVersionUID = 1L;
+
+  private final DatabaseAdapter<T> databaseAdapter;
+  private final Class<T> type;
+
+  /**
+   * Create a {@link GenericSink} for the provided {@link DatabaseAdapter}. Requires also the
+   * corresponding {@link Class} object for Beam.
+   */
+  public GenericSink(final DatabaseAdapter<T> databaseAdapter, final Class<T> type) {
+    super();
+    this.databaseAdapter = databaseAdapter;
+    this.type = type;
+  }
+
+  @Override
+  public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
+    return activePowerRecords
+        .apply(MapElements
+            .via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type)))
+        .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
+
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..91052827ff58f0bb52d289073c84e31cfc234c31
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java
@@ -0,0 +1,16 @@
+package application;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.configuration2.Configuration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Interface for a class that creates sinks (i.e., {@link PTransform}s that map and store
+ * {@link ActivePowerRecord}s, optionally, using a {@link Configuration}.
+ */
+public interface SinkFactory {
+
+  PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration);
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java
new file mode 100644
index 0000000000000000000000000000000000000000..82ca2573ef5108e2f2a9423400ca63be1760d449
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java
@@ -0,0 +1,52 @@
+package application;
+
+import application.firestore.FirestoreSink;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.configuration2.Configuration;
+import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * Supported Sink types, i.e., {@link PTransform} for converting and storing
+ * {@link ActivePowerRecord}s.
+ */
+public enum SinkType implements SinkFactory {
+
+  LOGGER("logger") {
+    @Override
+    public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
+        final Configuration config) {
+      return new GenericSink<>(LogWriterFactory.forJson(), String.class);
+    }
+  },
+  FIRESTORE("firestore") {
+    @Override
+    public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
+        final Configuration config) {
+      return FirestoreSink.fromConfig(config);
+    }
+  };
+
+  private final String value;
+
+  SinkType(final String value) {
+    this.value = value;
+  }
+
+  public String getValue() {
+    return this.value;
+  }
+
+  /**
+   * Create a new {@link SinkType} from its string representation.
+   */
+  public static SinkType from(final String value) {
+    return Stream.of(SinkType.values())
+        .filter(t -> t.value.equals(value))
+        .findFirst()
+        .orElseThrow(() -> new IllegalArgumentException("Sink '" + value + "' does not exist."));
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index dde2bc064af9b21e11b859fdbf3f6ba1374d17d7..352b32a29ff6cfd5d01a4e74798f79c8d08c769a 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -1,54 +1,39 @@
 package application;
 
-import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.commons.configuration2.Configuration;
-import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
-import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import theodolite.commons.beam.AbstractPipeline;
 import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import titan.ccp.model.records.ActivePowerRecord;
 
-
 /**
- * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
- * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
- * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
- * using--flinkMaster as run parameter. To persist logs add
- * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
- * Input Output in Common in the Run Configuration Start via Eclipse Run.
+ * Implementation of benchmark UC1: Database Storage with Apache Beam.
  */
 public final class Uc1BeamPipeline extends AbstractPipeline {
 
-  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+  public static final String SINK_TYPE_KEY = "sink.type";
 
   protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
     super(options, config);
 
-    // Set Coders for Classes that will be distributed
-    final CoderRegistry cr = this.getCoderRegistry();
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
+    final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY));
 
-    // build KafkaConsumerConfig
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
+    // Set Coders for classes that will be distributed
+    final CoderRegistry cr = super.getCoderRegistry();
+    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
-    // Create Pipeline transformations
-    final KafkaActivePowerTimestampReader kafka =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+    final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(
+        super.bootstrapServer,
+        super.inputTopic,
+        super.buildConsumerConfig());
 
-    // Apply pipeline transformations
-    // Read from Kafka
-    this.apply(kafka)
+    super.apply(kafka)
         .apply(Values.create())
-        .apply(MapElements.via(new ConverterAdapter<>(
-            this.databaseAdapter.getRecordConverter(),
-            String.class)))
-        .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
+        .apply(sinkType.create(config));
   }
+
 }
 
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..ab4617ecd1a46e083c863d26c999a25ee5008836
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java
@@ -0,0 +1,52 @@
+package application.firestore;
+
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.Value;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import titan.ccp.model.records.ActivePowerRecord;
+
+final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> {
+
+  private static final long serialVersionUID = -5263671231838353749L; // NOPMD
+
+  private transient FirestoreConfig firestoreConfig;
+
+  private final String collection;
+
+  public DocumentMapper(final String collection) {
+    super();
+    this.collection = collection;
+  }
+
+  @Override
+  public Document apply(final ActivePowerRecord record) {
+    return Document
+        .newBuilder()
+        .setName(this.createDocumentName(record.getIdentifier() + record.getTimestamp()))
+        .putFields("identifier",
+            Value.newBuilder().setStringValue(record.getIdentifier()).build())
+        .putFields("timestamp", Value.newBuilder().setIntegerValue(record.getTimestamp()).build())
+        .putFields("valueInW", Value.newBuilder().setDoubleValue(record.getValueInW()).build())
+        .build();
+  }
+
+  private String createDocumentName(final String documentId) {
+    this.initFirestoreConfig();
+    return "projects/" + this.firestoreConfig.getProjectId()
+        + "/databases/" + this.firestoreConfig.getDatabaseDdlRequest()
+        + "/documents/" + this.collection
+        + "/" + documentId;
+  }
+
+  private void initFirestoreConfig() {
+    if (this.firestoreConfig == null) {
+      try {
+        this.firestoreConfig = FirestoreConfig.createFromDefaults();
+      } catch (final IOException e) {
+        throw new IllegalStateException("Cannot create Firestore configuration.", e);
+      }
+    }
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..eb62d69f907cc27f3974f09942e5d75ba701a34f
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java
@@ -0,0 +1,29 @@
+package application.firestore;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.firestore.FirestoreOptions;
+import java.io.IOException;
+
+final class FirestoreConfig {
+
+  private final FirestoreOptions firestoreOptions;
+
+  private FirestoreConfig(final FirestoreOptions firestoreOptions) {
+    this.firestoreOptions = firestoreOptions;
+  }
+
+  public String getProjectId() {
+    return this.firestoreOptions.getProjectId();
+  }
+
+  public String getDatabaseDdlRequest() {
+    return this.firestoreOptions.getProjectId();
+  }
+
+  public static FirestoreConfig createFromDefaults() throws IOException {
+    return new FirestoreConfig(FirestoreOptions.getDefaultInstance().toBuilder()
+        .setCredentials(GoogleCredentials.getApplicationDefault())
+        .build());
+  }
+
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..a1db24eeb9e05f3b8e198621f4a3e7107e095b13
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java
@@ -0,0 +1,41 @@
+package application.firestore;
+
+import com.google.cloud.firestore.DocumentSnapshot;
+import com.google.firestore.v1.Document;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.configuration2.Configuration;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link Document}s, followed by storing
+ * these {@link DocumentSnapshot} to Firestore.
+ */
+public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
+
+  public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection";
+
+  private static final long serialVersionUID = 1L;
+
+  private final String collectionName;
+
+  public FirestoreSink(final String collectionName) {
+    super();
+    this.collectionName = collectionName;
+  }
+
+  @Override
+  public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
+    return activePowerRecords
+        .apply(MapElements.via(new DocumentMapper(this.collectionName)))
+        .apply(MapElements.via(new UpdateOperationMapper()))
+        .apply(FirestoreIO.v1().write().batchWrite().build());
+  }
+
+  public static FirestoreSink fromConfig(final Configuration config) {
+    final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY);
+    return new FirestoreSink(collectionName);
+  }
+}
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..d67bed2aedbd97bfe4271efa0514c8d4e594683e
--- /dev/null
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java
@@ -0,0 +1,18 @@
+package application.firestore;
+
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.Write;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+
+final class UpdateOperationMapper extends SimpleFunction<Document, Write> {
+
+  private static final long serialVersionUID = -5263671231838353748L; // NOPMD
+
+  @Override
+  public Write apply(final Document document) {
+    return Write.newBuilder()
+        .setUpdate(document)
+        .build();
+  }
+
+}