Commit ef530a0b authored by Sören Henning

Merge branch 'uc1-beam-firestore' into 'master'

Firestore sink for UC1 Beam

Closes #347

See merge request !245
parents 11834614 421f89ff
1 merge request: !245 Firestore sink for UC1 Beam
Pipeline #6651 passed
Showing changes with 287 additions and 52 deletions
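For reference, a minimal sketch of how the new sink could be enabled through the application configuration. The keys sink.type and sink.firestore.collection are the ones introduced in this merge request; the collection name below is a hypothetical placeholder, and sink.type defaults to logger in the shipped application.properties:

sink.type=firestore
# hypothetical collection name
sink.firestore.collection=power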
@@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
* Abstraction of a Beam microservice.
* Encapsulates the corresponding {@link PipelineOptions} and the beam Runner.
* Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and
* the beam Runner.
*/
public class AbstractBeamService {
@@ -20,26 +20,24 @@ public class AbstractBeamService {
// Application Configurations
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final String applicationName =
config.getString(ConfigurationKeys.APPLICATION_NAME);
private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
/**
* Creates AbstractBeamService with options.
*/
public AbstractBeamService(final String[] args) { //NOPMD
public AbstractBeamService(final String[] args) { // NOPMD
super();
LOGGER.info("Pipeline options:");
for (final String s : args) {
LOGGER.info("{}", s);
}
options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName(applicationName);
LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString());
this.options = PipelineOptionsFactory.fromArgs(args).create();
this.options.setJobName(this.applicationName);
LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString());
}
public Configuration getConfig() {
return config;
return this.config;
}
}
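As a small usage sketch of this base class (instantiating it directly for brevity): a concrete service passes its command-line arguments through, which are parsed into PipelineOptions and labelled with the application name as job name. The --runner flag is a standard Beam option; the sketch assumes the chosen runner (here Flink) is on the classpath:

final String[] args = {"--runner=FlinkRunner"};
final AbstractBeamService service = new AbstractBeamService(args); // parses args into PipelineOptions, sets job name
final Configuration config = service.getConfig(); // service configuration loaded via ServiceConfigurations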
@@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.Deserializer;
/**
* Simple {@link PTransform} that reads from Kafka using {@link KafkaIO}.
@@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection;
* @param <K> Type of the Key.
* @param <V> Type of the Value.
*/
public class KafkaGenericReader<K, V> extends
PTransform<PBegin, PCollection<KV<K, V>>> {
public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<K, V>>> reader;
@@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaGenericReader(final String bootstrapServer, final String inputTopic,
final Map<String, Object> consumerConfig,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<K>>
keyDeserializer,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<V>>
valueDeserializer) {
public KafkaGenericReader(
final String bootstrapServer,
final String inputTopic,
final Map<String, Object> consumerConfig,
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer) {
super();
// Check if bootstrap server and inputTopic are defined
@@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
this.reader =
KafkaIO.<K, V>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
application.name=theodolite-uc1-application
application.version=0.0.1
sink.type=logger
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
@@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1
specific.avro.reader=True
enable.auto.commit.config=True
auto.offset.reset.config=earliest
\ No newline at end of file
auto.offset.reset.config=earliest
application.name=theodolite-uc1-application
application.version=0.0.1
sink.type=logger
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
@@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1
specific.avro.reader=True
enable.auto.commit.config=True
auto.offset.reset.config=earliest
\ No newline at end of file
auto.offset.reset.config=earliest
@@ -3,5 +3,6 @@ plugins {
}
dependencies {
implementation project(':uc1-commons')
implementation project(':uc1-commons')
implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0'
}
package application;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link PTransform} for a generic {@link DatabaseAdapter}.
*
* @param <T> Type parameter of {@link DatabaseAdapter}.
*/
public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
private static final long serialVersionUID = 1L;
private final DatabaseAdapter<T> databaseAdapter;
private final Class<T> type;
/**
* Create a {@link GenericSink} for the provided {@link DatabaseAdapter}. Requires also the
* corresponding {@link Class} object for Beam.
*/
public GenericSink(final DatabaseAdapter<T> databaseAdapter, final Class<T> type) {
super();
this.databaseAdapter = databaseAdapter;
this.type = type;
}
@Override
public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
return activePowerRecords
.apply(MapElements
.via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type)))
.apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
}
}
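For illustration, a minimal usage sketch of this sink with the JSON logging adapter from uc1-commons (the same combination used by the LOGGER sink type below); records is an assumed PCollection<ActivePowerRecord>:

records.apply(new GenericSink<>(LogWriterFactory.forJson(), String.class));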
package application;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Interface for a class that creates sinks (i.e., {@link PTransform}s that map and store
* {@link ActivePowerRecord}s), optionally using a {@link Configuration}.
*/
public interface SinkFactory {
PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration);
}
package application;
import application.firestore.FirestoreSink;
import java.util.stream.Stream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Supported Sink types, i.e., {@link PTransform} for converting and storing
* {@link ActivePowerRecord}s.
*/
public enum SinkType implements SinkFactory {
LOGGER("logger") {
@Override
public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
final Configuration config) {
return new GenericSink<>(LogWriterFactory.forJson(), String.class);
}
},
FIRESTORE("firestore") {
@Override
public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
final Configuration config) {
return FirestoreSink.fromConfig(config);
}
};
private final String value;
SinkType(final String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
/**
* Create a new {@link SinkType} from its string representation.
*/
public static SinkType from(final String value) {
return Stream.of(SinkType.values())
.filter(t -> t.value.equals(value))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Sink '" + value + "' does not exist."));
}
}
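As a usage sketch, resolving and applying a sink from the configuration could look as follows; this mirrors what Uc1BeamPipeline below does with its SINK_TYPE_KEY, where config is an assumed Configuration and records an assumed PCollection<ActivePowerRecord>:

final SinkType sinkType = SinkType.from(config.getString("sink.type"));
records.apply(sinkType.create(config));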
package application;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
* execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST address
using --flinkMaster as run parameter. To persist logs add
* ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
* Input Output in Common in the Run Configuration Start via Eclipse Run.
* Implementation of benchmark UC1: Database Storage with Apache Beam.
*/
public final class Uc1BeamPipeline extends AbstractPipeline {
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
public static final String SINK_TYPE_KEY = "sink.type";
protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
super(options, config);
// Set Coders for Classes that will be distributed
final CoderRegistry cr = this.getCoderRegistry();
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY));
// build KafkaConsumerConfig
final Map<String, Object> consumerConfig = this.buildConsumerConfig();
// Set Coders for classes that will be distributed
final CoderRegistry cr = super.getCoderRegistry();
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
// Create Pipeline transformations
final KafkaActivePowerTimestampReader kafka =
new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(
super.bootstrapServer,
super.inputTopic,
super.buildConsumerConfig());
// Apply pipeline transformations
// Read from Kafka
this.apply(kafka)
super.apply(kafka)
.apply(Values.create())
.apply(MapElements.via(new ConverterAdapter<>(
this.databaseAdapter.getRecordConverter(),
String.class)))
.apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
.apply(sinkType.create(config));
}
}
package application.firestore;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Value;
import java.io.IOException;
import org.apache.beam.sdk.transforms.SimpleFunction;
import titan.ccp.model.records.ActivePowerRecord;
final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> {
private static final long serialVersionUID = -5263671231838353749L; // NOPMD
private transient FirestoreConfig firestoreConfig;
private final String collection;
public DocumentMapper(final String collection) {
super();
this.collection = collection;
}
@Override
public Document apply(final ActivePowerRecord record) {
return Document
.newBuilder()
.setName(this.createDocumentName(record.getIdentifier() + record.getTimestamp()))
.putFields("identifier",
Value.newBuilder().setStringValue(record.getIdentifier()).build())
.putFields("timestamp", Value.newBuilder().setIntegerValue(record.getTimestamp()).build())
.putFields("valueInW", Value.newBuilder().setDoubleValue(record.getValueInW()).build())
.build();
}
private String createDocumentName(final String documentId) {
this.initFirestoreConfig();
return "projects/" + this.firestoreConfig.getProjectId()
+ "/databases/" + this.firestoreConfig.getDatabaseDdlRequest()
+ "/documents/" + this.collection
+ "/" + documentId;
}
private void initFirestoreConfig() {
if (this.firestoreConfig == null) {
try {
this.firestoreConfig = FirestoreConfig.createFromDefaults();
} catch (final IOException e) {
throw new IllegalStateException("Cannot create Firestore configuration.", e);
}
}
}
}
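To illustrate the document names this mapper produces, assume a hypothetical project id my-project, collection power, and a record with identifier sensor1 and timestamp 1638393000000. Identifier and timestamp are concatenated without a separator, and the database segment equals the project id because getDatabaseDdlRequest() (see FirestoreConfig below) returns it:

projects/my-project/databases/my-project/documents/power/sensor11638393000000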
package application.firestore;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.firestore.FirestoreOptions;
import java.io.IOException;
final class FirestoreConfig {
private final FirestoreOptions firestoreOptions;
private FirestoreConfig(final FirestoreOptions firestoreOptions) {
this.firestoreOptions = firestoreOptions;
}
public String getProjectId() {
return this.firestoreOptions.getProjectId();
}
public String getDatabaseDdlRequest() {
return this.firestoreOptions.getProjectId();
}
public static FirestoreConfig createFromDefaults() throws IOException {
return new FirestoreConfig(FirestoreOptions.getDefaultInstance().toBuilder()
.setCredentials(GoogleCredentials.getApplicationDefault())
.build());
}
}
package application.firestore;
import com.google.cloud.firestore.DocumentSnapshot;
import com.google.firestore.v1.Document;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link Document}s, followed by storing
* these {@link DocumentSnapshot}s to Firestore.
*/
public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection";
private static final long serialVersionUID = 1L;
private final String collectionName;
public FirestoreSink(final String collectionName) {
super();
this.collectionName = collectionName;
}
@Override
public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
return activePowerRecords
.apply(MapElements.via(new DocumentMapper(this.collectionName)))
.apply(MapElements.via(new UpdateOperationMapper()))
.apply(FirestoreIO.v1().write().batchWrite().build());
}
public static FirestoreSink fromConfig(final Configuration config) {
final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY);
return new FirestoreSink(collectionName);
}
}
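A minimal usage sketch, assuming the configuration sets sink.firestore.collection (for example to a hypothetical collection power) and records is a PCollection<ActivePowerRecord>:

records.apply(FirestoreSink.fromConfig(config));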
package application.firestore;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Write;
import org.apache.beam.sdk.transforms.SimpleFunction;
final class UpdateOperationMapper extends SimpleFunction<Document, Write> {
private static final long serialVersionUID = -5263671231838353748L; // NOPMD
@Override
public Write apply(final Document document) {
return Write.newBuilder()
.setUpdate(document)
.build();
}
}