Class RedisIO

java.lang.Object
org.apache.beam.sdk.io.redis.RedisIO

public class RedisIO extends Object
An IO to manipulate Redis key/value database.

Reading Redis key/value pairs

read() provides a source which returns a bounded PCollection containing key/value pairs as KV<String, String>.

To configure a Redis source, you have to provide Redis server hostname and port number. Optionally, you can provide a key pattern (to filter the keys). The following example illustrates how to configure a source:


 pipeline.apply(RedisIO.read()
   .withEndpoint("::1", 6379)
   .withKeyPattern("foo*"))

 

It's also possible to specify Redis authentication and connection timeout with the corresponding methods:


 pipeline.apply(RedisIO.read()
   .withEndpoint("::1", 6379)
   .withAuth("authPassword")
   .withTimeout(60000)
   .withKeyPattern("foo*"))

 

readKeyPatterns() can be used to request Redis server using input PCollection elements as key pattern (as String).


 pipeline.apply(...)
    // here we have a PCollection<String> with the key patterns
    .apply(RedisIO.readKeyPatterns().withEndpoint("::1", 6379))
   // here we have a PCollection<KV<String,String>>

 

Writing Redis key/value pairs

write() provides a sink to write key/value pairs represented as KV from an incoming PCollection.

To configure the target Redis server, you have to provide Redis server hostname and port number. The following example illustrates how to configure a sink:


 pipeline.apply(...)
   // here we a have a PCollection<String, String> with key/value pairs
   .apply(RedisIO.write().withEndpoint("::1", 6379))

 

Writing Redis Streams

writeStreams() appends the entries of a PCollection of key/value pairs represented as KV to the Redis stream at the specified key using the XADD API.

To configure the target Redis server, you have to provide a Redis server hostname and port number. The following example illustrates how to configure a sink:


 pipeline.apply(...)
   // here we have a PCollection<KV<String, Map<String, String>>>
   .apply(RedisIO.writeStreams().withEndpoint("::1", 6379))
 

Redis Streams optionally can be capped to a specific or exact length (see the documentation for the XTRIM API); writeStreams() lets you specify MAXLEN using the withMaxLen() option.

Trimming a stream to an exact length is noted in the Redis documentation as being inefficient; the withApproximateTrim() boolean option will add the ~ prefix to MAXLEN, which tells Redis to use "almost exact" trimming.

See the Redis Streams documentation for a deeper discussion of the issues involved.

The following example illustrates how to configure a sink with trimming:


 pipeline.apply(...)
   // here we have a PCollection<KV<String, Map<String, String>>>
   .apply(RedisIO.writeStreams()
     .withEndpoint("::1", 6379)
     .withMaxLen(1024L)
     .withApproximateTrim(true)
    )
 
  • Method Details

    • read

      public static RedisIO.Read read()
      Read data from a Redis server.
    • readKeyPatterns

      public static RedisIO.ReadKeyPatterns readKeyPatterns()
      Like read() but executes multiple instances of the Redis query substituting each element of a PCollection as key pattern.
    • write

      public static RedisIO.Write write()
      Write data to a Redis server.
    • writeStreams

      public static RedisIO.WriteStreams writeStreams()
      Write stream data to a Redis server.