diff --git a/theodolite-benchmarks/uc1-beam/build.gradle b/theodolite-benchmarks/uc1-beam/build.gradle index 3e31782d858f2a7c45ca3fd5aac8af615a0a5ac7..c3444865fc2a698157a1f481c9b8a09da9e7fcd9 100644 --- a/theodolite-benchmarks/uc1-beam/build.gradle +++ b/theodolite-benchmarks/uc1-beam/build.gradle @@ -5,6 +5,7 @@ plugins { dependencies { implementation project(':uc1-commons') implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' + implementation 'org.apache.beam:beam-sdks-java-io-amazon-web-services:2.35.0' implementation ('com.spotify:scio-core_2.13:0.11.5') { transitive = false } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java index fbfa0e9e6cb54c7b65d0c10ae0b3576d2c9f118c..68c54f38e1881e07db72dd671b9daabd5df0ed33 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/SinkType.java @@ -4,6 +4,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.configuration2.Configuration; +import rocks.theodolite.benchmarks.uc1.beam.dynamodb.DynamoDbSink; import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreSink; import rocks.theodolite.benchmarks.uc1.beam.firestore.custom.CustomFirestoreSink; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; @@ -35,6 +36,13 @@ public enum SinkType implements SinkFactory { final Configuration config) { return CustomFirestoreSink.fromConfig(config); } + }, + DYNAMO_DB("dynamodb") { + @Override + public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create( + final Configuration config) { + return DynamoDbSink.fromConfig(config); + } }; private final String value; diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/DynamoDbSink.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/DynamoDbSink.java new file mode 100644 index 0000000000000000000000000000000000000000..62e8911b90e2e9a345a32b5b07c580bbcfa96a91 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/DynamoDbSink.java @@ -0,0 +1,84 @@ +package rocks.theodolite.benchmarks.uc1.beam.dynamodb; + +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.model.PutRequest; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIO; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.configuration2.Configuration; +import org.joda.time.Duration; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link PutRequest}s, followed by + * storing these them to AWS DynamoDB. + */ +public class DynamoDbSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> { + + public static final String SINK_DYNAMODB_TABLE_KEY = "sink.dynamodb.table"; + + private static final String SINK_DYNAMODB_REGION_KEY = "sink.dynamodb.region"; + + private static final String SINK_AWS_ACCESS_KEY_KEY = "sink.dynamodb.aws.access.key"; + + private static final String SINK_AWS_SECRET_KEY_KEY = "sink.dynamodb.aws.secret.key"; + + private static final int MAX_RETRIES = 5; + + private static final long serialVersionUID = 1L; + + // private static final Logger LOGGER = LoggerFactory.getLogger(FirestoreSink.class); + + private final String tableName; + private final String awsAccessKey; + private final String awsSecretKey; + private final Regions regions; + + + /** + * Create a new {@link DynamoDbSink} based on the provided table name, AWS credentials and + * provided region.. + */ + public DynamoDbSink(final String tableName, final String awsAccessKey, final String awsSecretKey, + final Regions regions) { + super(); + this.tableName = tableName; + this.awsAccessKey = awsAccessKey; + this.awsSecretKey = awsSecretKey; + this.regions = regions; + } + + @Override + public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) { + + + return activePowerRecords + .apply(MapElements.via(new PutRequestMapper())) + .apply(DynamoDBIO.<WriteRequest>write() + .withWriteRequestMapperFn( + (SerializableFunction<WriteRequest, KV<String, WriteRequest>>) r -> KV + .of(this.tableName, r)) + .withRetryConfiguration( + DynamoDBIO.RetryConfiguration.create(MAX_RETRIES, Duration.standardMinutes(1))) + .withAwsClientsProvider(this.awsAccessKey, this.awsSecretKey, this.regions)); + } + + /** + * Create a {@link DynamoDbSink} from the provided {@link Configuration} object. + */ + public static DynamoDbSink fromConfig(final Configuration config) { + final String tableName = config.getString(SINK_DYNAMODB_TABLE_KEY); + final String awsAccessKey = config.getString(SINK_AWS_ACCESS_KEY_KEY); + final String awsSecretKey = config.getString(SINK_AWS_SECRET_KEY_KEY); + final Regions regions = Regions.fromName(config.getString(SINK_DYNAMODB_REGION_KEY)); + return new DynamoDbSink( + tableName, + awsAccessKey, + awsSecretKey, + regions); + } +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/PutRequestMapper.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/PutRequestMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..e16571d077e4c9e0e0046feb09592b4d9ba8ba47 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/dynamodb/PutRequestMapper.java @@ -0,0 +1,27 @@ +package rocks.theodolite.benchmarks.uc1.beam.dynamodb; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.PutRequest; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import java.util.Map; +import org.apache.beam.sdk.transforms.SimpleFunction; +import titan.ccp.model.records.ActivePowerRecord; + +final class PutRequestMapper extends SimpleFunction<ActivePowerRecord, WriteRequest> { + + private static final long serialVersionUID = -5263671231838343749L; // NOPMD + + public PutRequestMapper() { + super(); + } + + @Override + public WriteRequest apply(final ActivePowerRecord record) { + final Map<String, AttributeValue> recordMap = Map.of( + "identifier", new AttributeValue().withS(record.getIdentifier()), + "timestamp", new AttributeValue().withN(Long.toString(record.getTimestamp())), + "valueInW", new AttributeValue().withN(Double.toString(record.getValueInW()))); + return new WriteRequest().withPutRequest(new PutRequest(recordMap)); + } + +}