diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java index 8e3dbb3f98499c42f201f4f1250263b973148746..04b47cd8c4c6a976fc602fa2fbf93dcaaa36680e 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java @@ -7,6 +7,11 @@ import org.apache.beam.sdk.values.PCollection; import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; import titan.ccp.model.records.ActivePowerRecord; +/** + * A {@link PTransform} for a generic {@link DatabaseAdapter}. + * + * @param <T> Type parameter of {@link DatabaseAdapter}. + */ public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { private static final long serialVersionUID = 1L; @@ -14,13 +19,18 @@ public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, P private final DatabaseAdapter<T> databaseAdapter; private final Class<T> type; - public GenericSink(DatabaseAdapter<T> databaseAdapter, Class<T> type) { + /** + * Create a {@link GenericSink} for the provided {@link DatabaseAdapter}. Requires also the + * corresponding {@link Class} object for Beam. + */ + public GenericSink(final DatabaseAdapter<T> databaseAdapter, final Class<T> type) { + super(); this.databaseAdapter = databaseAdapter; this.type = type; } @Override - public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) { + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { return activePowerRecords .apply(MapElements .via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type))) diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java index b3ee54c7cce0b037f3aa92400987298e0a95bb94..91052827ff58f0bb52d289073c84e31cfc234c31 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java @@ -5,8 +5,12 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; import titan.ccp.model.records.ActivePowerRecord; +/** + * Interface for a class that creates sinks (i.e., {@link PTransform}s that map and store + * {@link ActivePowerRecord}s, optionally, using a {@link Configuration}. + */ public interface SinkFactory { PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration); -} \ No newline at end of file +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java index a195ecb7193ab99362d44733e9b6a6cea68128a6..82ca2573ef5108e2f2a9423400ca63be1760d449 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java @@ -8,6 +8,10 @@ import org.apache.commons.configuration2.Configuration; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import titan.ccp.model.records.ActivePowerRecord; +/** + * Supported Sink types, i.e., {@link PTransform} for converting and storing + * {@link ActivePowerRecord}s. + */ public enum SinkType implements SinkFactory { LOGGER("logger") { @@ -31,10 +35,13 @@ public enum SinkType implements SinkFactory { this.value = value; } - String getValue() { + public String getValue() { return this.value; } + /** + * Create a new {@link SinkType} from its string representation. + */ public static SinkType from(final String value) { return Stream.of(SinkType.values()) .filter(t -> t.value.equals(value)) diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java index 6c7bfa8389c2de3edef136a4cf417e217b94e24d..ab4617ecd1a46e083c863d26c999a25ee5008836 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java @@ -14,7 +14,8 @@ final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> { private final String collection; - public DocumentMapper(String collection) { + public DocumentMapper(final String collection) { + super(); this.collection = collection; } @@ -30,7 +31,7 @@ final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> { .build(); } - private String createDocumentName(String documentId) { + private String createDocumentName(final String documentId) { this.initFirestoreConfig(); return "projects/" + this.firestoreConfig.getProjectId() + "/databases/" + this.firestoreConfig.getDatabaseDdlRequest() diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java index 790bc553337f372b5f0a7ffc249e1cc5f2687c2f..eb62d69f907cc27f3974f09942e5d75ba701a34f 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java @@ -6,9 +6,9 @@ import java.io.IOException; final class FirestoreConfig { - final FirestoreOptions firestoreOptions; + private final FirestoreOptions firestoreOptions; - private FirestoreConfig(FirestoreOptions firestoreOptions) { + private FirestoreConfig(final FirestoreOptions firestoreOptions) { this.firestoreOptions = firestoreOptions; } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java index 2ae56bcfa28dbc3c883a42fcd8f7a897a341ee2a..a1db24eeb9e05f3b8e198621f4a3e7107e095b13 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java @@ -1,5 +1,7 @@ package application.firestore; +import com.google.cloud.firestore.DocumentSnapshot; +import com.google.firestore.v1.Document; import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -7,6 +9,10 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; import titan.ccp.model.records.ActivePowerRecord; +/** + * A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link Document}s, followed by storing + * these {@link DocumentSnapshot} to Firestore. + */ public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection"; @@ -15,19 +21,20 @@ public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PC private final String collectionName; - public FirestoreSink(String collectionName) { + public FirestoreSink(final String collectionName) { + super(); this.collectionName = collectionName; } @Override - public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) { + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { return activePowerRecords .apply(MapElements.via(new DocumentMapper(this.collectionName))) .apply(MapElements.via(new UpdateOperationMapper())) .apply(FirestoreIO.v1().write().batchWrite().build()); } - public static FirestoreSink fromConfig(Configuration config) { + public static FirestoreSink fromConfig(final Configuration config) { final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY); return new FirestoreSink(collectionName); }