From 6957a8db5aac24e47a582febc9ca8aea7f7d5b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Wed, 6 Apr 2022 23:47:32 +0200 Subject: [PATCH] Add DynamoDB sink --- theodolite-benchmarks/uc1-beam/build.gradle | 1 + .../benchmarks/uc1/beam/SinkType.java | 8 ++ .../uc1/beam/dynamodb/DynamoDbSink.java | 84 +++++++++++++++++++ .../uc1/beam/dynamodb/PutRequestMapper.java | 27 ++++++ 4 files changed, 120 insertions(+) create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/DynamoDbSink.java create mode 100644 theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/PutRequestMapper.java diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle index 3e31782d8..c3444865f 100644 --- a/theodolite-benchmarks/uc1-beam/build.gradle +++ b/theodolite-benchmarks/uc1-beam/build.gradle @@ -5,6 +5,7 @@ plugins { dependencies { implementation project(':uc1-commons') implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' + implementation 'org.apache.beam:beam-sdks-java-io-amazon-web-services:2.35.0' implementation ('com.spotify:scio-core_2.13:0.11.5') { transitive = false } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java index fbfa0e9e6..68c54f38e 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java @@ -4,6 +4,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; +import rocks.theodolite.benchmarks.uc1.beam.dynamodb.DynamoDbSink; 
import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreSink; import rocks.theodolite.benchmarks.uc1.beam.firestore.custom.CustomFirestoreSink; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; @@ -35,6 +36,13 @@ public enum SinkType implements SinkFactory { final Configuration config) { return CustomFirestoreSink.fromConfig(config); } + }, + DYNAMO_DB("dynamodb") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return DynamoDbSink.fromConfig(config); + } }; private final String value; diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/DynamoDbSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/DynamoDbSink.java new file mode 100644 index 000000000..62e8911b9 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/DynamoDbSink.java @@ -0,0 +1,84 @@ +package rocks.theodolite.benchmarks.uc1.beam.dynamodb; + +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.model.PutRequest; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIO; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import org.joda.time.Duration; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link PutRequest}s, followed by + * storing them in AWS DynamoDB. 
+ */ +public class DynamoDbSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + public static final String SINK_DYNAMODB_TABLE_KEY = "sink.dynamodb.table"; + + private static final String SINK_DYNAMODB_REGION_KEY = "sink.dynamodb.region"; + + private static final String SINK_AWS_ACCESS_KEY_KEY = "sink.dynamodb.aws.access.key"; + + private static final String SINK_AWS_SECRET_KEY_KEY = "sink.dynamodb.aws.secret.key"; + + private static final int MAX_RETRIES = 5; + + private static final long serialVersionUID = 1L; + + // private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDbSink.class); + + private final String tableName; + private final String awsAccessKey; + private final String awsSecretKey; + private final Regions regions; + + + /** + * Create a new {@link DynamoDbSink} based on the provided table name, AWS credentials and + * region. + */ + public DynamoDbSink(final String tableName, final String awsAccessKey, final String awsSecretKey, + final Regions regions) { + super(); + this.tableName = tableName; + this.awsAccessKey = awsAccessKey; + this.awsSecretKey = awsSecretKey; + this.regions = regions; + } + + @Override + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { + + + return activePowerRecords + .apply(MapElements.via(new PutRequestMapper())) + .apply(DynamoDBIO.<WriteRequest>write() + .withWriteRequestMapperFn( + (SerializableFunction<WriteRequest, KV<String, WriteRequest>>) r -> KV + .of(this.tableName, r)) + .withRetryConfiguration( + DynamoDBIO.RetryConfiguration.create(MAX_RETRIES, Duration.standardMinutes(1))) + .withAwsClientsProvider(this.awsAccessKey, this.awsSecretKey, this.regions)); + } + + /** + * Create a {@link DynamoDbSink} from the provided {@link Configuration} object. 
+ */ + public static DynamoDbSink fromConfig(final Configuration config) { + final String tableName = config.getString(SINK_DYNAMODB_TABLE_KEY); + final String awsAccessKey = config.getString(SINK_AWS_ACCESS_KEY_KEY); + final String awsSecretKey = config.getString(SINK_AWS_SECRET_KEY_KEY); + final Regions regions = Regions.fromName(config.getString(SINK_DYNAMODB_REGION_KEY)); + return new DynamoDbSink( + tableName, + awsAccessKey, + awsSecretKey, + regions); + } +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/PutRequestMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/PutRequestMapper.java new file mode 100644 index 000000000..e16571d07 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/PutRequestMapper.java @@ -0,0 +1,27 @@ +package rocks.theodolite.benchmarks.uc1.beam.dynamodb; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.PutRequest; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import java.util.Map; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +final class PutRequestMapper extends SimpleFunction<ActivePowerRecord, WriteRequest> { + + private static final long serialVersionUID = -5263671231838343749L; // NOPMD + + public PutRequestMapper() { + super(); + } + + @Override + public WriteRequest apply(final ActivePowerRecord record) { + final Map<String, AttributeValue> recordMap = Map.of( + "identifier", new AttributeValue().withS(record.getIdentifier()), + "timestamp", new AttributeValue().withN(Long.toString(record.getTimestamp())), + "valueInW", new AttributeValue().withN(Double.toString(record.getValueInW()))); + return new WriteRequest().withPutRequest(new PutRequest(recordMap)); + } + +} -- GitLab