Class MongoDbGridFSIO

java.lang.Object
org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO

public class MongoDbGridFSIO extends Object
IO to read and write data on MongoDB GridFS.

Reading from MongoDB via GridFS

The MongoDbGridFSIO source returns a bounded collection of objects as a PCollection<T>.

To configure the MongoDB GridFS source, you can provide the connection URI, the database name and the bucket name. If unspecified, the default values from the GridFS driver are used.

The following example illustrates various options for configuring the source:


 pipeline.apply(MongoDbGridFSIO.<String>read()
   .withUri("mongodb://localhost:27017")
   .withDatabase("my-database")
   .withBucket("my-bucket"))
 

The source also accepts an optional configuration: withFilter() lets you define a JSON filter that selects a subset of the files in the database.

You can also specify an optional Parser (and its associated Coder) to parse the InputStream into objects usable with Beam. By default, MongoDbGridFSIO parses files into Strings, splitting on line breaks and using the uploadDate of the file as the timestamp. When using a parser that outputs elements with custom timestamps, you may also need to specify the allowedTimestampSkew option.
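
The default parsing behavior described above (split on line breaks, stamp every line with the file's upload date) can be sketched with plain JDK classes. This is an illustration only, not the Beam implementation: RecordCallback, LineParserSketch, parseLines and collectLines are hypothetical names, and reading the stream as UTF-8 is an assumption.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the callback a parser emits records to.
interface RecordCallback<T> {
  void output(T value, Instant timestamp);
}

class LineParserSketch {
  // Splits the stream on line breaks and emits every line with the
  // file's upload date as its timestamp. UTF-8 decoding is an assumption.
  static void parseLines(InputStream in, Instant uploadDate, RecordCallback<String> callback)
      throws IOException {
    try (BufferedReader reader =
        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        callback.output(line, uploadDate);
      }
    }
  }

  // Convenience helper for trying the sketch: collects the emitted lines.
  static List<String> collectLines(byte[] fileContent, Instant uploadDate) {
    List<String> lines = new ArrayList<>();
    try {
      parseLines(new ByteArrayInputStream(fileContent), uploadDate, (value, ts) -> lines.add(value));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return lines;
  }
}
```

A custom parser would follow the same shape but decode each record differently and, if needed, pass its own timestamp to the callback instead of the upload date. That is the case where the timestamp skew setting becomes relevant.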

Writing to MongoDB via GridFS

MongoDbGridFSIO supports writing data to a file in a MongoDB GridFS collection.

To configure a MongoDB GridFS sink, you can provide the connection URI, the database name and the bucket name. You must also provide the filename to write to. Another optional parameter is the GridFS file chunkSize.

For instance:


 pipeline
   .apply(...)
   .apply(MongoDbGridFSIO.write()
     .withUri("mongodb://localhost:27017")
     .withDatabase("my-database")
     .withBucket("my-bucket")
     .withChunkSize(256000L)
     .withFilename("my-output.txt"))
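
GridFS stores each file as a sequence of chunks of at most chunkSize bytes, so the chunk size determines how many chunk documents a file occupies. The arithmetic can be sketched as follows. ChunkMathSketch and its methods are hypothetical helpers, not part of MongoDbGridFSIO or the MongoDB driver, and the sketch assumes an empty file has zero chunks.

```java
class ChunkMathSketch {
  // Number of chunks needed for a file of fileLength bytes (ceiling division).
  static long chunkCount(long fileLength, long chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    if (fileLength < 0) {
      throw new IllegalArgumentException("fileLength must not be negative");
    }
    return (fileLength + chunkSize - 1) / chunkSize;
  }

  // Size in bytes of the final chunk; 0 for an empty file.
  static long lastChunkSize(long fileLength, long chunkSize) {
    if (chunkCount(fileLength, chunkSize) == 0) {
      return 0;
    }
    long remainder = fileLength % chunkSize;
    return remainder == 0 ? chunkSize : remainder;
  }
}
```

With the chunk size of 256000 bytes used in the example, a 1000000-byte file would occupy four chunks, the last one holding 232000 bytes.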

 

The write() method also accepts an optional argument that specifies a writer used to write the data to the OutputStream. By default, it writes UTF-8 strings to the file, separated by line feeds.
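
The default writer behavior described above can be sketched with plain JDK classes: each string is encoded as UTF-8 and followed by a line feed. ElementWriter, LineWriterSketch, UTF8_LINES and writeAll are hypothetical names used for illustration only. They are stand-ins for the idea of a pluggable writer and do not reproduce the Beam API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for a writer function: serializes one element to the stream.
interface ElementWriter<T> {
  void write(T element, OutputStream out) throws IOException;
}

class LineWriterSketch {
  // Mirrors the described default: UTF-8 bytes of each string, then a line feed.
  static final ElementWriter<String> UTF8_LINES =
      (element, out) -> {
        out.write(element.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
      };

  // Runs a writer over all elements and returns the bytes that would form the file.
  static <T> byte[] writeAll(List<T> elements, ElementWriter<T> writer) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      for (T element : elements) {
        writer.write(element, out);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }
}
```

A custom writer for another element type would supply its own serialization in place of UTF8_LINES, for example writing a length prefix or a different record delimiter.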