Class RedisIO
Reading Redis key/value pairs
read() provides a source which returns a bounded PCollection containing
key/value pairs as KV<String, String>.
To configure a Redis source, provide the Redis server hostname and port number. Optionally, provide a key pattern to filter the keys. The following example illustrates how to configure a source:
pipeline.apply(RedisIO.read()
    .withEndpoint("::1", 6379)
    .withKeyPattern("foo*"))
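To make the effect of a key pattern such as "foo*" concrete, the following is a minimal sketch of glob-style key filtering in plain Java. It is not how RedisIO or Redis implements matching: the class KeyPatternSketch and its methods are hypothetical names, and the sketch assumes only the '*' and '?' wildcards, whereas Redis glob patterns support more syntax (character classes, escapes).

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class KeyPatternSketch {

    // Translate a simplified glob (only '*' and '?') into a regular expression.
    // Every other character is quoted so it is matched literally.
    static Pattern toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // Keep only the keys that match the whole pattern.
    static List<String> filterKeys(List<String> keys, String glob) {
        Pattern pattern = toRegex(glob);
        return keys.stream()
            .filter(key -> pattern.matcher(key).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = List.of("foo", "foobar", "bar", "xfoo");
        System.out.println(filterKeys(keys, "foo*")); // prints [foo, foobar]
    }
}
```

In this sketch "foo*" selects keys that start with "foo", including "foo" itself, and rejects keys that merely contain it.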
It's also possible to specify Redis authentication and connection timeout with the corresponding methods:
pipeline.apply(RedisIO.read()
    .withEndpoint("::1", 6379)
    .withAuth("authPassword")
    .withTimeout(60000)
    .withKeyPattern("foo*"))
readKeyPatterns() queries the Redis server using each element of an input
PCollection<String> as a key pattern.
pipeline.apply(...)
    // here we have a PCollection<String> with the key patterns
    .apply(RedisIO.readKeyPatterns().withEndpoint("::1", 6379))
    // here we have a PCollection<KV<String,String>>
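The data flow of readKeyPatterns() (a collection of patterns in, a collection of key/value pairs out) can be illustrated with an in-memory stand-in. This is a sketch only: the Map named store plays the role of the Redis server, ReadKeyPatternsSketch and readAll are hypothetical names, and the matcher assumes that '*' is the only wildcard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class ReadKeyPatternsSketch {

    // Simplified glob: '*' matches any run of characters, everything else is literal.
    static boolean matches(String glob, String key) {
        String[] parts = glob.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(key).matches();
    }

    // For each input pattern, emit every key/value pair in the store whose key matches it.
    static List<Map.Entry<String, String>> readAll(Map<String, String> store, List<String> patterns) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String pattern : patterns) {
            for (Map.Entry<String, String> entry : store.entrySet()) {
                if (matches(pattern, entry.getKey())) {
                    out.add(Map.entry(entry.getKey(), entry.getValue()));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> store = new TreeMap<>(); // stand-in for the Redis keyspace
        store.put("user:1", "alice");
        store.put("user:2", "bob");
        store.put("order:7", "book");
        System.out.println(readAll(store, List.of("user:*", "order:*")));
    }
}
```

In this sketch each pattern is evaluated independently, so a key that matches two input patterns is emitted twice.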
Writing Redis key/value pairs
write() provides a sink to write key/value pairs represented as KV from an
incoming PCollection.
To configure the target Redis server, provide the Redis server hostname and port number. The following example illustrates how to configure a sink:
pipeline.apply(...)
    // here we have a PCollection<KV<String, String>> with key/value pairs
    .apply(RedisIO.write().withEndpoint("::1", 6379))
Writing Redis Streams
writeStreams() appends the entries of a PCollection of key/value pairs
represented as KV to the Redis stream at the specified key using the XADD API.
To configure the target Redis server, you have to provide a Redis server hostname and port number. The following example illustrates how to configure a sink:
pipeline.apply(...)
    // here we have a PCollection<KV<String, Map<String, String>>>
    .apply(RedisIO.writeStreams().withEndpoint("::1", 6379))
Redis Streams can optionally be capped to an exact or approximate length (see the documentation
for the XTRIM API); writeStreams() lets
you specify MAXLEN using the withMaxLen() option.
Trimming a stream to an exact length is noted in the Redis documentation as being inefficient;
the withApproximateTrim() boolean option will add the ~ prefix to MAXLEN,
which tells Redis to use "almost exact" trimming.
See the Redis Streams documentation for a deeper discussion of the issues involved.
The following example illustrates how to configure a sink with trimming:
pipeline.apply(...)
    // here we have a PCollection<KV<String, Map<String, String>>>
    .apply(RedisIO.writeStreams()
        .withEndpoint("::1", 6379)
        .withMaxLen(1024L)
        .withApproximateTrim(true)
    )
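To show how MAXLEN and the ~ prefix fit into an XADD call, the following sketch assembles the argument list of the Redis command for a single stream entry. It is an illustration in plain Java, not the RedisIO implementation: XaddCommandSketch and xaddArgs are hypothetical names, treating a non-positive maxLen as "no cap" is a convention of this sketch, and using "*" to let Redis generate the entry ID is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class XaddCommandSketch {

    // Build the arguments of: XADD key [MAXLEN [~] n] * field value [field value ...]
    static List<String> xaddArgs(String streamKey, Map<String, String> fields,
                                 long maxLen, boolean approximateTrim) {
        List<String> args = new ArrayList<>();
        args.add("XADD");
        args.add(streamKey);
        if (maxLen > 0) { // sketch convention: non-positive means no cap
            args.add("MAXLEN");
            if (approximateTrim) {
                args.add("~"); // ask Redis for "almost exact" trimming
            }
            args.add(Long.toString(maxLen));
        }
        args.add("*"); // assumption: let Redis generate the entry ID
        for (Map.Entry<String, String> field : fields.entrySet()) {
            args.add(field.getKey());
            args.add(field.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("sensor", "a1");
        fields.put("temp", "21.5");
        System.out.println(String.join(" ", xaddArgs("mystream", fields, 1024L, true)));
        // prints: XADD mystream MAXLEN ~ 1024 * sensor a1 temp 21.5
    }
}
```

With approximateTrim set to false the ~ token is omitted, which corresponds to the exact trimming that the Redis documentation describes as less efficient.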
Nested Class Summary
Nested Classes
Modifier and Type    Class                      Description
static class         RedisIO.Read               Implementation of read().
static class         RedisIO.ReadKeyPatterns    Implementation of readKeyPatterns().
static class         RedisIO.Write              A PTransform to write to a Redis server.
static class         RedisIO.WriteStreams       A PTransform to write stream key pairs (https://redis.io/topics/streams-intro) to a Redis server.
Method Summary
Modifier and Type                 Method               Description
static RedisIO.Read               read()               Read data from a Redis server.
static RedisIO.ReadKeyPatterns    readKeyPatterns()    Like read() but executes multiple instances of the Redis query, substituting each element of a PCollection as the key pattern.
static RedisIO.Write              write()              Write data to a Redis server.
static RedisIO.WriteStreams       writeStreams()       Write stream data to a Redis server.
Method Details
read
Read data from a Redis server.
readKeyPatterns
Like read() but executes multiple instances of the Redis query, substituting each element of a PCollection as the key pattern.
write
Write data to a Redis server.
writeStreams
Write stream data to a Redis server.