diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java index 03c5ca1daa7ffab71a4d08c04f677d7412e3a2be..3e94fb4c878401183f45ff384e39dd6bc0291a27 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java @@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory; import titan.ccp.common.configuration.ServiceConfigurations; /** - * Abstraction of a Beam microservice. - * Encapsulates the corresponding {@link PipelineOptions} and the beam Runner. + * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and + * the beam Runner. */ public class AbstractBeamService { @@ -20,26 +20,24 @@ public class AbstractBeamService { // Application Configurations private final Configuration config = ServiceConfigurations.createWithDefaults(); - private final String applicationName = - config.getString(ConfigurationKeys.APPLICATION_NAME); - + private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); /** * Creates AbstractBeamService with options. */ - public AbstractBeamService(final String[] args) { //NOPMD + public AbstractBeamService(final String[] args) { // NOPMD super(); LOGGER.info("Pipeline options:"); for (final String s : args) { LOGGER.info("{}", s); } - options = PipelineOptionsFactory.fromArgs(args).create(); - options.setJobName(applicationName); - LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString()); + this.options = PipelineOptionsFactory.fromArgs(args).create(); + this.options.setJobName(this.applicationName); + LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString()); } public Configuration getConfig() { - return config; + return this.config; } } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java index 83336b5a4c2451ef4bffefbd60ad9d52fccd9c17..e513c3a0e3dffcb9881f389af5ee9f05c52a2b63 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java @@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.kafka.common.serialization.Deserializer; /** * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. @@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection; * @param <K> Type of the Key. * @param <V> Type of the Value. */ -public class KafkaGenericReader<K, V> extends - PTransform<PBegin, PCollection<KV<K, V>>> { +public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { private static final long serialVersionUID = 2603286150183186115L; private final PTransform<PBegin, PCollection<KV<K, V>>> reader; @@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - public KafkaGenericReader(final String bootstrapServer, final String inputTopic, - final Map<String, Object> consumerConfig, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<K>> - keyDeserializer, - final Class<? extends - org.apache.kafka.common.serialization.Deserializer<V>> - valueDeserializer) { + public KafkaGenericReader( + final String bootstrapServer, + final String inputTopic, + final Map<String, Object> consumerConfig, + final Class<? extends Deserializer<K>> keyDeserializer, + final Class<? extends Deserializer<V>> valueDeserializer) { super(); // Check if boostrap server and inputTopic are defined @@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } - reader = + this.reader = KafkaIO.<K, V>read() .withBootstrapServers(bootstrapServer) .withTopic(inputTopic) diff --git a/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh b/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh index ebbecd1c5336c5dd907db11b8c8c45924e5924a8..7c7f11a94f42d56d91d383f27d58ad9a09a918e5 100755 --- a/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh +++ b/theodolite-benchmarks/docker-test/uc1-beam-flink/test.sh @@ -2,7 +2,7 @@ sleep 55s # to let the benchmark and produce some output docker-compose logs --tail 100 benchmark-taskmanager | - sed -n "s/^.*Key:\s\(\S*\), Value:\s\(\S*\).*$/\2/p" | + sed -n "s/^.*Record:\s\(\S*\)$/\1/p" | tee /dev/stderr | jq .identifier | sort | diff --git a/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh b/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh index ed17db3a44d5c4a8dacfbc956c2f36dd47503508..62327e860cb658741d0892052f5202df3f5b431e 100755 --- a/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh +++ b/theodolite-benchmarks/docker-test/uc1-beam-samza/test.sh @@ -2,7 +2,7 @@ sleep 55s # to let the benchmark and produce some output docker-compose logs --tail 100 benchmark | - sed -n "s/^.*Key:\s\(\S*\), Value:\s\(\S*\).*$/\2/p" | + sed -n "s/^.*Record:\s\(\S*\)$/\1/p" | tee /dev/stderr | jq .identifier | sort | diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties @@ -1,6 +1,8 @@ application.name=theodolite-uc1-application application.version=0.0.1 +sink.type=logger + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output @@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1 specific.avro.reader=True enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +auto.offset.reset.config=earliest diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties index 50db1510ab5d7f6b8c9b1a75f112719209c351ce..70cc5e94a64b8218344263d9d9d2ba3421fd69fd 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties @@ -1,6 +1,8 @@ application.name=theodolite-uc1-application application.version=0.0.1 +sink.type=logger + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output @@ -13,4 +15,4 @@ cache.max.bytes.buffering=-1 specific.avro.reader=True enable.auto.commit.config=True -auto.offset.reset.config=earliest \ No newline at end of file +auto.offset.reset.config=earliest diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle index ae80dcd8c5b263892f4539098a9a29b25c133819..659eb09e67487132bca2b3ecb82298690dbf33c6 100644 --- a/theodolite-benchmarks/uc1-beam/build.gradle +++ b/theodolite-benchmarks/uc1-beam/build.gradle @@ -3,5 +3,6 @@ plugins { } dependencies { - implementation project(':uc1-commons') + implementation project(':uc1-commons') + implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java new file mode 100644 index 0000000000000000000000000000000000000000..04b47cd8c4c6a976fc602fa2fbf93dcaaa36680e --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/GenericSink.java @@ -0,0 +1,41 @@ +package application; + +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link PTransform} for a generic {@link DatabaseAdapter}. + * + * @param <T> Type parameter of {@link DatabaseAdapter}. + */ +public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + private static final long serialVersionUID = 1L; + + private final DatabaseAdapter<T> databaseAdapter; + private final Class<T> type; + + /** + * Create a {@link GenericSink} for the provided {@link DatabaseAdapter}. Requires also the + * corresponding {@link Class} object for Beam. + */ + public GenericSink(final DatabaseAdapter<T> databaseAdapter, final Class<T> type) { + super(); + this.databaseAdapter = databaseAdapter; + this.type = type; + } + + @Override + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { + return activePowerRecords + .apply(MapElements + .via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type))) + .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))); + + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..91052827ff58f0bb52d289073c84e31cfc234c31 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkFactory.java @@ -0,0 +1,16 @@ +package application; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Interface for a class that creates sinks (i.e., {@link PTransform}s that map and store + * {@link ActivePowerRecord}s, optionally, using a {@link Configuration}. + */ +public interface SinkFactory { + + PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration); + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java new file mode 100644 index 0000000000000000000000000000000000000000..82ca2573ef5108e2f2a9423400ca63be1760d449 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/SinkType.java @@ -0,0 +1,52 @@ +package application; + +import application.firestore.FirestoreSink; +import java.util.stream.Stream; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Supported Sink types, i.e., {@link PTransform} for converting and storing + * {@link ActivePowerRecord}s. + */ +public enum SinkType implements SinkFactory { + + LOGGER("logger") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return new GenericSink<>(LogWriterFactory.forJson(), String.class); + } + }, + FIRESTORE("firestore") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return FirestoreSink.fromConfig(config); + } + }; + + private final String value; + + SinkType(final String value) { + this.value = value; + } + + public String getValue() { + return this.value; + } + + /** + * Create a new {@link SinkType} from its string representation. + */ + public static SinkType from(final String value) { + return Stream.of(SinkType.values()) + .filter(t -> t.value.equals(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Sink '" + value + "' does not exist.")); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index dde2bc064af9b21e11b859fdbf3f6ba1374d17d7..352b32a29ff6cfd5d01a4e74798f79c8d08c769a 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -1,54 +1,39 @@ package application; -import java.util.Map; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.commons.configuration2.Configuration; -import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; -import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import titan.ccp.model.records.ActivePowerRecord; - /** - * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To - * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload - * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress - * using--flinkMaster as run parameter. To persist logs add - * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard - * Input Output in Common in the Run Configuration Start via Eclipse Run. + * Implementation of benchmark UC1: Database Storage with Apache Beam. */ public final class Uc1BeamPipeline extends AbstractPipeline { - private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); + public static final String SINK_TYPE_KEY = "sink.type"; protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) { super(options, config); - // Set Coders for Classes that will be distributed - final CoderRegistry cr = this.getCoderRegistry(); - cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); + final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY)); - // build KafkaConsumerConfig - final Map<String, Object> consumerConfig = this.buildConsumerConfig(); + // Set Coders for classes that will be distributed + final CoderRegistry cr = super.getCoderRegistry(); + cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); - // Create Pipeline transformations - final KafkaActivePowerTimestampReader kafka = - new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig); + final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader( + super.bootstrapServer, + super.inputTopic, + super.buildConsumerConfig()); - // Apply pipeline transformations - // Read from Kafka - this.apply(kafka) + super.apply(kafka) .apply(Values.create()) - .apply(MapElements.via(new ConverterAdapter<>( - this.databaseAdapter.getRecordConverter(), - String.class))) - .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))); + .apply(sinkType.create(config)); } + } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..ab4617ecd1a46e083c863d26c999a25ee5008836 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/DocumentMapper.java @@ -0,0 +1,52 @@ +package application.firestore; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.Value; +import java.io.IOException; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> { + + private static final long serialVersionUID = -5263671231838353749L; // NOPMD + + private transient FirestoreConfig firestoreConfig; + + private final String collection; + + public DocumentMapper(final String collection) { + super(); + this.collection = collection; + } + + @Override + public Document apply(final ActivePowerRecord record) { + return Document + .newBuilder() + .setName(this.createDocumentName(record.getIdentifier() + record.getTimestamp())) + .putFields("identifier", + Value.newBuilder().setStringValue(record.getIdentifier()).build()) + .putFields("timestamp", Value.newBuilder().setIntegerValue(record.getTimestamp()).build()) + .putFields("valueInW", Value.newBuilder().setDoubleValue(record.getValueInW()).build()) + .build(); + } + + private String createDocumentName(final String documentId) { + this.initFirestoreConfig(); + return "projects/" + this.firestoreConfig.getProjectId() + + "/databases/" + this.firestoreConfig.getDatabaseDdlRequest() + + "/documents/" + this.collection + + "/" + documentId; + } + + private void initFirestoreConfig() { + if (this.firestoreConfig == null) { + try { + this.firestoreConfig = FirestoreConfig.createFromDefaults(); + } catch (final IOException e) { + throw new IllegalStateException("Cannot create Firestore configuration.", e); + } + } + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..eb62d69f907cc27f3974f09942e5d75ba701a34f --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreConfig.java @@ -0,0 +1,29 @@ +package application.firestore; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.firestore.FirestoreOptions; +import java.io.IOException; + +final class FirestoreConfig { + + private final FirestoreOptions firestoreOptions; + + private FirestoreConfig(final FirestoreOptions firestoreOptions) { + this.firestoreOptions = firestoreOptions; + } + + public String getProjectId() { + return this.firestoreOptions.getProjectId(); + } + + public String getDatabaseDdlRequest() { + return this.firestoreOptions.getProjectId(); + } + + public static FirestoreConfig createFromDefaults() throws IOException { + return new FirestoreConfig(FirestoreOptions.getDefaultInstance().toBuilder() + .setCredentials(GoogleCredentials.getApplicationDefault()) + .build()); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java new file mode 100644 index 0000000000000000000000000000000000000000..a1db24eeb9e05f3b8e198621f4a3e7107e095b13 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/FirestoreSink.java @@ -0,0 +1,41 @@ +package application.firestore; + +import com.google.cloud.firestore.DocumentSnapshot; +import com.google.firestore.v1.Document; +import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link Document}s, followed by storing + * these {@link DocumentSnapshot} to Firestore. + */ +public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection"; + + private static final long serialVersionUID = 1L; + + private final String collectionName; + + public FirestoreSink(final String collectionName) { + super(); + this.collectionName = collectionName; + } + + @Override + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { + return activePowerRecords + .apply(MapElements.via(new DocumentMapper(this.collectionName))) + .apply(MapElements.via(new UpdateOperationMapper())) + .apply(FirestoreIO.v1().write().batchWrite().build()); + } + + public static FirestoreSink fromConfig(final Configuration config) { + final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY); + return new FirestoreSink(collectionName); + } +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..d67bed2aedbd97bfe4271efa0514c8d4e594683e --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/firestore/UpdateOperationMapper.java @@ -0,0 +1,18 @@ +package application.firestore; + +import com.google.firestore.v1.Document; +import com.google.firestore.v1.Write; +import org.apache.beam.sdk.transforms.SimpleFunction; + +final class UpdateOperationMapper extends SimpleFunction<Document, Write> { + + private static final long serialVersionUID = -5263671231838353748L; // NOPMD + + @Override + public Write apply(final Document document) { + return Write.newBuilder() + .setUpdate(document) + .build(); + } + +} diff --git a/theodolite-benchmarks/uc1-commons/build.gradle b/theodolite-benchmarks/uc1-commons/build.gradle index cd95e28ed53cc8e33a416564e613574faf1fc6cb..0f7d31d1f557ecd214b3a57227851d0f70b61084 100644 --- a/theodolite-benchmarks/uc1-commons/build.gradle +++ b/theodolite-benchmarks/uc1-commons/build.gradle @@ -13,8 +13,10 @@ repositories { } dependencies { - implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } - implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + // Make this implementation once this is a local subproject. + // Currently, Flink needs its own version of these dependencies. + compileOnly('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } + compileOnly('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'com.google.code.gson:gson:2.8.9'