diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle index ae80dcd8c5b263892f4539098a9a29b25c133819..659eb09e67487132bca2b3ecb82298690dbf33c6 100644 --- a/theodolite-benchmarks/uc1-beam/build.gradle +++ b/theodolite-benchmarks/uc1-beam/build.gradle @@ -3,5 +3,6 @@ plugins { } dependencies { - implementation project(':uc1-commons') + implementation project(':uc1-commons') + implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java new file mode 100644 index 0000000000000000000000000000000000000000..8e3dbb3f98499c42f201f4f1250263b973148746 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java @@ -0,0 +1,31 @@ +package application; + +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import titan.ccp.model.records.ActivePowerRecord; + +public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + private static final long serialVersionUID = 1L; + + private final DatabaseAdapter<T> databaseAdapter; + private final Class<T> type; + + public GenericSink(DatabaseAdapter<T> databaseAdapter, Class<T> type) { + this.databaseAdapter = databaseAdapter; + this.type = type; + } + + @Override + public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) { + return activePowerRecords + .apply(MapElements + .via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type))) + .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))); + + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..b3ee54c7cce0b037f3aa92400987298e0a95bb94 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java @@ -0,0 +1,12 @@ +package application; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import titan.ccp.model.records.ActivePowerRecord; + +public interface SinkFactory { + + PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration); + +} \ No newline at end of file diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java new file mode 100644 index 0000000000000000000000000000000000000000..a195ecb7193ab99362d44733e9b6a6cea68128a6 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java @@ -0,0 +1,45 @@ +package application; + +import application.firestore.FirestoreSink; +import java.util.stream.Stream; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; +import titan.ccp.model.records.ActivePowerRecord; + +public enum SinkType implements SinkFactory { + + LOGGER("logger") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return new GenericSink<>(LogWriterFactory.forJson(), String.class); + } + }, + FIRESTORE("firestore") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return FirestoreSink.fromConfig(config); + } + }; + + private final String value; + + SinkType(final String value) { + this.value = value; + } + + String getValue() { + return this.value; + } + + public static SinkType from(final String value) { + return Stream.of(SinkType.values()) + .filter(t -> t.value.equals(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Sink '" + value + "' does not exist.")); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index dde2bc064af9b21e11b859fdbf3f6ba1374d17d7..eb894d13b38c46eb63136c2f670dfdf7e091356f 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -1,15 +1,10 @@ package application; -import java.util.Map; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.commons.configuration2.Configuration; -import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; -import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import titan.ccp.model.records.ActivePowerRecord; @@ -25,30 +20,29 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class Uc1BeamPipeline extends AbstractPipeline { - private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); + public static final String SINK_TYPE_KEY = "sink.type"; protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) { super(options, config); + final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY)); + // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); - // build KafkaConsumerConfig - final Map<String, Object> consumerConfig = this.buildConsumerConfig(); - // Create Pipeline transformations - final KafkaActivePowerTimestampReader kafka = - new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig); + final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader( + this.bootstrapServer, + this.inputTopic, + this.buildConsumerConfig()); // Apply pipeline transformations // Read from Kafka this.apply(kafka) .apply(Values.create()) - .apply(MapElements.via(new ConverterAdapter<>( - this.databaseAdapter.getRecordConverter(), - String.class))) - .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))); + .apply(sinkType.create(config)); } + } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..6c7bfa8389c2de3edef136a4cf417e217b94e24d --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java @@ -0,0 +1,51 @@ +package application.firestore; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.Value; +import java.io.IOException; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> { + + private static final long serialVersionUID = -5263671231838353749L; // NOPMD + + private transient FirestoreConfig firestoreConfig; + + private final String collection; + + public DocumentMapper(String collection) { + this.collection = collection; + } + + @Override + public Document apply(final ActivePowerRecord record) { + return Document + .newBuilder() + .setName(this.createDocumentName(record.getIdentifier() + record.getTimestamp())) + .putFields("identifier", + Value.newBuilder().setStringValue(record.getIdentifier()).build()) + .putFields("timestamp", Value.newBuilder().setIntegerValue(record.getTimestamp()).build()) + .putFields("valueInW", Value.newBuilder().setDoubleValue(record.getValueInW()).build()) + .build(); + } + + private String createDocumentName(String documentId) { + this.initFirestoreConfig(); + return "projects/" + this.firestoreConfig.getProjectId() + + "/databases/" + this.firestoreConfig.getDatabaseDdlRequest() + + "/documents/" + this.collection + + "/" + documentId; + } + + private void initFirestoreConfig() { + if (this.firestoreConfig == null) { + try { + this.firestoreConfig = FirestoreConfig.createFromDefaults(); + } catch (final IOException e) { + throw new IllegalStateException("Cannot create Firestore configuration.", e); + } + } + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..790bc553337f372b5f0a7ffc249e1cc5f2687c2f --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java @@ -0,0 +1,29 @@ +package application.firestore; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.firestore.FirestoreOptions; +import java.io.IOException; + +final class FirestoreConfig { + + final FirestoreOptions firestoreOptions; + + private FirestoreConfig(FirestoreOptions firestoreOptions) { + this.firestoreOptions = firestoreOptions; + } + + public String getProjectId() { + return this.firestoreOptions.getProjectId(); + } + + public String getDatabaseDdlRequest() { + return this.firestoreOptions.getProjectId(); + } + + public static FirestoreConfig createFromDefaults() throws IOException { + return new FirestoreConfig(FirestoreOptions.getDefaultInstance().toBuilder() + .setCredentials(GoogleCredentials.getApplicationDefault()) + .build()); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java new file mode 100644 index 0000000000000000000000000000000000000000..2ae56bcfa28dbc3c883a42fcd8f7a897a341ee2a --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java @@ -0,0 +1,34 @@ +package application.firestore; + +import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import titan.ccp.model.records.ActivePowerRecord; + +public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection"; + + private static final long serialVersionUID = 1L; + + private final String collectionName; + + public FirestoreSink(String collectionName) { + this.collectionName = collectionName; + } + + @Override + public PCollection<?> expand(PCollection<ActivePowerRecord> activePowerRecords) { + return activePowerRecords + .apply(MapElements.via(new DocumentMapper(this.collectionName))) + .apply(MapElements.via(new UpdateOperationMapper())) + .apply(FirestoreIO.v1().write().batchWrite().build()); + } + + public static FirestoreSink fromConfig(Configuration config) { + final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY); + return new FirestoreSink(collectionName); + } +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..d67bed2aedbd97bfe4271efa0514c8d4e594683e --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java @@ -0,0 +1,18 @@ +package application.firestore; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.Write; +import org.apache.beam.sdk.transforms.SimpleFunction; + +final class UpdateOperationMapper extends SimpleFunction<Document, Write> { + + private static final long serialVersionUID = -5263671231838353748L; // NOPMD + + @Override + public Write apply(final Document document) { + return Write.newBuilder() + .setUpdate(document) + .build(); + } + +}