Class SqsIO
Reading from SQS
SqsIO.Read returns an unbounded PCollection of SqsMessages. At a minimum, you must provide the URL of the queue to connect to using SqsIO.Read.withQueueUrl(String).
Example usage:
PCollection<SqsMessage> output =
    pipeline.apply(SqsIO.read().withQueueUrl(queueUrl));
Note: Currently this source does not advance watermarks when no new messages are received.
Writing to SQS
SqsIO.Write takes a PCollection of SendMessageRequests as input. Each request must contain the queue URL. No further configuration is required.
Example usage:
PCollection<SendMessageRequest> data = ...;
data.apply(SqsIO.write());
Configuration of AWS clients
AWS clients for all AWS IOs can be configured using AwsOptions, e.g. --awsRegion=us-west-1. AwsOptions contains reasonable defaults based on the default providers for Region and AwsCredentialsProvider.
If you require more advanced configuration, you may change the ClientBuilderFactory using AwsOptions.setClientBuilderFactory(Class).
Configuration for a specific IO can be overridden using withClientConfiguration(), which also lets you configure the retry behavior for that IO.
Retries
Retries for failed requests can be configured using ClientConfiguration.Builder.retry(Consumer) and are handled by the AWS SDK unless there's a partial success (batch requests). The SDK uses a backoff strategy with equal jitter for computing the delay before the next retry.
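To make the equal-jitter idea concrete, here is a minimal sketch in plain Java. It follows the commonly described formula: grow the base delay exponentially, cap it, then keep half of that ceiling fixed and randomize the other half. The class name, method name, and the 100 ms / 20 s values are assumptions chosen for illustration; this is not the AWS SDK's implementation, and the SDK's actual defaults may differ.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative equal-jitter backoff calculation; not the AWS SDK's own code. */
final class EqualJitterBackoff {
  private EqualJitterBackoff() {}

  /**
   * Computes a delay for the retry that follows {@code retriesAttempted} earlier retries.
   * The result lies in the closed range [ceiling / 2, ceiling], where ceiling is the
   * exponentially grown base delay capped at maxBackoffMillis.
   */
  static long delayMillis(long baseDelayMillis, long maxBackoffMillis, int retriesAttempted) {
    if (baseDelayMillis <= 0 || maxBackoffMillis <= 0 || retriesAttempted < 0) {
      throw new IllegalArgumentException(
          "delays must be positive and retriesAttempted must be non-negative");
    }
    // Exponential growth: base * 2^retriesAttempted, capped at maxBackoffMillis.
    long ceiling = Math.min(baseDelayMillis, maxBackoffMillis);
    for (int i = 0; i < retriesAttempted && ceiling < maxBackoffMillis; i++) {
      // Only double while the result stays within the cap, which also avoids overflow.
      ceiling = (ceiling > maxBackoffMillis / 2) ? maxBackoffMillis : ceiling * 2;
    }
    // Equal jitter: keep half of the ceiling fixed and randomize the other half.
    long fixedHalf = ceiling / 2;
    return fixedHalf + ThreadLocalRandom.current().nextLong(ceiling - fixedHalf + 1);
  }

  public static void main(String[] args) {
    // Hypothetical settings: 100 ms base delay, 20 s cap.
    for (int retriesAttempted = 0; retriesAttempted < 5; retriesAttempted++) {
      System.out.println("retry " + (retriesAttempted + 1) + ": "
          + delayMillis(100, 20_000, retriesAttempted) + " ms");
    }
  }
}
```

Because half of each delay is fixed, consecutive retries never collapse to near-zero waits, while the random half spreads out clients that failed at the same moment.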
Note: Once retries are exhausted, the error is surfaced to the runner, which may then retry the current partition in its entirety or abort if the runner's maximum number of retries is reached.
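The partial-success case mentioned above can be pictured with plain Java. In a batch response some entries may succeed while others fail, so retrying the whole request would resend messages that were already accepted. The sketch below uses hypothetical types (`Entry`, `entriesToResend`) rather than AWS SDK or Beam classes, and shows one possible way to select only the failed entries for resubmission. It is an illustration of the concept, not a description of how SqsIO handles this internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: these types are hypothetical and are not AWS SDK or Beam classes. */
final class PartialBatchRetry {
  private PartialBatchRetry() {}

  /** Hypothetical batch entry: an id that is unique within the batch, plus a message body. */
  static final class Entry {
    final String id;
    final String body;

    Entry(String id, String body) {
      this.id = id;
      this.body = body;
    }
  }

  /** Returns the entries whose ids were reported as failed, preserving batch order. */
  static List<Entry> entriesToResend(List<Entry> batch, Set<String> failedIds) {
    List<Entry> toResend = new ArrayList<>();
    for (Entry entry : batch) {
      if (failedIds.contains(entry.id)) {
        toResend.add(entry);
      }
    }
    return toResend;
  }

  public static void main(String[] args) {
    List<Entry> batch = Arrays.asList(
        new Entry("0", "first"), new Entry("1", "second"), new Entry("2", "third"));
    // Assume the service reported that only the entry with id "1" failed.
    Set<String> failedIds = new HashSet<>(Arrays.asList("1"));
    for (Entry entry : entriesToResend(batch, failedIds)) {
      // prints: resend id=1 body=second
      System.out.println("resend id=" + entry.id + " body=" + entry.body);
    }
  }
}
```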
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | SqsIO.Read | A PTransform to read/receive messages from SQS.
static class | SqsIO.Write | Deprecated.
static class | SqsIO.WriteBatches<T> | A PTransform to send messages to SQS.
Method Summary
Modifier and Type | Method | Description
static SqsIO.Read | read() |
static SqsIO.Write | write() | Deprecated. Use writeBatches() for more configuration options.
static <T> SqsIO.WriteBatches<T> | writeBatches() |
Method Details
read
write
Deprecated. Use writeBatches() for more configuration options.
writeBatches
SqsIO.WriteBatches