This module implements IO classes to read and write data on MongoDB.
Read from MongoDB
ReadFromMongoDB is a PTransform that reads from a configured MongoDB source and returns a PCollection of dict representing MongoDB documents.
To configure the MongoDB source, provide the URI used to connect to the MongoDB server, the database name, and the collection name.
pipeline | ReadFromMongoDB(uri='mongodb://localhost:27017', db='testdb', coll='input')
To read from MongoDB Atlas, use the bucket_auto option to enable the $bucketAuto MongoDB aggregation instead of the splitVector command, which is a high-privilege function that cannot be assigned to any user in Atlas.
pipeline | ReadFromMongoDB(uri='mongodb+srv://user:email@example.com', db='testdb', coll='input', bucket_auto=True)
Write to MongoDB
WriteToMongoDB is a PTransform that writes MongoDB documents to the configured sink. The write is conducted through a MongoDB bulk_write of ReplaceOne operations. If a document's _id field already exists in the MongoDB collection, the write overwrites that document; otherwise, a new document is inserted.
pipeline | WriteToMongoDB(uri='mongodb://localhost:27017', db='testdb', coll='output', batch_size=10)
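The overwrite-or-insert behavior described above can be illustrated without a MongoDB server. The following is a minimal sketch in plain Python, assuming a dict keyed by _id as a stand-in for a collection; the helper names replace_one_upsert and bulk_write_batches are hypothetical and are not part of this module or of pymongo.

```python
def replace_one_upsert(collection, doc):
    """Overwrite the document with the same _id, or insert it if absent.

    `collection` is a plain dict keyed by _id (an in-memory stand-in).
    Returns "replaced" or "inserted" so the outcome is visible.
    """
    key = doc["_id"]
    outcome = "replaced" if key in collection else "inserted"
    collection[key] = dict(doc)
    return outcome


def bulk_write_batches(collection, docs, batch_size=10):
    """Apply the upsert to documents in batches of at most `batch_size`."""
    outcomes = []
    for start in range(0, len(docs), batch_size):
        batch = docs[start:start + batch_size]
        outcomes.extend(replace_one_upsert(collection, d) for d in batch)
    return outcomes


# One document already exists; the second one is new.
collection = {1: {"_id": 1, "name": "old"}}
outcomes = bulk_write_batches(
    collection,
    [{"_id": 1, "name": "new"}, {"_id": 2, "name": "added"}],
)
```

Because every write is keyed by _id, applying the same batch twice leaves the stand-in collection unchanged after the first pass.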
No backward compatibility guarantees. Everything in this module is experimental.
ReadFromMongoDB(uri='mongodb://localhost:27017', db=None, coll=None, filter=None, projection=None, extra_client_params=None, bucket_auto=False)
A PTransform to read MongoDB documents into a PCollection.
- uri (str) – The MongoDB connection string following the URI format.
- db (str) – The MongoDB database name.
- coll (str) – The MongoDB collection name.
- filter – A bson.SON object specifying elements which must be present for a document to be included in the result set.
- projection – A list of field names that should be returned in the result set or a dict specifying the fields to include or exclude.
- extra_client_params (dict) – Optional MongoClient parameters.
- bucket_auto (bool) – If True, use the MongoDB $bucketAuto aggregation to split the collection into bundles instead of the splitVector command, which does not work with MongoDB Atlas. If False (the default), use the splitVector command for bundling.
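To give a sense of what splitting a collection into bundles with $bucketAuto means, here is a rough sketch. The aggregation stage uses MongoDB's documented $bucketAuto fields (groupBy, buckets); grouping by _id and the helper names bucket_auto_stage and emulate_bucket_ranges are assumptions made for illustration, and the pure-Python emulation is not the code this module runs.

```python
def bucket_auto_stage(num_buckets):
    """Build a $bucketAuto stage that groups documents by _id (assumed key)."""
    return {"$bucketAuto": {"groupBy": "$_id", "buckets": num_buckets}}


def emulate_bucket_ranges(ids, num_buckets):
    """Split ids into contiguous, near-equal ranges, one per bundle.

    Simplification: each range reports an inclusive max here, whereas
    MongoDB reports bucket boundaries rather than the last id seen.
    """
    ordered = sorted(ids)
    if not ordered or num_buckets < 1:
        return []
    size, extra = divmod(len(ordered), num_buckets)
    ranges = []
    start = 0
    for i in range(num_buckets):
        # The first `extra` buckets take one additional id each.
        length = size + (1 if i < extra else 0)
        if length == 0:
            continue  # fewer ids than requested buckets
        chunk = ordered[start:start + length]
        ranges.append({"min": chunk[0], "max": chunk[-1], "count": len(chunk)})
        start += length
    return ranges


stage = bucket_auto_stage(3)
ranges = emulate_bucket_ranges(range(10), 3)
```

Each resulting range could then be read independently as one bundle, which is the purpose of the split.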
WriteToMongoDB(uri='mongodb://localhost:27017', db=None, coll=None, batch_size=100, extra_client_params=None)
WriteToMongoDB is a PTransform that writes a PCollection of MongoDB documents to the configured MongoDB server.
To make the document writes idempotent, so that bundles can be retried without creating duplicates, the PTransform adds two transformations before the final write stage: a GenerateId transform and a Reshuffle transform.

             -----------------------------------------------
Pipeline --> |GenerateId --> Reshuffle --> WriteToMongoSink|
             -----------------------------------------------
                              (WriteToMongoDB)

The GenerateId transform adds a random and unique _id field to the documents if they don't already have one, using the same format as the MongoDB default. The Reshuffle transform makes sure that no fusion happens between GenerateId and the final write stage transform, so that the set of documents and their unique IDs are not regenerated if the final write step is retried due to a failure. This prevents duplicate writes of the same document with different unique IDs.
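The effect of fixing the IDs before the write stage can be sketched in plain Python. This is an illustration under stated assumptions, not the module's implementation: uuid4 hex strings stand in for MongoDB's default _id format, a dict stands in for the collection, and the helper names generate_id and write are hypothetical.

```python
import uuid


def generate_id(doc):
    """Return the document with an _id, adding a random one only if missing."""
    if "_id" in doc:
        return doc
    # Assumption: a uuid4 hex string stands in for a MongoDB-style id.
    return {**doc, "_id": uuid.uuid4().hex}


def write(collection, docs):
    """Write documents keyed by _id, overwriting on a matching _id."""
    for doc in docs:
        collection[doc["_id"]] = doc


raw = [{"name": "a"}, {"name": "b"}]

# IDs are generated once and kept, then the write is retried:
# the retry overwrites the same two documents.
stable = [generate_id(d) for d in raw]
checkpointed = {}
write(checkpointed, stable)
write(checkpointed, stable)  # simulated retry

# IDs are regenerated on the retry (as if the steps were fused):
# the retry inserts two more documents with fresh IDs.
fused = {}
write(fused, [generate_id(d) for d in raw])
write(fused, [generate_id(d) for d in raw])  # simulated retry
```

In this sketch the first collection ends with two documents and the second with four, which is the duplication the intermediate Reshuffle step is described as preventing.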
- uri (str) – The MongoDB connection string following the URI format
- db (str) – The MongoDB database name
- coll (str) – The MongoDB collection name
- batch_size (int) – Number of documents per bulk_write to MongoDB; defaults to 100
- extra_client_params (dict) – Optional MongoClient parameters as keyword arguments