Skip to content
Snippets Groups Projects
Commit 6957a8db authored by Sören Henning's avatar Sören Henning
Browse files

Add DynamoDB sink

parent b4df7706
No related branches found
No related tags found
No related merge requests found
Pipeline #7808 passed
...@@ -5,6 +5,7 @@ plugins { ...@@ -5,6 +5,7 @@ plugins {
dependencies { dependencies {
implementation project(':uc1-commons') implementation project(':uc1-commons')
implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0' implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0'
implementation 'org.apache.beam:beam-sdks-java-io-amazon-web-services:2.35.0'
implementation ('com.spotify:scio-core_2.13:0.11.5') { implementation ('com.spotify:scio-core_2.13:0.11.5') {
transitive = false transitive = false
} }
......
...@@ -4,6 +4,7 @@ import java.util.stream.Stream; ...@@ -4,6 +4,7 @@ import java.util.stream.Stream;
import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.uc1.beam.dynamodb.DynamoDbSink;
import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreSink; import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreSink;
import rocks.theodolite.benchmarks.uc1.beam.firestore.custom.CustomFirestoreSink; import rocks.theodolite.benchmarks.uc1.beam.firestore.custom.CustomFirestoreSink;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
...@@ -35,6 +36,13 @@ public enum SinkType implements SinkFactory { ...@@ -35,6 +36,13 @@ public enum SinkType implements SinkFactory {
final Configuration config) { final Configuration config) {
return CustomFirestoreSink.fromConfig(config); return CustomFirestoreSink.fromConfig(config);
} }
},
DYNAMO_DB("dynamodb") {
@Override
public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
final Configuration config) {
return DynamoDbSink.fromConfig(config);
}
}; };
private final String value; private final String value;
......
package rocks.theodolite.benchmarks.uc1.beam.dynamodb;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import org.joda.time.Duration;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link PutRequest}s, followed by
* storing these them to AWS DynamoDB.
*/
public class DynamoDbSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
public static final String SINK_DYNAMODB_TABLE_KEY = "sink.dynamodb.table";
private static final String SINK_DYNAMODB_REGION_KEY = "sink.dynamodb.region";
private static final String SINK_AWS_ACCESS_KEY_KEY = "sink.dynamodb.aws.access.key";
private static final String SINK_AWS_SECRET_KEY_KEY = "sink.dynamodb.aws.secret.key";
private static final int MAX_RETRIES = 5;
private static final long serialVersionUID = 1L;
// private static final Logger LOGGER = LoggerFactory.getLogger(FirestoreSink.class);
private final String tableName;
private final String awsAccessKey;
private final String awsSecretKey;
private final Regions regions;
/**
* Create a new {@link DynamoDbSink} based on the provided table name, AWS credentials and
* provided region..
*/
public DynamoDbSink(final String tableName, final String awsAccessKey, final String awsSecretKey,
final Regions regions) {
super();
this.tableName = tableName;
this.awsAccessKey = awsAccessKey;
this.awsSecretKey = awsSecretKey;
this.regions = regions;
}
@Override
public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
return activePowerRecords
.apply(MapElements.via(new PutRequestMapper()))
.apply(DynamoDBIO.<WriteRequest>write()
.withWriteRequestMapperFn(
(SerializableFunction<WriteRequest, KV<String, WriteRequest>>) r -> KV
.of(this.tableName, r))
.withRetryConfiguration(
DynamoDBIO.RetryConfiguration.create(MAX_RETRIES, Duration.standardMinutes(1)))
.withAwsClientsProvider(this.awsAccessKey, this.awsSecretKey, this.regions));
}
/**
* Create a {@link DynamoDbSink} from the provided {@link Configuration} object.
*/
public static DynamoDbSink fromConfig(final Configuration config) {
final String tableName = config.getString(SINK_DYNAMODB_TABLE_KEY);
final String awsAccessKey = config.getString(SINK_AWS_ACCESS_KEY_KEY);
final String awsSecretKey = config.getString(SINK_AWS_SECRET_KEY_KEY);
final Regions regions = Regions.fromName(config.getString(SINK_DYNAMODB_REGION_KEY));
return new DynamoDbSink(
tableName,
awsAccessKey,
awsSecretKey,
regions);
}
}
package rocks.theodolite.benchmarks.uc1.beam.dynamodb;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import java.util.Map;
import org.apache.beam.sdk.transforms.SimpleFunction;
import titan.ccp.model.records.ActivePowerRecord;
final class PutRequestMapper extends SimpleFunction<ActivePowerRecord, WriteRequest> {
private static final long serialVersionUID = -5263671231838343749L; // NOPMD
public PutRequestMapper() {
super();
}
@Override
public WriteRequest apply(final ActivePowerRecord record) {
final Map<String, AttributeValue> recordMap = Map.of(
"identifier", new AttributeValue().withS(record.getIdentifier()),
"timestamp", new AttributeValue().withN(Long.toString(record.getTimestamp())),
"valueInW", new AttributeValue().withN(Double.toString(record.getValueInW())));
return new WriteRequest().withPutRequest(new PutRequest(recordMap));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment