Class KinesisIO

java.lang.Object
org.apache.beam.sdk.io.aws2.kinesis.KinesisIO

public final class KinesisIO extends Object
IO to read from and write to Kinesis streams.

Reading from Kinesis

Example usages:


 p.apply(KinesisIO.read()
     .withStreamName("streamName")
     .withInitialPositionInStream(InitialPositionInStream.LATEST))
  .apply( ... ) // other transformations
 

At a minimum you have to provide:

  • the name of the stream to read from
  • the position in the stream at which to start reading, e.g. InitialPositionInStream.LATEST, InitialPositionInStream.TRIM_HORIZON, or alternatively an arbitrary point in time via KinesisIO.Read.withInitialTimestampInStream(Instant)
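For example, to start reading from an arbitrary point in time rather than a symbolic position (a minimal sketch; the timestamp value is purely illustrative):

 p.apply(KinesisIO.read()
     .withStreamName("streamName")
     .withInitialTimestampInStream(Instant.parse("2024-01-01T00:00:00Z")))
  .apply( ... ) // other transformations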

Watermarks

Kinesis IO uses arrival time for watermarks by default. To use processing time instead, use KinesisIO.Read.withProcessingTimeWatermarkPolicy():


 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withProcessingTimeWatermarkPolicy())
 

It is also possible to specify a custom watermark policy to control watermark computation using KinesisIO.Read.withCustomWatermarkPolicy(WatermarkPolicyFactory). This requires implementing WatermarkPolicy with a corresponding WatermarkPolicyFactory.
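For illustration, the sketch below shows a policy whose watermark trails the maximum observed arrival time by a fixed lag. It assumes WatermarkPolicy exposes getWatermark() and update(KinesisRecord), and that WatermarkPolicyFactory is a serializable single-method factory (so a lambda can be used); these details may differ between Beam versions.

 // Hypothetical policy: watermark lags 1 minute behind the max arrival time seen so far.
 class FixedLagWatermarkPolicy implements WatermarkPolicy {
   private final Duration lag = Duration.standardMinutes(1);
   private Instant maxArrivalTime = BoundedWindow.TIMESTAMP_MIN_VALUE;

   public Instant getWatermark() {
     return maxArrivalTime.minus(lag);
   }

   public void update(KinesisRecord record) {
     Instant arrival = record.getApproximateArrivalTimestamp();
     if (arrival.isAfter(maxArrivalTime)) {
       maxArrivalTime = arrival;
     }
   }
 }

 p.apply(KinesisIO.read()
     .withStreamName("streamName")
     .withCustomWatermarkPolicy(() -> new FixedLagWatermarkPolicy()));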

Throttling

By default Kinesis IO will poll the Kinesis getRecords() API as fast as possible as long as records are returned. The RateLimitPolicyFactory.DefaultRateLimiter will start throttling once getRecords() returns an empty response or if API calls get throttled by AWS.

A RateLimitPolicy is always applied to each shard individually.

You may provide a custom rate limit policy using KinesisIO.Read.withCustomRateLimitPolicy(RateLimitPolicyFactory). This requires implementing RateLimitPolicy with a corresponding RateLimitPolicyFactory.
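For illustration, the sketch below inserts a fixed pause between polls of each shard. The callback names (onSuccess, onThrottle), the exception type, and the assumption that RateLimitPolicyFactory is a single-method factory are based on recent Beam versions and may differ.

 // Hypothetical policy: wait 1s after a successful poll, 5s after being throttled by AWS.
 class FixedDelayPolicy implements RateLimitPolicy {
   public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
     Thread.sleep(1000);
   }

   public void onThrottle(KinesisClientThrottledException e) throws InterruptedException {
     Thread.sleep(5000);
   }
 }

 p.apply(KinesisIO.read()
     .withStreamName("streamName")
     .withCustomRateLimitPolicy(() -> new FixedDelayPolicy()));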

Enhanced Fan-Out

Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of consumer doesn't have to contend with other consumers that are receiving data from the stream.

More details can be found here: Consumers with Dedicated Throughput

The primary method of enabling EFO is setting KinesisIO.Read.withConsumerArn(String):


 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
 

Alternatively, EFO can be enabled for one or more KinesisIO.Read instances via pipeline options:

--kinesisIOConsumerArns={
   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
   ...
 }

If set, the pipeline option overrides the KinesisIO.Read.withConsumerArn(String) setting. Check KinesisIOOptions for more details.

Depending on downstream processing performance, the EFO consumer will apply back-pressure internally.

It is recommended to adjust the runner's settings so that EFO consumer(s) are not (re)started more often than roughly once every 10 seconds. Otherwise, internal calls to KinesisAsyncClient.subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler) may throw ResourceInUseException, causing a crash loop.

When consuming from a stream that is re-sharded frequently, the EFO source may eventually develop a skewed load across runner workers: some workers may end up with no active shard subscriptions at all.

Enhanced Fan-Out and KinesisIO state management

Different runners may behave differently when a Beam application is started from persisted state (for example, a Flink savepoint).

Depending on their internals, runners may persist the entire KinesisIO.Read object inside their state, as the Flink runner does. This means that, once EFO is enabled via KinesisIO.Read.withConsumerArn(String) in the Flink runner, further changes to KinesisIO.Read.withConsumerArn(String) won't take effect as long as the Beam application starts from a savepoint.

If your runner persists the KinesisIO.Read object, disabling or changing the consumer ARN when restoring from persisted state can be done via KinesisIOOptions.setKinesisIOConsumerArns(Map):

--kinesisIOConsumerArns={
   "stream-01": " < new consumer ARN > ",  <- updated ARN
   "stream-02": null,  <- disabling EFO
   ...
 }
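The same can be set programmatically via KinesisIOOptions before the pipeline is created (a sketch; stream names and the ARN are placeholders):

 KinesisIOOptions options = PipelineOptionsFactory.fromArgs(args).as(KinesisIOOptions.class);
 Map<String, String> consumerArns = new HashMap<>();
 consumerArns.put("stream-01", "<new consumer ARN>"); // placeholder for the updated ARN
 consumerArns.put("stream-02", null);                 // null disables EFO for this stream
 options.setKinesisIOConsumerArns(consumerArns);
 Pipeline p = Pipeline.create(options);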

EFO can be enabled or disabled at any time without losing consumers' positions in shards that were already checkpointed. The consumer ARN for a given stream can also be changed at any time.

Enhanced Fan-Out and other KinesisIO settings

When EFO is enabled, configurations that only apply to the polling-based getRecords() consumer (for example, custom rate limit policies) are ignored.

Writing to Kinesis

Example usages:

PCollection<KV<String, byte[]>> data = ...;

 data.apply(KinesisIO.<KV<String, byte[]>>write()
     .withStreamName("streamName")
     .withPartitioner(KV::getKey)
     .withSerializer(KV::getValue));
 

Note: Usage of KV is just for illustration purposes here.

At a minimum you have to provide:

  • the name of the Kinesis stream to write to,
  • a KinesisPartitioner to distribute records across shards of the stream,
  • and a function to serialize your data to bytes on the stream.

In general, it is also recommended to configure client retries using ClientConfiguration; see the Retries section below.

Partitioning of writes

Choosing the right partitioning strategy by means of a KinesisPartitioner is one of the key considerations when writing to Kinesis. Typically, you should aim to evenly distribute data across all shards of the stream.

Partition keys are used as input to a hash function that maps the partition key and associated data to a specific shard. If the cardinality of your partition keys is of the same order of magnitude as the number of shards in the stream, the hash function will likely not distribute your keys evenly among shards. This may result in heavily skewed shards with some shards not utilized at all.

If you require finer control over the distribution of records, override KinesisPartitioner.getExplicitHashKey(Object) according to your needs. However, this might impact record aggregation.
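As an illustration, the sketch below spreads records uniformly at random over the 128-bit hash key space, independent of the partition key's cardinality. The KinesisPartitioner method names (getPartitionKey, getExplicitHashKey) are assumed from recent Beam versions, and KV usage is illustrative as in the write example above.

 // Hypothetical partitioner: random explicit hash keys give an even spread across shards.
 class RandomExplicitHashKeyPartitioner implements KinesisPartitioner<KV<String, byte[]>> {
   @Override
   public String getPartitionKey(KV<String, byte[]> record) {
     return record.getKey();
   }

   @Override
   public String getExplicitHashKey(KV<String, byte[]> record) {
     // Explicit hash keys are decimal strings in the range [0, 2^128).
     return new BigInteger(128, ThreadLocalRandom.current()).toString();
   }
 }

Note that randomizing the explicit hash key gives each record a distinct effective hash key, so it trades better shard distribution for reduced aggregation efficiency (see the next section).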

Aggregation of records

To better leverage Kinesis API limits and to improve producer throughput, the writer aggregates multiple user records into a single aggregated KPL record.

Records of the same effective hash key get aggregated. The effective hash key is:

  1. the explicit hash key, if provided;
  2. otherwise, the lower bound of the hash key range of the target shard according to the given partition key, if available;
  3. otherwise, the hashed partition key.

To provide shard-aware aggregation in case 2, the hash key ranges of shards are loaded and refreshed periodically. This allows records to be aggregated into a number of aggregates that matches the number of shards in the stream, making the best possible use of Kinesis API limits.

Note: There's an important downside to consider when using shard-aware aggregation: records get assigned to a shard (via an explicit hash key) on the client side, but the respective client-side state can't be guaranteed to always be up-to-date. If a shard gets split, all aggregates are mapped to the lower child shard until the state is refreshed. The timing of this refresh, however, will diverge between workers.

If using a KinesisPartitioner.ExplicitPartitioner or disabling shard refresh via KinesisIO.RecordAggregation, no shard details will be loaded (or used).

Record aggregation can be entirely disabled using KinesisIO.Write.withRecordAggregationDisabled().
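For example, building on the write example above (KV usage remains illustrative):

 data.apply(KinesisIO.<KV<String, byte[]>>write()
     .withStreamName("streamName")
     .withPartitioner(KV::getKey)
     .withSerializer(KV::getValue)
     .withRecordAggregationDisabled());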

Configuration of AWS clients

AWS clients for all AWS IOs can be configured using AwsOptions, e.g. --awsRegion=us-west-1. AwsOptions contains reasonable defaults based on default providers for Region and AwsCredentialsProvider.

If you require more advanced configuration, you may change the ClientBuilderFactory using AwsOptions.setClientBuilderFactory(Class).

Configuration for a specific IO can be overwritten using withClientConfiguration(), which also allows configuring the retry behavior for the respective IO.

Retries

Retries for failed requests can be configured using ClientConfiguration.Builder.retry(Consumer) and are handled by the AWS SDK unless there's a partial success (batch requests). The SDK uses a backoff strategy with equal jitter for computing the delay before the next retry.
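As an illustration, retries could be tuned when building the client configuration (a sketch; numRetries is an assumed name for the retry builder's setting and may differ between Beam versions):

 data.apply(KinesisIO.<KV<String, byte[]>>write()
     .withStreamName("streamName")
     .withPartitioner(KV::getKey)
     .withSerializer(KV::getValue)
     .withClientConfiguration(ClientConfiguration.builder()
         .retry(retry -> retry.numRetries(10)) // assumed builder method
         .build()));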

Note: Once retries are exhausted, the error is surfaced to the runner, which may then opt to retry the current partition in its entirety or abort if the runner's maximum number of retries is reached.

  • Constructor Details

    • KinesisIO

      public KinesisIO()
  • Method Details