#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This module implements IO classes to read and write data on MongoDB.
Read from MongoDB
-----------------
:class:`ReadFromMongoDB` is a ``PTransform`` that reads from a configured
MongoDB source and returns a ``PCollection`` of dict representing MongoDB
documents.
To configure MongoDB source, the URI to connect to MongoDB server, database
name, collection name needs to be provided.
Example usage::
pipeline | ReadFromMongoDB(uri='mongodb://localhost:27017',
db='testdb',
coll='input')
To read from MongoDB Atlas, use ``bucket_auto`` option to enable
``@bucketAuto`` MongoDB aggregation instead of ``splitVector``
command which is a high-privilege function that cannot be assigned
to any user in Atlas.
Example usage::
pipeline | ReadFromMongoDB(uri='mongodb+srv://user:pwd@cluster0.mongodb.net',
db='testdb',
coll='input',
bucket_auto=True)
Write to MongoDB:
-----------------
:class:`WriteToMongoDB` is a ``PTransform`` that writes MongoDB documents to
configured sink, and the write is conducted through a mongodb bulk_write of
``ReplaceOne`` operations. If the document's _id field already existed in the
MongoDB collection, it results in an overwrite, otherwise, a new document
will be inserted.
Example usage::
pipeline | WriteToMongoDB(uri='mongodb://localhost:27017',
db='testdb',
coll='output',
batch_size=10)
No backward compatibility guarantees. Everything in this module is experimental.
"""
# pytype: skip-file
import itertools
import json
import logging
import math
import struct
from typing import Union
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import LexicographicKeyRangeTracker
from apache_beam.io.range_trackers import OffsetRangeTracker
from apache_beam.io.range_trackers import OrderedPositionRangeTracker
from apache_beam.transforms import DoFn
from apache_beam.transforms import PTransform
from apache_beam.transforms import Reshuffle
from apache_beam.utils.annotations import experimental
_LOGGER = logging.getLogger(__name__)
try:
# Mongodb has its own bundled bson, which is not compatible with bson package.
# (https://github.com/py-bson/bson/issues/82). Try to import objectid and if
# it fails because bson package is installed, MongoDB IO will not work but at
# least rest of the SDK will work.
from bson import json_util
from bson import objectid
from bson.objectid import ObjectId
# pymongo also internally depends on bson.
from pymongo import ASCENDING
from pymongo import DESCENDING
from pymongo import MongoClient
from pymongo import ReplaceOne
except ImportError:
objectid = None
json_util = None
ObjectId = None
ASCENDING = 1
DESCENDING = -1
MongoClient = None
ReplaceOne = None
_LOGGER.warning("Could not find a compatible bson package.")
__all__ = ["ReadFromMongoDB", "WriteToMongoDB"]
[docs]@experimental()
class ReadFromMongoDB(PTransform):
"""A ``PTransform`` to read MongoDB documents into a ``PCollection``."""
def __init__(
self,
uri="mongodb://localhost:27017",
db=None,
coll=None,
filter=None,
projection=None,
extra_client_params=None,
bucket_auto=False,
):
"""Initialize a :class:`ReadFromMongoDB`
Args:
uri (str): The MongoDB connection string following the URI format.
db (str): The MongoDB database name.
coll (str): The MongoDB collection name.
filter: A `bson.SON
<https://api.mongodb.com/python/current/api/bson/son.html>`_ object
specifying elements which must be present for a document to be included
in the result set.
projection: A list of field names that should be returned in the result
set or a dict specifying the fields to include or exclude.
extra_client_params(dict): Optional `MongoClient
<https://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_
parameters.
bucket_auto (bool): If :data:`True`, use MongoDB `$bucketAuto` aggregation
to split collection into bundles instead of `splitVector` command,
which does not work with MongoDB Atlas.
If :data:`False` (the default), use `splitVector` command for bundling.
Returns:
:class:`~apache_beam.transforms.ptransform.PTransform`
"""
if extra_client_params is None:
extra_client_params = {}
if not isinstance(db, str):
raise ValueError("ReadFromMongDB db param must be specified as a string")
if not isinstance(coll, str):
raise ValueError(
"ReadFromMongDB coll param must be specified as a string")
self._mongo_source = _BoundedMongoSource(
uri=uri,
db=db,
coll=coll,
filter=filter,
projection=projection,
extra_client_params=extra_client_params,
bucket_auto=bucket_auto,
)
[docs] def expand(self, pcoll):
return pcoll | iobase.Read(self._mongo_source)
class _ObjectIdRangeTracker(OrderedPositionRangeTracker):
"""RangeTracker for tracking mongodb _id of bson ObjectId type."""
def position_to_fraction(
self,
pos: ObjectId,
start: ObjectId,
end: ObjectId,
):
"""Returns the fraction of keys in the range [start, end) that
are less than the given key.
"""
pos_number = _ObjectIdHelper.id_to_int(pos)
start_number = _ObjectIdHelper.id_to_int(start)
end_number = _ObjectIdHelper.id_to_int(end)
return (pos_number - start_number) / (end_number - start_number)
def fraction_to_position(
self,
fraction: float,
start: ObjectId,
end: ObjectId,
):
"""Converts a fraction between 0 and 1
to a position between start and end.
"""
start_number = _ObjectIdHelper.id_to_int(start)
end_number = _ObjectIdHelper.id_to_int(end)
total = end_number - start_number
pos = int(total * fraction + start_number)
# make sure split position is larger than start position and smaller than
# end position.
if pos <= start_number:
return _ObjectIdHelper.increment_id(start, 1)
if pos >= end_number:
return _ObjectIdHelper.increment_id(end, -1)
return _ObjectIdHelper.int_to_id(pos)
class _BoundedMongoSource(iobase.BoundedSource):
"""A MongoDB source that reads a finite amount of input records.
This class defines following operations which can be used to read
MongoDB source efficiently.
* Size estimation - method ``estimate_size()`` may return an accurate
estimation in bytes for the size of the source.
* Splitting into bundles of a given size - method ``split()`` can be used to
split the source into a set of sub-sources (bundles) based on a desired
bundle size.
* Getting a RangeTracker - method ``get_range_tracker()`` should return a
``RangeTracker`` object for a given position range for the position type
of the records returned by the source.
* Reading the data - method ``read()`` can be used to read data from the
source while respecting the boundaries defined by a given
``RangeTracker``.
A runner will perform reading the source in two steps.
(1) Method ``get_range_tracker()`` will be invoked with start and end
positions to obtain a ``RangeTracker`` for the range of positions the
runner intends to read. Source must define a default initial start and end
position range. These positions must be used if the start and/or end
positions passed to the method ``get_range_tracker()`` are ``None``
(2) Method read() will be invoked with the ``RangeTracker`` obtained in the
previous step.
**Mutability**
A ``_BoundedMongoSource`` object should not be mutated while
its methods (for example, ``read()``) are being invoked by a runner. Runner
implementations may invoke methods of ``_BoundedMongoSource`` objects through
multi-threaded and/or reentrant execution modes.
"""
def __init__(
self,
uri=None,
db=None,
coll=None,
filter=None,
projection=None,
extra_client_params=None,
bucket_auto=False,
):
if extra_client_params is None:
extra_client_params = {}
if filter is None:
filter = {}
self.uri = uri
self.db = db
self.coll = coll
self.filter = filter
self.projection = projection
self.spec = extra_client_params
self.bucket_auto = bucket_auto
def estimate_size(self):
with MongoClient(self.uri, **self.spec) as client:
return client[self.db].command("collstats", self.coll).get("size")
def _estimate_average_document_size(self):
with MongoClient(self.uri, **self.spec) as client:
return client[self.db].command("collstats", self.coll).get("avgObjSize")
def split(
self,
desired_bundle_size: int,
start_position: Union[int, str, bytes, ObjectId] = None,
stop_position: Union[int, str, bytes, ObjectId] = None,
):
"""Splits the source into a set of bundles.
Bundles should be approximately of size ``desired_bundle_size`` bytes.
Args:
desired_bundle_size: the desired size (in bytes) of the bundles returned.
start_position: if specified the given position must be used as the
starting position of the first bundle.
stop_position: if specified the given position must be used as the ending
position of the last bundle.
Returns:
an iterator of objects of type 'SourceBundle' that gives information about
the generated bundles.
"""
desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
# for desired bundle size, if desired chunk size smaller than 1mb, use
# MongoDB default split size of 1mb.
if desired_bundle_size_in_mb < 1:
desired_bundle_size_in_mb = 1
is_initial_split = start_position is None and stop_position is None
start_position, stop_position = self._replace_none_positions(
start_position, stop_position
)
if self.bucket_auto:
# Use $bucketAuto for bundling
split_keys = []
weights = []
for bucket in self._get_auto_buckets(
desired_bundle_size_in_mb,
start_position,
stop_position,
is_initial_split,
):
split_keys.append({"_id": bucket["_id"]["max"]})
weights.append(bucket["count"])
else:
# Use splitVector for bundling
split_keys = self._get_split_keys(
desired_bundle_size_in_mb, start_position, stop_position)
weights = itertools.cycle((desired_bundle_size_in_mb, ))
bundle_start = start_position
for split_key_id, weight in zip(split_keys, weights):
if bundle_start >= stop_position:
break
bundle_end = min(stop_position, split_key_id["_id"])
yield iobase.SourceBundle(
weight=weight,
source=self,
start_position=bundle_start,
stop_position=bundle_end,
)
bundle_start = bundle_end
# add range of last split_key to stop_position
if bundle_start < stop_position:
# bucket_auto mode can come here if not split due to single document
weight = 1 if self.bucket_auto else desired_bundle_size_in_mb
yield iobase.SourceBundle(
weight=weight,
source=self,
start_position=bundle_start,
stop_position=stop_position,
)
def get_range_tracker(
self,
start_position: Union[int, str, ObjectId] = None,
stop_position: Union[int, str, ObjectId] = None,
) -> Union[
_ObjectIdRangeTracker, OffsetRangeTracker, LexicographicKeyRangeTracker]:
"""Returns a RangeTracker for a given position range depending on type.
Args:
start_position: starting position of the range. If 'None' default start
position of the source must be used.
stop_position: ending position of the range. If 'None' default stop
position of the source must be used.
Returns:
a ``_ObjectIdRangeTracker``, ``OffsetRangeTracker``
or ``LexicographicKeyRangeTracker`` depending on the given position range.
"""
start_position, stop_position = self._replace_none_positions(
start_position, stop_position
)
if isinstance(start_position, ObjectId):
return _ObjectIdRangeTracker(start_position, stop_position)
if isinstance(start_position, int):
return OffsetRangeTracker(start_position, stop_position)
if isinstance(start_position, str):
return LexicographicKeyRangeTracker(start_position, stop_position)
raise NotImplementedError(
f"RangeTracker for {type(start_position)} not implemented!")
def read(self, range_tracker):
"""Returns an iterator that reads data from the source.
The returned set of data must respect the boundaries defined by the given
``RangeTracker`` object. For example:
* Returned set of data must be for the range
``[range_tracker.start_position, range_tracker.stop_position)``. Note
that a source may decide to return records that start after
``range_tracker.stop_position``. See documentation in class
``RangeTracker`` for more details. Also, note that framework might
invoke ``range_tracker.try_split()`` to perform dynamic split
operations. range_tracker.stop_position may be updated
dynamically due to successful dynamic split operations.
* Method ``range_tracker.try_split()`` must be invoked for every record
that starts at a split point.
* Method ``range_tracker.record_current_position()`` may be invoked for
records that do not start at split points.
Args:
range_tracker: a ``RangeTracker`` whose boundaries must be respected
when reading data from the source. A runner that reads this
source muss pass a ``RangeTracker`` object that is not
``None``.
Returns:
an iterator of data read by the source.
"""
with MongoClient(self.uri, **self.spec) as client:
all_filters = self._merge_id_filter(
range_tracker.start_position(), range_tracker.stop_position())
docs_cursor = (
client[self.db][self.coll].find(
filter=all_filters,
projection=self.projection).sort([("_id", ASCENDING)]))
for doc in docs_cursor:
if not range_tracker.try_claim(doc["_id"]):
return
yield doc
def display_data(self):
"""Returns the display data associated to a pipeline component."""
res = super().display_data()
res["database"] = self.db
res["collection"] = self.coll
res["filter"] = json.dumps(self.filter, default=json_util.default)
res["projection"] = str(self.projection)
res["bucket_auto"] = self.bucket_auto
return res
@staticmethod
def _range_is_not_splittable(
start_pos: Union[int, str, ObjectId],
end_pos: Union[int, str, ObjectId],
):
"""Return `True` if splitting range doesn't make sense
(single document is not splittable),
Return `False` otherwise.
"""
return ((
isinstance(start_pos, ObjectId) and
start_pos >= _ObjectIdHelper.increment_id(end_pos, -1)) or
(isinstance(start_pos, int) and start_pos >= end_pos - 1) or
(isinstance(start_pos, str) and start_pos >= end_pos))
def _get_split_keys(
self,
desired_chunk_size_in_mb: int,
start_pos: Union[int, str, ObjectId],
end_pos: Union[int, str, ObjectId],
):
"""Calls MongoDB `splitVector` command
to get document ids at split position.
"""
# single document not splittable
if self._range_is_not_splittable(start_pos, end_pos):
return []
with MongoClient(self.uri, **self.spec) as client:
name_space = "%s.%s" % (self.db, self.coll)
return client[self.db].command(
"splitVector",
name_space,
keyPattern={"_id": 1}, # Ascending index
min={"_id": start_pos},
max={"_id": end_pos},
maxChunkSize=desired_chunk_size_in_mb,
)["splitKeys"]
def _get_auto_buckets(
self,
desired_chunk_size_in_mb: int,
start_pos: Union[int, str, ObjectId],
end_pos: Union[int, str, ObjectId],
is_initial_split: bool,
) -> list:
"""Use MongoDB `$bucketAuto` aggregation to split collection into bundles
instead of `splitVector` command, which does not work with MongoDB Atlas.
"""
# single document not splittable
if self._range_is_not_splittable(start_pos, end_pos):
return []
if is_initial_split and not self.filter:
# total collection size in MB
size_in_mb = self.estimate_size() / float(1 << 20)
else:
# size of documents within start/end id range and possibly filtered
documents_count = self._count_id_range(start_pos, end_pos)
avg_document_size = self._estimate_average_document_size()
size_in_mb = documents_count * avg_document_size / float(1 << 20)
if size_in_mb == 0:
# no documents not splittable (maybe a result of filtering)
return []
bucket_count = math.ceil(size_in_mb / desired_chunk_size_in_mb)
with beam.io.mongodbio.MongoClient(self.uri, **self.spec) as client:
pipeline = [
{
# filter by positions and by the custom filter if any
"$match": self._merge_id_filter(start_pos, end_pos)
},
{
"$bucketAuto": {
"groupBy": "$_id", "buckets": bucket_count
}
},
]
buckets = list(
# Use `allowDiskUse` option to avoid aggregation limit of 100 Mb RAM
client[self.db][self.coll].aggregate(pipeline, allowDiskUse=True))
if buckets:
buckets[-1]["_id"]["max"] = end_pos
return buckets
def _merge_id_filter(
self,
start_position: Union[int, str, bytes, ObjectId],
stop_position: Union[int, str, bytes, ObjectId] = None,
) -> dict:
"""Merge the default filter (if any) with refined _id field range
of range_tracker.
$gte specifies start position (inclusive)
and $lt specifies the end position (exclusive),
see more at
https://docs.mongodb.com/manual/reference/operator/query/gte/ and
https://docs.mongodb.com/manual/reference/operator/query/lt/
"""
if stop_position is None:
id_filter = {"_id": {"$gte": start_position}}
else:
id_filter = {"_id": {"$gte": start_position, "$lt": stop_position}}
if self.filter:
all_filters = {
# see more at
# https://docs.mongodb.com/manual/reference/operator/query/and/
"$and": [self.filter.copy(), id_filter]
}
else:
all_filters = id_filter
return all_filters
def _get_head_document_id(self, sort_order):
with MongoClient(self.uri, **self.spec) as client:
cursor = (
client[self.db][self.coll].find(filter={}, projection=[]).sort([
("_id", sort_order)
]).limit(1))
try:
return cursor[0]["_id"]
except IndexError:
raise ValueError("Empty Mongodb collection")
def _replace_none_positions(self, start_position, stop_position):
if start_position is None:
start_position = self._get_head_document_id(ASCENDING)
if stop_position is None:
last_doc_id = self._get_head_document_id(DESCENDING)
# increment last doc id binary value by 1 to make sure the last document
# is not excluded
if isinstance(last_doc_id, ObjectId):
stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
elif isinstance(last_doc_id, int):
stop_position = last_doc_id + 1
elif isinstance(last_doc_id, str):
stop_position = last_doc_id + '\x00'
return start_position, stop_position
def _count_id_range(self, start_position, stop_position):
"""Number of documents between start_position (inclusive)
and stop_position (exclusive), respecting the custom filter if any.
"""
with MongoClient(self.uri, **self.spec) as client:
return client[self.db][self.coll].count_documents(
filter=self._merge_id_filter(start_position, stop_position))
class _ObjectIdHelper:
"""A Utility class to manipulate bson object ids."""
@classmethod
def id_to_int(cls, _id: Union[int, ObjectId]) -> int:
"""
Args:
_id: ObjectId required for each MongoDB document _id field.
Returns: Converted integer value of ObjectId's 12 bytes binary value.
"""
if isinstance(_id, int):
return _id
# converts object id binary to integer
# id object is bytes type with size of 12
ints = struct.unpack(">III", _id.binary)
return (ints[0] << 64) + (ints[1] << 32) + ints[2]
@classmethod
def int_to_id(cls, number):
"""
Args:
number(int): The integer value to be used to convert to ObjectId.
Returns: The ObjectId that has the 12 bytes binary converted from the
integer value.
"""
# converts integer value to object id. Int value should be less than
# (2 ^ 96) so it can be convert to 12 bytes required by object id.
if number < 0 or number >= (1 << 96):
raise ValueError("number value must be within [0, %s)" % (1 << 96))
ints = [
(number & 0xFFFFFFFF0000000000000000) >> 64,
(number & 0x00000000FFFFFFFF00000000) >> 32,
number & 0x0000000000000000FFFFFFFF,
]
number_bytes = struct.pack(">III", *ints)
return ObjectId(number_bytes)
@classmethod
def increment_id(
cls,
_id: ObjectId,
inc: int,
) -> ObjectId:
"""
Increment object_id binary value by inc value and return new object id.
Args:
_id: The `_id` to change.
inc(int): The incremental int value to be added to `_id`.
Returns:
`_id` incremented by `inc` value
"""
id_number = _ObjectIdHelper.id_to_int(_id)
new_number = id_number + inc
if new_number < 0 or new_number >= (1 << 96):
raise ValueError(
"invalid incremental, inc value must be within ["
"%s, %s)" % (0 - id_number, 1 << 96 - id_number))
return _ObjectIdHelper.int_to_id(new_number)
[docs]@experimental()
class WriteToMongoDB(PTransform):
"""WriteToMongoDB is a ``PTransform`` that writes a ``PCollection`` of
mongodb document to the configured MongoDB server.
In order to make the document writes idempotent so that the bundles are
retry-able without creating duplicates, the PTransform added 2 transformations
before final write stage:
a ``GenerateId`` transform and a ``Reshuffle`` transform.::
-----------------------------------------------
Pipeline --> |GenerateId --> Reshuffle --> WriteToMongoSink|
-----------------------------------------------
(WriteToMongoDB)
The ``GenerateId`` transform adds a random and unique*_id* field to the
documents if they don't already have one, it uses the same format as MongoDB
default. The ``Reshuffle`` transform makes sure that no fusion happens between
``GenerateId`` and the final write stage transform,so that the set of
documents and their unique IDs are not regenerated if final write step is
retried due to a failure. This prevents duplicate writes of the same document
with different unique IDs.
"""
def __init__(
self,
uri="mongodb://localhost:27017",
db=None,
coll=None,
batch_size=100,
extra_client_params=None,
):
"""
Args:
uri (str): The MongoDB connection string following the URI format
db (str): The MongoDB database name
coll (str): The MongoDB collection name
batch_size(int): Number of documents per bulk_write to MongoDB,
default to 100
extra_client_params(dict): Optional `MongoClient
<https://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_
parameters as keyword arguments
Returns:
:class:`~apache_beam.transforms.ptransform.PTransform`
"""
if extra_client_params is None:
extra_client_params = {}
if not isinstance(db, str):
raise ValueError("WriteToMongoDB db param must be specified as a string")
if not isinstance(coll, str):
raise ValueError(
"WriteToMongoDB coll param must be specified as a string")
self._uri = uri
self._db = db
self._coll = coll
self._batch_size = batch_size
self._spec = extra_client_params
[docs] def expand(self, pcoll):
return (
pcoll
| beam.ParDo(_GenerateObjectIdFn())
| Reshuffle()
| beam.ParDo(
_WriteMongoFn(
self._uri, self._db, self._coll, self._batch_size, self._spec)))
class _GenerateObjectIdFn(DoFn):
def process(self, element, *args, **kwargs):
# if _id field already exist we keep it as it is, otherwise the ptransform
# generates a new _id field to achieve idempotent write to mongodb.
if "_id" not in element:
# object.ObjectId() generates a unique identifier that follows mongodb
# default format, if _id is not present in document, mongodb server
# generates it with this same function upon write. However the
# uniqueness of generated id may not be guaranteed if the work load are
# distributed across too many processes. See more on the ObjectId format
# https://docs.mongodb.com/manual/reference/bson-types/#objectid.
element["_id"] = objectid.ObjectId()
yield element
class _WriteMongoFn(DoFn):
def __init__(
self, uri=None, db=None, coll=None, batch_size=100, extra_params=None):
if extra_params is None:
extra_params = {}
self.uri = uri
self.db = db
self.coll = coll
self.spec = extra_params
self.batch_size = batch_size
self.batch = []
def finish_bundle(self):
self._flush()
def process(self, element, *args, **kwargs):
self.batch.append(element)
if len(self.batch) >= self.batch_size:
self._flush()
def _flush(self):
if len(self.batch) == 0:
return
with _MongoSink(self.uri, self.db, self.coll, self.spec) as sink:
sink.write(self.batch)
self.batch = []
def display_data(self):
res = super(_WriteMongoFn, self).display_data()
res["database"] = self.db
res["collection"] = self.coll
res["batch_size"] = self.batch_size
return res
class _MongoSink:
def __init__(self, uri=None, db=None, coll=None, extra_params=None):
if extra_params is None:
extra_params = {}
self.uri = uri
self.db = db
self.coll = coll
self.spec = extra_params
self.client = None
def write(self, documents):
if self.client is None:
self.client = MongoClient(host=self.uri, **self.spec)
requests = []
for doc in documents:
# match document based on _id field, if not found in current collection,
# insert new one, otherwise overwrite it.
requests.append(
ReplaceOne(
filter={"_id": doc.get("_id", None)},
replacement=doc,
upsert=True))
resp = self.client[self.db][self.coll].bulk_write(requests)
_LOGGER.debug(
"BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
"nMatched:%d, Errors:%s" % (
resp.modified_count,
resp.upserted_count,
resp.matched_count,
resp.bulk_api_result.get("writeErrors"),
))
def __enter__(self):
if self.client is None:
self.client = MongoClient(host=self.uri, **self.spec)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.client is not None:
self.client.close()