Class RedisIO
Reading Redis key/value pairs
read() provides a source which returns a bounded PCollection containing key/value pairs as KV<String, String>.
To configure a Redis source, you have to provide Redis server hostname and port number. Optionally, you can provide a key pattern (to filter the keys). The following example illustrates how to configure a source:
pipeline.apply(RedisIO.read()
.withEndpoint("::1", 6379)
.withKeyPattern("foo*"))
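To make the effect of a key pattern such as "foo*" concrete, here is a minimal, self-contained sketch of glob-style filtering over an in-memory list of keys. It is an illustration only, not how RedisIO or the Redis server implements matching. The class name KeyPatternSketch and its methods are hypothetical, and the sketch assumes a simplified glob in which only '*' and '?' are special; real Redis patterns also support character classes and escapes.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of RedisIO: models key-pattern filtering in memory.
public class KeyPatternSketch {

    // Translate a simplified glob into a regex.
    // Assumption: only '*' (any run of characters) and '?' (one character) are special.
    public static Pattern globToRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                // Quote every other character so it is matched literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // Keep only the keys that match the whole pattern.
    public static List<String> filterKeys(List<String> keys, String glob) {
        Pattern pattern = globToRegex(glob);
        return keys.stream()
                .filter(key -> pattern.matcher(key).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = List.of("foo", "foobar", "barfoo", "fo");
        System.out.println(filterKeys(keys, "foo*")); // prints [foo, foobar]
    }
}
```

With the pattern "foo*", only keys that start with "foo" survive, which is the subset a source configured with withKeyPattern("foo*") would be expected to read.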
It's also possible to specify Redis authentication and connection timeout with the corresponding methods:
pipeline.apply(RedisIO.read()
.withEndpoint("::1", 6379)
.withAuth("authPassword")
.withTimeout(60000)
.withKeyPattern("foo*"))
readKeyPatterns() queries the Redis server using each element of the input PCollection (a String) as a key pattern.
pipeline.apply(...)
// here we have a PCollection<String> with the key patterns
.apply(RedisIO.readKeyPatterns().withEndpoint("::1", 6379))
// here we have a PCollection<KV<String,String>>
Writing Redis key/value pairs
write() provides a sink to write key/value pairs represented as KV from an incoming PCollection.
To configure the target Redis server, you have to provide Redis server hostname and port number. The following example illustrates how to configure a sink:
pipeline.apply(...)
// here we have a PCollection<KV<String, String>> with key/value pairs
.apply(RedisIO.write().withEndpoint("::1", 6379))
Writing Redis Streams
writeStreams() appends the entries of a PCollection of key/value pairs represented as KV to the Redis stream at the specified key using the XADD API.
To configure the target Redis server, you have to provide a Redis server hostname and port number. The following example illustrates how to configure a sink:
pipeline.apply(...)
// here we have a PCollection<KV<String, Map<String, String>>>
.apply(RedisIO.writeStreams().withEndpoint("::1", 6379))
Redis Streams can optionally be capped to a specific or exact length (see the documentation for the XTRIM API); writeStreams() lets you specify MAXLEN using the withMaxLen() option.
Trimming a stream to an exact length is noted in the Redis documentation as being inefficient; the withApproximateTrim() boolean option will add the ~ prefix to MAXLEN, which tells Redis to use "almost exact" trimming.
See the Redis Streams documentation for a deeper discussion of the issues involved.
The following example illustrates how to configure a sink with trimming:
pipeline.apply(...)
// here we have a PCollection<KV<String, Map<String, String>>>
.apply(RedisIO.writeStreams()
.withEndpoint("::1", 6379)
.withMaxLen(1024L)
.withApproximateTrim(true)
)
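To show how the two trimming options relate to the underlying Redis command, the following sketch assembles the argument list of an XADD call from a stream key, a maximum length, the approximate-trim flag, and the entry fields. It is a hedged illustration, not the RedisIO implementation. The class XaddCommandSketch and its method are hypothetical, treating maxLen <= 0 as "no cap" is an assumption of this sketch, and so is the use of "*" to let the server assign the entry ID.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RedisIO: builds the textual arguments of an XADD command.
public class XaddCommandSketch {

    public static List<String> xaddArgs(
            String key, long maxLen, boolean approximateTrim, Map<String, String> fields) {
        List<String> args = new ArrayList<>();
        args.add("XADD");
        args.add(key);
        // Assumption of this sketch: a non-positive maxLen means the stream is not capped.
        if (maxLen > 0) {
            args.add("MAXLEN");
            if (approximateTrim) {
                args.add("~"); // ask Redis for "almost exact" trimming
            }
            args.add(Long.toString(maxLen));
        }
        args.add("*"); // assumption: let the server generate the entry ID
        // Each map entry becomes one field/value pair of the stream entry.
        for (Map.Entry<String, String> field : fields.entrySet()) {
            args.add(field.getKey());
            args.add(field.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("sensor", "42");
        System.out.println(String.join(" ", xaddArgs("events", 1024L, true, fields)));
        // prints XADD events MAXLEN ~ 1024 * sensor 42
    }
}
```

In terms of this sketch, withMaxLen(1024L) corresponds to the MAXLEN argument and withApproximateTrim(true) to the ~ that precedes the threshold.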
Nested Class Summary
Nested Classes
Modifier and Type    Class                      Description
static class         RedisIO.Read               Implementation of read().
static class         RedisIO.ReadKeyPatterns    Implementation of readKeyPatterns().
static class         RedisIO.Write              A PTransform to write to a Redis server.
static class         RedisIO.WriteStreams       A PTransform to write stream key pairs (https://redis.io/topics/streams-intro) to a Redis server.
Method Summary
Modifier and Type                 Method               Description
static RedisIO.Read               read()               Read data from a Redis server.
static RedisIO.ReadKeyPatterns    readKeyPatterns()    Like read() but executes multiple instances of the Redis query substituting each element of a PCollection as key pattern.
static RedisIO.Write              write()              Write data to a Redis server.
static RedisIO.WriteStreams       writeStreams()       Write stream data to a Redis server.
Method Details
read
Read data from a Redis server.
readKeyPatterns
Like read() but executes multiple instances of the Redis query substituting each element of a PCollection as key pattern.
write
Write data to a Redis server.
writeStreams
Write stream data to a Redis server.