@Experimental(value=SOURCE_SINK) public final class KinesisIO extends java.lang.Object
PTransforms for reading from and writing to Kinesis streams.
Example usage:
p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION))
 .apply( ... ) // other transformations
As you can see, you need to provide 3 things:
- the name of the stream you are going to read from
- the position in the stream where reading should start; there are two options:
  - InitialPositionInStream.LATEST - reading will begin from the end of the stream
  - InitialPositionInStream.TRIM_HORIZON - reading will begin at the very beginning of the stream (a TRIM_HORIZON variant is sketched after this list)
- the data used to initialize the AmazonKinesis and AmazonCloudWatch clients: credentials (AWS key and secret) and the region where the stream is located
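For example, a pipeline that replays the stream from its beginning differs from the snippet above only in the initial position (a sketch reusing the same builder methods shown above):

p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION))
 .apply( ... ) // other transformations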
In case you want to set up the AmazonKinesis or AmazonCloudWatch client on your own (for example, if you're using a more sophisticated authorization method such as Amazon STS), you can do so by implementing the AWSClientsProvider interface:
public class MyCustomKinesisClientProvider implements AWSClientsProvider {
  @Override
  public AmazonKinesis getKinesisClient() {
    // set up your client here
  }

  @Override
  public AmazonCloudWatch getCloudWatchClient() {
    // set up your client here
  }
}
Usage is pretty straightforward:
p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withAWSClientsProvider(new MyCustomKinesisClientProvider()))
 .apply( ... ) // other transformations
It is also possible to start reading from an arbitrary point in time; in this case you need to provide an Instant object:
p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialTimestampInStream(instant)
    .withAWSClientsProvider(new MyCustomKinesisClientProvider()))
 .apply( ... ) // other transformations
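To make the "( ... ) // other transformations" placeholder concrete, a minimal sketch of consuming the read output follows; it assumes read() yields a PCollection of KinesisRecord elements and that each record's payload can be retrieved as a byte array (the getDataAsBytes() accessor is an assumption):

p.apply(KinesisIO.read()
    .withStreamName("streamName")
    .withInitialTimestampInStream(instant)
    .withAWSClientsProvider(new MyCustomKinesisClientProvider()))
 .apply(ParDo.of(new DoFn<KinesisRecord, byte[]>() {
   @ProcessElement
   public void processElement(ProcessContext c) {
     // emit the raw payload of each Kinesis record (accessor name assumed)
     c.output(c.element().getDataAsBytes());
   }
 }));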
Example usage of writing to Kinesis:
PCollection<byte[]> data = ...;
data.apply(KinesisIO.write()
    .withStreamName("streamName")
    .withPartitionKey("partitionKey")
    .withAWSClientsProvider(AWS_KEY, AWS_SECRET, STREAM_REGION));
As a client, you need to provide at least 3 things:
- the name of the stream you are going to write to
- a partition key (or an implementation of KinesisPartitioner) that defines which partition each record will be written to
- the data used to initialize the AmazonKinesis and AmazonCloudWatch clients: credentials (AWS key and secret) and the region where the stream is located
If you need more complicated logic for key partitioning, you can create your own implementation of KinesisPartitioner and set it with KinesisIO.Write.withPartitioner(KinesisPartitioner), as sketched below.
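A minimal sketch of such a partitioner, assuming the KinesisPartitioner interface exposes getPartitionKey(byte[]) and getExplicitHashKey(byte[]) methods (the class name HashBasedPartitioner is hypothetical):

public class HashBasedPartitioner implements KinesisPartitioner {
  @Override
  public String getPartitionKey(byte[] value) {
    // derive a partition key from the record payload (method signature assumed)
    return Integer.toHexString(java.util.Arrays.hashCode(value));
  }

  @Override
  public String getExplicitHashKey(byte[] value) {
    // returning null lets Kinesis hash the partition key itself (assumed behavior)
    return null;
  }
}

data.apply(KinesisIO.write()
    .withStreamName("streamName")
    .withPartitioner(new HashBasedPartitioner())
    .withAWSClientsProvider(AWS_KEY, AWS_SECRET, STREAM_REGION));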
Internally, KinesisIO.Write relies on the Amazon Kinesis Producer Library (KPL). This library can be configured with a set of Properties if needed.
Example usage of KPL configuration:
Properties properties = new Properties();
properties.setProperty("KinesisEndpoint", "localhost");
properties.setProperty("KinesisPort", "4567");
PCollection<byte[]> data = ...;
data.apply(KinesisIO.write()
    .withStreamName("streamName")
    .withPartitionKey("partitionKey")
    .withAWSClientsProvider(AWS_KEY, AWS_SECRET, STREAM_REGION)
    .withProducerProperties(properties));
For more information about configuration parameters, see the sample configuration file.
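As a further illustration, two settings that are often adjusted are the buffering time and record aggregation; the property names below are assumptions taken from the KPL sample configuration and should be verified against that file:

Properties properties = new Properties();
// flush buffered records after at most 100 ms (property name assumed from the KPL sample config)
properties.setProperty("RecordMaxBufferedTime", "100");
// aggregate many small user records into fewer Kinesis records (property name assumed)
properties.setProperty("AggregationEnabled", "true");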
| Modifier and Type | Class and Description |
|---|---|
| static class | KinesisIO.Read: Implementation of read(). |
| static class | KinesisIO.Write: Implementation of write(). |
| Constructor and Description |
|---|
| KinesisIO() |
| Modifier and Type | Method and Description |
|---|---|
| static KinesisIO.Read | read(): Returns a new KinesisIO.Read transform for reading from Kinesis. |
| static KinesisIO.Write | write(): A PTransform writing data to Kinesis. |
public static KinesisIO.Read read()
Returns a new KinesisIO.Read transform for reading from Kinesis.

public static KinesisIO.Write write()
A PTransform writing data to Kinesis.
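Putting the pieces together, the following hedged sketch reads from one stream and writes each record's payload to another; the stream names, the ExtractPayloadFn helper, and the getDataAsBytes() accessor are illustrative assumptions, while the KinesisIO builder calls are the ones documented above:

// Hypothetical helper that converts each KinesisRecord into its raw bytes.
static class ExtractPayloadFn extends DoFn<KinesisRecord, byte[]> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element().getDataAsBytes()); // accessor name assumed
  }
}

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

p.apply(KinesisIO.read()
    .withStreamName("inputStream")
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION))
 .apply(ParDo.of(new ExtractPayloadFn()))
 .apply(KinesisIO.write()
     .withStreamName("outputStream")
     .withPartitionKey("partitionKey")
     .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION));

p.run();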