@Experimental(value=SOURCE_SINK) public final class KinesisIO extends java.lang.Object
PTransforms for reading from and writing to Kinesis streams.
 Example usages for reading:
 p.apply(KinesisIO.read()
     .withStreamName("streamName")
     .withInitialPositionInStream(InitialPositionInStream.LATEST)
     // using AWS default credentials provider chain (recommended)
     .withAWSClientsProvider(DefaultAWSCredentialsProviderChain.getInstance(), STREAM_REGION))
  .apply( ... ) // other transformations
 
 p.apply(KinesisIO.read()
     .withStreamName("streamName")
     .withInitialPositionInStream(InitialPositionInStream.LATEST)
     // using plain AWS key and secret
     .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION))
  .apply( ... ) // other transformations
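 As the examples show, you need to provide 3 things:
   the name of the stream to read from
   the position in the stream where reading should start:
       InitialPositionInStream.LATEST - reading will begin from the end of the stream
       InitialPositionInStream.TRIM_HORIZON - reading will begin at the very
           beginning of the stream
   the data used to initialize the AmazonKinesis and AmazonCloudWatch clients:
       credentials (AWS key and secret) and the region where the stream is located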
 If you want to set up the AmazonKinesis or AmazonCloudWatch client yourself
 (for example, if you're using more sophisticated authorization methods like Amazon STS),
 you can do so by implementing the AWSClientsProvider interface:
 
 public class MyCustomKinesisClientProvider implements AWSClientsProvider {
   public AmazonKinesis getKinesisClient() {
     // set up your client here
   }
   public AmazonCloudWatch getCloudWatchClient() {
     // set up your client here
   }
 }
 Usage is pretty straightforward:
 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withAWSClientsProvider(new MyCustomKinesisClientProvider()))
  .apply( ... ) // other transformations
 It is also possible to start reading from an arbitrary point in time. In this case you
 need to provide an Instant object:
 
 p.apply(KinesisIO.read()
     .withStreamName("streamName")
     .withInitialTimestampInStream(instant)
     .withAWSClientsProvider(new MyCustomKinesisClientProvider()))
  .apply( ... ) // other transformations
 KinesisIO uses ArrivalTimeWatermarkPolicy by default. To use processing time as event time:
 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withProcessingTimeWatermarkPolicy())
 It is also possible to specify a custom watermark policy to control watermark computation. Below is an example:
 // custom policy
 class MyCustomPolicy implements WatermarkPolicy {
   private WatermarkPolicyFactory.CustomWatermarkPolicy customWatermarkPolicy;
   MyCustomPolicy() {
     this.customWatermarkPolicy = new WatermarkPolicyFactory.CustomWatermarkPolicy(WatermarkParameters.create());
   }
   public Instant getWatermark() {
     return customWatermarkPolicy.getWatermark();
   }
   public void update(KinesisRecord record) {
     customWatermarkPolicy.update(record);
   }
 }
 // custom factory
 class MyCustomPolicyFactory implements WatermarkPolicyFactory {
   public WatermarkPolicy createWatermarkPolicy() {
     return new MyCustomPolicy();
   }
 }
 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withCustomWatermarkPolicy(new MyCustomPolicyFactory()))
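 To make concrete what a watermark policy has to compute, here is a minimal, self-contained sketch that does not depend on Beam. ArrivalWatermarkSketch is a hypothetical stand-in, not Beam's WatermarkPolicy; it uses java.time.Instant only to stay dependency-free, and it assumes the watermark should follow the latest arrival timestamp seen so far and never move backwards.

```java
import java.time.Instant;

/** Hypothetical stand-in for a watermark policy; not part of Beam. */
public class ArrivalWatermarkSketch {
  // Lowest watermark used until the first record has been observed (an assumption).
  private Instant watermark = Instant.EPOCH;

  /** Called once per record with that record's arrival timestamp. */
  public void update(Instant arrivalTimestamp) {
    // Only advance: an out-of-order record must not pull the watermark backwards.
    if (arrivalTimestamp.isAfter(watermark)) {
      watermark = arrivalTimestamp;
    }
  }

  /** Returns the latest arrival timestamp seen so far. */
  public Instant getWatermark() {
    return watermark;
  }

  public static void main(String[] args) {
    ArrivalWatermarkSketch policy = new ArrivalWatermarkSketch();
    policy.update(Instant.parse("2020-01-01T00:00:10Z"));
    policy.update(Instant.parse("2020-01-01T00:00:05Z")); // arrives out of order
    System.out.println(policy.getWatermark());
  }
}
```

 The same update/getWatermark split appears in the WatermarkPolicy example above; a real policy would receive a KinesisRecord rather than a bare timestamp.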
 By default, KinesisIO polls the Kinesis getRecords() API as fast as possible, which may lead to excessive read throttling. To limit the rate of getRecords() calls, you can set a rate limit policy. For example, the default fixed delay policy limits the rate to one API call per second per shard:
 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withFixedDelayRateLimitPolicy())
 You can also use a fixed delay policy with a specified delay interval, for example:
 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withFixedDelayRateLimitPolicy(Duration.millis(500)))
 If you need to change the polling interval of a Kinesis pipeline at runtime, for example to compensate for adding and removing additional consumers to the stream, then you can supply the delay interval as a function so that you can obtain the current delay interval from some external source:
 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withDynamicDelayRateLimitPolicy(() -> Duration.millis(<some delay interval>)))
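 One possible way to back such a function is a small holder whose value can be changed while the pipeline runs. The sketch below is self-contained and makes several assumptions: DynamicDelaySketch and its methods are hypothetical names, the "external source" is reduced to a setter call, and java.time.Duration is used to avoid extra dependencies, whereas the snippet above uses Duration.millis(...).

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical holder for a delay that another thread can change at runtime. */
public class DynamicDelaySketch {
  private final AtomicLong delayMillis;

  public DynamicDelaySketch(long initialDelayMillis) {
    this.delayMillis = new AtomicLong(initialDelayMillis);
  }

  /** Called by whatever watches the external source (config service, file, ...). */
  public void setDelayMillis(long newDelayMillis) {
    if (newDelayMillis < 0) {
      throw new IllegalArgumentException("delay must not be negative");
    }
    delayMillis.set(newDelayMillis);
  }

  /** Supplier that always reflects the most recently set delay. */
  public Supplier<Duration> asSupplier() {
    return () -> Duration.ofMillis(delayMillis.get());
  }

  public static void main(String[] args) {
    DynamicDelaySketch delay = new DynamicDelaySketch(1000);
    Supplier<Duration> supplier = delay.asSupplier();
    System.out.println(supplier.get().toMillis());
    delay.setDelayMillis(250); // e.g. a consumer was removed from the stream
    System.out.println(supplier.get().toMillis());
  }
}
```

 In a distributed pipeline each worker would hold its own copy of such state, so the value would have to be refreshed from the external source on every worker.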
 Finally, you can create a custom rate limit policy that responds to successful read calls and/or read throttling exceptions with your own rate-limiting logic:
 // custom policy
 public class MyCustomPolicy implements RateLimitPolicy {
   public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
     // handle successful getRecords() call
   }
   public void onThrottle(KinesisClientThrottledException e) throws InterruptedException {
     // handle Kinesis read throttling exception
   }
 }
 // custom factory
 class MyCustomPolicyFactory implements RateLimitPolicyFactory {
   public RateLimitPolicy getRateLimitPolicy() {
     return new MyCustomPolicy();
   }
 }
 p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withCustomRateLimitPolicy(new MyCustomPolicyFactory()))
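 As an illustration of the kind of logic the two callbacks might contain, the following self-contained sketch computes an exponential backoff delay. BackoffSketch is a hypothetical helper, not part of Beam or the AWS SDK, and the choice of doubling with a cap is an assumption rather than a recommended policy.

```java
/** Hypothetical backoff calculator; not part of Beam or the AWS SDK. */
public class BackoffSketch {
  private final long baseDelayMillis;
  private final long maxDelayMillis;
  private long currentDelayMillis;

  public BackoffSketch(long baseDelayMillis, long maxDelayMillis) {
    if (baseDelayMillis <= 0 || maxDelayMillis < baseDelayMillis) {
      throw new IllegalArgumentException("need 0 < base <= max");
    }
    this.baseDelayMillis = baseDelayMillis;
    this.maxDelayMillis = maxDelayMillis;
    this.currentDelayMillis = baseDelayMillis;
  }

  /** A successful read resets the delay to its base value and returns it. */
  public long onSuccess() {
    currentDelayMillis = baseDelayMillis;
    return currentDelayMillis;
  }

  /** A throttled read doubles the delay, capped at the maximum, and returns it. */
  public long onThrottle() {
    currentDelayMillis = Math.min(maxDelayMillis, currentDelayMillis * 2);
    return currentDelayMillis;
  }

  public static void main(String[] args) {
    BackoffSketch backoff = new BackoffSketch(100, 1000);
    for (int i = 0; i < 5; i++) {
      System.out.println("throttled, next delay ms: " + backoff.onThrottle());
    }
    System.out.println("success, next delay ms: " + backoff.onSuccess());
  }
}
```

 Inside a RateLimitPolicy, the returned value could be passed to Thread.sleep(...) from onSuccess and onThrottle, which is consistent with both methods declaring InterruptedException.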
 Example usages for writing:
 PCollection<byte[]> data = ...;
 data.apply(KinesisIO.write()
     .withStreamName("streamName")
     .withPartitionKey("partitionKey")
     // using AWS default credentials provider chain (recommended)
     .withAWSClientsProvider(DefaultAWSCredentialsProviderChain.getInstance(), STREAM_REGION));
 
 PCollection<byte[]> data = ...;
 data.apply(KinesisIO.write()
     .withStreamName("streamName")
     .withPartitionKey("partitionKey")
      // using plain AWS key and secret
     .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION));
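 As a client, you need to provide at least 3 things:
   the name of the stream to write to
   a partition key (or an implementation of KinesisPartitioner) that defines which
       partition will be used for writing
   the data used to initialize the AmazonKinesis and AmazonCloudWatch clients:
       credentials (AWS key and secret) and the region where the stream is located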
 If you need more complicated logic for key partitioning, you can create
 your own implementation of KinesisPartitioner and set it with KinesisIO.Write.withPartitioner(KinesisPartitioner).
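 The partitioning logic itself can be sketched without Beam. In the example below, the nested Partitioner interface is a hypothetical stand-in that only assumes a partitioner maps a record's bytes to a partition key; the real KinesisPartitioner interface may declare different or additional methods. BucketPartitioner spreads records over a fixed number of keys by hashing the payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HashPartitionerSketch {
  /** Hypothetical stand-in; the real KinesisPartitioner interface may differ. */
  public interface Partitioner {
    String getPartitionKey(byte[] value);
  }

  /** Spreads records over a fixed number of partition keys by payload hash. */
  public static final class BucketPartitioner implements Partitioner {
    private final int buckets;

    public BucketPartitioner(int buckets) {
      if (buckets <= 0) {
        throw new IllegalArgumentException("buckets must be positive");
      }
      this.buckets = buckets;
    }

    @Override
    public String getPartitionKey(byte[] value) {
      // floorMod keeps the bucket non-negative even for negative hash codes.
      int bucket = Math.floorMod(Arrays.hashCode(value), buckets);
      return "bucket-" + bucket;
    }
  }

  public static void main(String[] args) {
    Partitioner partitioner = new BucketPartitioner(4);
    byte[] payload = "order-42".getBytes(StandardCharsets.UTF_8);
    System.out.println(partitioner.getPartitionKey(payload));
  }
}
```

 Because the key depends only on the payload bytes, identical payloads always receive the same partition key.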
 
Internally, KinesisIO.Write relies on the Amazon Kinesis Producer Library (KPL). This
 library can be configured with a set of Properties if needed.
 
Example usage of KPL configuration:
 Properties properties = new Properties();
 properties.setProperty("KinesisEndpoint", "localhost");
 properties.setProperty("KinesisPort", "4567");
 PCollection<byte[]> data = ...;
 data.apply(KinesisIO.write()
     .withStreamName("streamName")
     .withPartitionKey("partitionKey")
     .withAWSClientsProvider(AWS_KEY, AWS_SECRET, STREAM_REGION)
     .withProducerProperties(properties));
 For more information about configuration parameters, see the sample configuration file.
| Modifier and Type | Class and Description | 
|---|---|
| static class  | KinesisIO.Read<T> - Implementation of read(). | 
| static class  | KinesisIO.Write - Implementation of write(). | 
| Constructor and Description | 
|---|
| KinesisIO() | 
| Modifier and Type | Method and Description | 
|---|---|
| static KinesisIO.Read<KinesisRecord> | read() - Returns a new KinesisIO.Read transform for reading from Kinesis. | 
| static KinesisIO.Read<byte[]> | readData() - A PTransform to read from a Kinesis stream as bytes without metadata; returns a PCollection of byte[]. | 
| static KinesisIO.Write | write() - A PTransform writing data to Kinesis. | 
public static KinesisIO.Read<KinesisRecord> read()
Returns a new KinesisIO.Read transform for reading from Kinesis.
public static KinesisIO.Read<byte[]> readData()
A PTransform to read from a Kinesis stream as bytes without metadata; returns a PCollection of byte[].
public static KinesisIO.Write write()
A PTransform writing data to Kinesis.