Class DynamoDBIO

java.lang.Object
org.apache.beam.sdk.io.aws2.dynamodb.DynamoDBIO

public final class DynamoDBIO extends Object
IO to read from and write to DynamoDB tables.

Reading from DynamoDB

Example usage:


 PCollection<List<Map<String, AttributeValue>>> output =
   pipeline.apply(
     DynamoDBIO.<List<Map<String, AttributeValue>>>read()
       .withScanRequestFn(in -> ScanRequest.builder().tableName(tableName).totalSegments(1).build())
       .items()); // ScanResponse items mapper
 

At a minimum you have to provide:

  • a scanRequestFn providing the ScanRequest instance; table name and total segments are required. Note: Choose total segments according to the number of workers used.
  • a scanResponseMapperFn to map the ScanResponse to the expected output type, such as DynamoDBIO.Read.items().
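The note on total segments can be illustrated with a small, self-contained sketch. A DynamoDB parallel scan divides a table into `totalSegments` independent segments, numbered from 0 to `totalSegments - 1`. The class below is hypothetical and not part of Beam: it spreads segment indices over workers round-robin, purely to show why a single segment leaves all but one worker idle. How a runner actually distributes the work is up to the runner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of DynamoDBIO or the AWS SDK.
final class ScanSegmentsSketch {

  /** Assigns segment indices 0..totalSegments-1 to workers round-robin. */
  static List<List<Integer>> assignSegments(int totalSegments, int workers) {
    List<List<Integer>> perWorker = new ArrayList<>();
    for (int w = 0; w < workers; w++) {
      perWorker.add(new ArrayList<>());
    }
    for (int segment = 0; segment < totalSegments; segment++) {
      // Each segment can be scanned independently of the others.
      perWorker.get(segment % workers).add(segment);
    }
    return perWorker;
  }
}
```

With `totalSegments(1)` as in the example above and four workers, only the first worker receives a segment. With eight segments, each of the four workers receives two.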

Writing to DynamoDB

Example usage:


 PCollection<T> data = ...;
 SerializableFunction<T, WriteRequest> requestBuilder = ...;
 data.apply(
   DynamoDBIO.<T>write()
     .withWriteRequestMapperFn(t -> KV.of(tableName, requestBuilder.apply(t))));
 

At a minimum you have to provide a writeRequestMapperFn to map each element into a KV of table name and WriteRequest.

Note: AWS does not allow writing duplicate keys within a single batch operation. If primary keys may repeat in your stream (e.g. an upsert stream), you may encounter a `ValidationError`. To avoid this, provide the names of the attributes that make up your primary key using DynamoDBIO.Write.withDeduplicateKeys(List). For each combination of these key values, only the last observed element is kept. If no deduplication keys are provided, identical elements are still deduplicated.
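The "last observed element is kept" rule can be sketched in plain Java. This is a minimal illustration of the semantics described above, not the code DynamoDBIO uses internally. Items are modelled as `Map<String, String>` instead of `Map<String, AttributeValue>` so the example needs nothing beyond the JDK, and `DedupSketch` and `keepLastPerKey` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of DynamoDBIO.
final class DedupSketch {

  /** Keeps only the last item seen for each combination of key attribute values. */
  static List<Map<String, String>> keepLastPerKey(
      List<Map<String, String>> items, List<String> keyNames) {
    Map<List<String>, Map<String, String>> lastByKey = new LinkedHashMap<>();
    for (Map<String, String> item : items) {
      // Build the composite key from the configured key attribute names.
      List<String> key = new ArrayList<>();
      for (String name : keyNames) {
        key.add(item.get(name));
      }
      // A later item replaces an earlier one that has the same key.
      lastByKey.put(key, item);
    }
    return new ArrayList<>(lastByKey.values());
  }
}
```

For a batch holding two items with `id = "1"` and one with `id = "2"`, deduplicating on `id` leaves two items, and the survivor for `id = "1"` is the one that appeared last.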

Configuration of AWS clients

AWS clients for all AWS IOs can be configured using AwsOptions, e.g. --awsRegion=us-west-1. AwsOptions contain reasonable defaults based on default providers for Region and AwsCredentialsProvider.

If you require more advanced configuration, you may change the ClientBuilderFactory using AwsOptions.setClientBuilderFactory(Class).

Configuration for a specific IO can be overridden using withClientConfiguration(), which also lets you configure the retry behavior of that IO.

Retries

Retries for failed requests can be configured using ClientConfiguration.Builder.retry(Consumer). They are handled by the AWS SDK, except when a batch request only partially succeeds. The SDK uses a backoff strategy with equal jitter to compute the delay before the next retry.

Note: Once retries are exhausted, the error is surfaced to the runner, which may then retry the current partition in its entirety or abort if the runner's maximum number of retries has been reached.
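For readers unfamiliar with equal jitter, the following is a minimal sketch of how such a delay is commonly computed: an exponentially growing ceiling is capped at a maximum, half of it is kept fixed, and the other half is randomized. The class name, method name, and parameters are hypothetical, and the SDK's actual implementation and defaults may differ.

```java
import java.util.Random;

// Hypothetical illustration only; not the AWS SDK's implementation.
final class EqualJitterSketch {

  /** Returns a delay in [ceiling / 2, ceiling], where ceiling = min(maxMillis, baseMillis * 2^attempt). */
  static long delayMillis(int attempt, long baseMillis, long maxMillis, Random random) {
    // Double the base delay once per attempt, stopping at the cap.
    long ceiling = baseMillis;
    for (int i = 0; i < attempt && ceiling < maxMillis; i++) {
      ceiling *= 2;
    }
    ceiling = Math.min(ceiling, maxMillis);

    // Equal jitter: half of the ceiling is fixed, the other half is random.
    long half = ceiling / 2;
    return half + (long) (random.nextDouble() * (half + 1));
  }
}
```

Keeping half of the delay fixed guarantees a minimum wait between attempts, while the random half spreads out retries from clients that failed at the same time.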

  • Constructor Details

    • DynamoDBIO

      public DynamoDBIO()
  • Method Details