Skip to content
Snippets Groups Projects
Commit 98f8e1c8 authored by Sören Henning's avatar Sören Henning
Browse files

Fix QA warnings

parent 66a33fc3
No related branches found
No related tags found
1 merge request!245Firestore sink for UC1 Beam
Pipeline #6641 passed
...@@ -7,6 +7,11 @@ import org.apache.beam.sdk.values.PCollection; ...@@ -7,6 +7,11 @@ import org.apache.beam.sdk.values.PCollection;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link PTransform} for a generic {@link DatabaseAdapter}.
*
* @param <T> Type parameter of {@link DatabaseAdapter}.
*/
public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -14,13 +19,18 @@ public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, P ...@@ -14,13 +19,18 @@ public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, P
private final DatabaseAdapter<T> databaseAdapter; private final DatabaseAdapter<T> databaseAdapter;
private final Class<T> type; private final Class<T> type;
public GenericSink(DatabaseAdapter<T> databaseAdapter, Class<T> type) { /**
* Create a {@link GenericSink} for the provided {@link DatabaseAdapter}. Requires also the
* corresponding {@link Class} object for Beam.
*/
public GenericSink(final DatabaseAdapter<T> databaseAdapter, final Class<T> type) {
super();
this.databaseAdapter = databaseAdapter; this.databaseAdapter = databaseAdapter;
this.type = type; this.type = type;
} }
@Override @Override
public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) { public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
return activePowerRecords return activePowerRecords
.apply(MapElements .apply(MapElements
.via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type))) .via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type)))
......
...@@ -5,6 +5,10 @@ import org.apache.beam.sdk.values.PCollection; ...@@ -5,6 +5,10 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* Interface for a class that creates sinks (i.e., {@link PTransform}s that map and store
* {@link ActivePowerRecord}s, optionally, using a {@link Configuration}.
*/
public interface SinkFactory { public interface SinkFactory {
PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration); PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration);
......
...@@ -8,6 +8,10 @@ import org.apache.commons.configuration2.Configuration; ...@@ -8,6 +8,10 @@ import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* Supported Sink types, i.e., {@link PTransform} for converting and storing
* {@link ActivePowerRecord}s.
*/
public enum SinkType implements SinkFactory { public enum SinkType implements SinkFactory {
LOGGER("logger") { LOGGER("logger") {
...@@ -31,10 +35,13 @@ public enum SinkType implements SinkFactory { ...@@ -31,10 +35,13 @@ public enum SinkType implements SinkFactory {
this.value = value; this.value = value;
} }
String getValue() { public String getValue() {
return this.value; return this.value;
} }
/**
* Create a new {@link SinkType} from its string representation.
*/
public static SinkType from(final String value) { public static SinkType from(final String value) {
return Stream.of(SinkType.values()) return Stream.of(SinkType.values())
.filter(t -> t.value.equals(value)) .filter(t -> t.value.equals(value))
......
...@@ -14,7 +14,8 @@ final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> { ...@@ -14,7 +14,8 @@ final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> {
private final String collection; private final String collection;
public DocumentMapper(String collection) { public DocumentMapper(final String collection) {
super();
this.collection = collection; this.collection = collection;
} }
...@@ -30,7 +31,7 @@ final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> { ...@@ -30,7 +31,7 @@ final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> {
.build(); .build();
} }
private String createDocumentName(String documentId) { private String createDocumentName(final String documentId) {
this.initFirestoreConfig(); this.initFirestoreConfig();
return "projects/" + this.firestoreConfig.getProjectId() return "projects/" + this.firestoreConfig.getProjectId()
+ "/databases/" + this.firestoreConfig.getDatabaseDdlRequest() + "/databases/" + this.firestoreConfig.getDatabaseDdlRequest()
......
...@@ -6,9 +6,9 @@ import java.io.IOException; ...@@ -6,9 +6,9 @@ import java.io.IOException;
final class FirestoreConfig { final class FirestoreConfig {
final FirestoreOptions firestoreOptions; private final FirestoreOptions firestoreOptions;
private FirestoreConfig(FirestoreOptions firestoreOptions) { private FirestoreConfig(final FirestoreOptions firestoreOptions) {
this.firestoreOptions = firestoreOptions; this.firestoreOptions = firestoreOptions;
} }
......
package application.firestore; package application.firestore;
import com.google.cloud.firestore.DocumentSnapshot;
import com.google.firestore.v1.Document;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.PTransform;
...@@ -7,6 +9,10 @@ import org.apache.beam.sdk.values.PCollection; ...@@ -7,6 +9,10 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link Document}s, followed by storing
* these {@link DocumentSnapshot} to Firestore.
*/
public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection"; public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection";
...@@ -15,19 +21,20 @@ public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PC ...@@ -15,19 +21,20 @@ public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PC
private final String collectionName; private final String collectionName;
public FirestoreSink(String collectionName) { public FirestoreSink(final String collectionName) {
super();
this.collectionName = collectionName; this.collectionName = collectionName;
} }
@Override @Override
public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) { public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
return activePowerRecords return activePowerRecords
.apply(MapElements.via(new DocumentMapper(this.collectionName))) .apply(MapElements.via(new DocumentMapper(this.collectionName)))
.apply(MapElements.via(new UpdateOperationMapper())) .apply(MapElements.via(new UpdateOperationMapper()))
.apply(FirestoreIO.v1().write().batchWrite().build()); .apply(FirestoreIO.v1().write().batchWrite().build());
} }
public static FirestoreSink fromConfig(Configuration config) { public static FirestoreSink fromConfig(final Configuration config) {
final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY); final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY);
return new FirestoreSink(collectionName); return new FirestoreSink(collectionName);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment