#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This module implements IO classes to read and write data on MongoDB.
Read from MongoDB
-----------------
:class:`ReadFromMongoDB` is a ``PTransform`` that reads from a configured
MongoDB source and returns a ``PCollection`` of dict representing MongoDB
documents.
To configure MongoDB source, the URI to connect to MongoDB server, database
name, collection name needs to be provided.
Example usage::
pipeline | ReadFromMongoDB(uri='mongodb://localhost:27017',
db='testdb',
coll='input')
To read from MongoDB Atlas, use ``bucket_auto`` option to enable
``@bucketAuto`` MongoDB aggregation instead of ``splitVector``
command which is a high-privilege function that cannot be assigned
to any user in Atlas.
Example usage::
pipeline | ReadFromMongoDB(uri='mongodb+srv://user:pwd@cluster0.mongodb.net',
db='testdb',
coll='input',
bucket_auto=True)
Write to MongoDB:
-----------------
:class:`WriteToMongoDB` is a ``PTransform`` that writes MongoDB documents to
configured sink, and the write is conducted through a mongodb bulk_write of
``ReplaceOne`` operations. If the document's _id field already existed in the
MongoDB collection, it results in an overwrite, otherwise, a new document
will be inserted.
Example usage::
pipeline | WriteToMongoDB(uri='mongodb://localhost:27017',
db='testdb',
coll='output',
batch_size=10)
No backward compatibility guarantees. Everything in this module is experimental.
"""
# pytype: skip-file
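
# A minimal end-to-end sketch (illustrative only): read documents, transform
# them, and write them back. It assumes a local mongod listening on the
# default port and a 'testdb' database; all names are placeholders.
#
#   import apache_beam as beam
#   from apache_beam.io.mongodbio import ReadFromMongoDB, WriteToMongoDB
#
#   with beam.Pipeline() as p:
#     _ = (
#         p
#         | ReadFromMongoDB(
#             uri='mongodb://localhost:27017', db='testdb', coll='input')
#         | beam.Map(lambda doc: dict(doc, processed=True))
#         | WriteToMongoDB(
#             uri='mongodb://localhost:27017',
#             db='testdb',
#             coll='output',
#             batch_size=10))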
from __future__ import absolute_import
from __future__ import division
import itertools
import json
import logging
import math
import struct
import urllib.parse
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OrderedPositionRangeTracker
from apache_beam.transforms import DoFn
from apache_beam.transforms import PTransform
from apache_beam.transforms import Reshuffle
from apache_beam.utils.annotations import experimental
_LOGGER = logging.getLogger(__name__)
try:
  # MongoDB has its own bundled bson, which is not compatible with the bson
  # package (https://github.com/py-bson/bson/issues/82). Try to import
  # objectid, and if it fails because the bson package is installed, MongoDB
  # IO will not work, but at least the rest of the SDK will work.
  from bson import objectid

  # pymongo also internally depends on bson.
  from pymongo import ASCENDING
  from pymongo import DESCENDING
  from pymongo import MongoClient
  from pymongo import ReplaceOne
except ImportError:
  objectid = None
  _LOGGER.warning("Could not find a compatible bson package.")
__all__ = ['ReadFromMongoDB', 'WriteToMongoDB']
@experimental()
class ReadFromMongoDB(PTransform):
"""A ``PTransform`` to read MongoDB documents into a ``PCollection``.
"""
def __init__(
self,
uri='mongodb://localhost:27017',
db=None,
coll=None,
filter=None,
projection=None,
extra_client_params=None,
bucket_auto=False):
"""Initialize a :class:`ReadFromMongoDB`
Args:
uri (str): The MongoDB connection string following the URI format.
db (str): The MongoDB database name.
coll (str): The MongoDB collection name.
filter: A `bson.SON
<https://api.mongodb.com/python/current/api/bson/son.html>`_ object
specifying elements which must be present for a document to be included
in the result set.
projection: A list of field names that should be returned in the result
set or a dict specifying the fields to include or exclude.
extra_client_params(dict): Optional `MongoClient
<https://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_
parameters.
bucket_auto (bool): If :data:`True`, use MongoDB `$bucketAuto` aggregation
to split collection into bundles instead of `splitVector` command,
which does not work with MongoDB Atlas.
If :data:`False` (the default), use `splitVector` command for bundling.
Returns:
:class:`~apache_beam.transforms.ptransform.PTransform`
"""
if extra_client_params is None:
extra_client_params = {}
if not isinstance(db, str):
raise ValueError('ReadFromMongDB db param must be specified as a string')
if not isinstance(coll, str):
raise ValueError(
'ReadFromMongDB coll param must be specified as a '
'string')
self._mongo_source = _BoundedMongoSource(
uri=uri,
db=db,
coll=coll,
filter=filter,
projection=projection,
extra_client_params=extra_client_params,
bucket_auto=bucket_auto)
[docs] def expand(self, pcoll):
return pcoll | iobase.Read(self._mongo_source)
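

# Illustrative only: ``filter`` and ``projection`` are passed through to
# pymongo's find(), so standard MongoDB query documents apply. The database,
# collection, and field names below are placeholders.
#
#   pipeline | ReadFromMongoDB(
#       uri='mongodb://localhost:27017',
#       db='testdb',
#       coll='input',
#       filter={'status': 'active'},
#       projection=['_id', 'name', 'status'])
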
class _BoundedMongoSource(iobase.BoundedSource):
  def __init__(
      self,
      uri=None,
      db=None,
      coll=None,
      filter=None,
      projection=None,
      extra_client_params=None,
      bucket_auto=False):
    if extra_client_params is None:
      extra_client_params = {}
    if filter is None:
      filter = {}
    self.uri = uri
    self.db = db
    self.coll = coll
    self.filter = filter
    self.projection = projection
    self.spec = extra_client_params
    self.bucket_auto = bucket_auto

  def estimate_size(self):
    with MongoClient(self.uri, **self.spec) as client:
      return client[self.db].command('collstats', self.coll).get('size')

  def _estimate_average_document_size(self):
    with MongoClient(self.uri, **self.spec) as client:
      return client[self.db].command('collstats', self.coll).get('avgObjSize')

  def split(self,
            desired_bundle_size,
            start_position=None,
            stop_position=None):
    desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
    # If the desired chunk size is smaller than 1 MB, use the MongoDB default
    # split size of 1 MB.
    if desired_bundle_size_in_mb < 1:
      desired_bundle_size_in_mb = 1

    is_initial_split = start_position is None and stop_position is None
    start_position, stop_position = self._replace_none_positions(
        start_position, stop_position)

    if self.bucket_auto:
      # Use $bucketAuto for bundling
      split_keys = []
      weights = []
      for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
                                           start_position,
                                           stop_position,
                                           is_initial_split):
        split_keys.append({'_id': bucket['_id']['max']})
        weights.append(bucket['count'])
    else:
      # Use splitVector for bundling
      split_keys = self._get_split_keys(
          desired_bundle_size_in_mb, start_position, stop_position)
      weights = itertools.cycle((desired_bundle_size_in_mb, ))

    bundle_start = start_position
    for split_key_id, weight in zip(split_keys, weights):
      if bundle_start >= stop_position:
        break
      bundle_end = min(stop_position, split_key_id['_id'])
      yield iobase.SourceBundle(
          weight=weight,
          source=self,
          start_position=bundle_start,
          stop_position=bundle_end)
      bundle_start = bundle_end
    # add the range from the last split_key to stop_position
    if bundle_start < stop_position:
      # bucket_auto mode can reach here if the range was not split because it
      # holds a single document
      weight = 1 if self.bucket_auto else desired_bundle_size_in_mb
      yield iobase.SourceBundle(
          weight=weight,
          source=self,
          start_position=bundle_start,
          stop_position=stop_position)
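
  # Worked example (illustrative): a runner may request
  # desired_bundle_size=67108864 bytes; 67108864 // 1024 // 1024 == 64, so in
  # splitVector mode the command is issued with maxChunkSize=64. A request of
  # 500000 bytes yields 0 and is clamped to the 1 MB minimum above.
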
  def get_range_tracker(self, start_position, stop_position):
    start_position, stop_position = self._replace_none_positions(
        start_position, stop_position)
    return _ObjectIdRangeTracker(start_position, stop_position)

  def read(self, range_tracker):
    with MongoClient(self.uri, **self.spec) as client:
      all_filters = self._merge_id_filter(
          range_tracker.start_position(), range_tracker.stop_position())
      docs_cursor = client[self.db][self.coll].find(
          filter=all_filters,
          projection=self.projection).sort([('_id', ASCENDING)])
      for doc in docs_cursor:
        if not range_tracker.try_claim(doc['_id']):
          return
        yield doc

  def display_data(self):
    res = super(_BoundedMongoSource, self).display_data()
    res['uri'] = _mask_uri_password(self.uri)
    res['database'] = self.db
    res['collection'] = self.coll
    res['filter'] = json.dumps(self.filter)
    res['projection'] = str(self.projection)
    res['mongo_client_spec'] = json.dumps(_mask_spec_password(self.spec))
    res['bucket_auto'] = self.bucket_auto
    return res

  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
    # calls the MongoDB splitVector command to get document ids at the split
    # positions
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
      # single document, not splittable
      return []
    with MongoClient(self.uri, **self.spec) as client:
      name_space = '%s.%s' % (self.db, self.coll)
      return (
          client[self.db].command(
              'splitVector',
              name_space,
              keyPattern={'_id': 1},  # Ascending index
              min={'_id': start_pos},
              max={'_id': end_pos},
              maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])

  def _get_auto_buckets(
      self, desired_chunk_size_in_mb, start_pos, end_pos, is_initial_split):
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
      # single document, not splittable
      return []
    if is_initial_split and not self.filter:
      # total collection size
      size_in_mb = self.estimate_size() / float(1 << 20)
    else:
      # size of documents within the start/end id range, possibly filtered
      documents_count = self._count_id_range(start_pos, end_pos)
      avg_document_size = self._estimate_average_document_size()
      size_in_mb = documents_count * avg_document_size / float(1 << 20)
    if size_in_mb == 0:
      # no documents, not splittable (possibly a result of filtering)
      return []
    bucket_count = math.ceil(size_in_mb / desired_chunk_size_in_mb)
    with beam.io.mongodbio.MongoClient(self.uri, **self.spec) as client:
      pipeline = [
          {
              # filter by positions and by the custom filter if any
              '$match': self._merge_id_filter(start_pos, end_pos)
          },
          {
              '$bucketAuto': {
                  'groupBy': '$_id', 'buckets': bucket_count
              }
          }
      ]
      buckets = list(client[self.db][self.coll].aggregate(pipeline))
      if buckets:
        buckets[-1]['_id']['max'] = end_pos
      return buckets

  def _merge_id_filter(self, start_position, stop_position):
    # Merge the default filter (if any) with the refined _id field range of
    # the range_tracker.
    # $gte specifies the start position (inclusive) and $lt specifies the end
    # position (exclusive), see more at
    # https://docs.mongodb.com/manual/reference/operator/query/gte/ and
    # https://docs.mongodb.com/manual/reference/operator/query/lt/
    id_filter = {'_id': {'$gte': start_position, '$lt': stop_position}}
    if self.filter:
      all_filters = {
          # see more at
          # https://docs.mongodb.com/manual/reference/operator/query/and/
          '$and': [self.filter.copy(), id_filter]
      }
    else:
      all_filters = id_filter
    return all_filters
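
  # For illustration: with a user filter of {'status': 'active'} and a tracked
  # _id range, _merge_id_filter produces a query document along the lines of
  #
  #   {'$and': [{'status': 'active'},
  #             {'_id': {'$gte': <start ObjectId>, '$lt': <stop ObjectId>}}]}
  #
  # With no user filter, only the bare _id range document is returned.
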
  def _get_head_document_id(self, sort_order):
    with MongoClient(self.uri, **self.spec) as client:
      cursor = client[self.db][self.coll].find(
          filter={}, projection=[]).sort([('_id', sort_order)]).limit(1)
      try:
        return cursor[0]['_id']
      except IndexError:
        raise ValueError('Empty Mongodb collection')

  def _replace_none_positions(self, start_position, stop_position):
    if start_position is None:
      start_position = self._get_head_document_id(ASCENDING)
    if stop_position is None:
      last_doc_id = self._get_head_document_id(DESCENDING)
      # increment the last doc id binary value by 1 to make sure the last
      # document is not excluded
      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
    return start_position, stop_position

  def _count_id_range(self, start_position, stop_position):
    # Number of documents between start_position (inclusive)
    # and stop_position (exclusive), respecting the custom filter if any.
    with MongoClient(self.uri, **self.spec) as client:
      return client[self.db][self.coll].count_documents(
          filter=self._merge_id_filter(start_position, stop_position))
class _ObjectIdHelper(object):
  """A utility class to manipulate bson ObjectIds."""
  @classmethod
  def id_to_int(cls, id):
    """
    Args:
      id: ObjectId required for each MongoDB document _id field.

    Returns: The integer value converted from the ObjectId's 12-byte binary
      value.
    """
    # converts object id binary to integer
    # id object is bytes type with size of 12
    ints = struct.unpack('>III', id.binary)
    return (ints[0] << 64) + (ints[1] << 32) + ints[2]

  @classmethod
  def int_to_id(cls, number):
    """
    Args:
      number(int): The integer value to be converted to an ObjectId.

    Returns: The ObjectId whose 12-byte binary value is converted from the
      integer value.
    """
    # converts an integer value to an object id. The int value should be less
    # than (2 ^ 96) so it can be converted to the 12 bytes required by an
    # object id.
    if number < 0 or number >= (1 << 96):
      raise ValueError('number value must be within [0, %s)' % (1 << 96))
    ints = [(number & 0xffffffff0000000000000000) >> 64,
            (number & 0x00000000ffffffff00000000) >> 32,
            number & 0x0000000000000000ffffffff]
    id_bytes = struct.pack('>III', *ints)
    return objectid.ObjectId(id_bytes)

  @classmethod
  def increment_id(cls, object_id, inc):
    """
    Args:
      object_id: The ObjectId to change.
      inc(int): The increment to be added to the ObjectId's integer value.

    Returns: A new ObjectId whose binary value is the binary value of
      object_id incremented by inc.
    """
    # increment the object_id binary value by inc and return a new object id.
    id_number = _ObjectIdHelper.id_to_int(object_id)
    new_number = id_number + inc
    if new_number < 0 or new_number >= (1 << 96):
      raise ValueError(
          'invalid increment, inc value must be within ['
          '%s, %s)' % (0 - id_number, (1 << 96) - id_number))
    return _ObjectIdHelper.int_to_id(new_number)
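

# Worked example (illustrative): ObjectId('000000000000000000000005') has the
# integer value 5, so
#
#   _ObjectIdHelper.id_to_int(objectid.ObjectId('000000000000000000000005'))
#   == 5
#   _ObjectIdHelper.int_to_id(5)
#   == ObjectId('000000000000000000000005')
#   _ObjectIdHelper.increment_id(
#       objectid.ObjectId('000000000000000000000005'), 1)
#   == ObjectId('000000000000000000000006')
#
# This is how _replace_none_positions() widens the stop position one past the
# last existing _id so the final document falls inside the [start, stop)
# range.
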
class _ObjectIdRangeTracker(OrderedPositionRangeTracker):
  """RangeTracker for tracking mongodb _id of bson ObjectId type."""
  def position_to_fraction(self, pos, start, end):
    pos_number = _ObjectIdHelper.id_to_int(pos)
    start_number = _ObjectIdHelper.id_to_int(start)
    end_number = _ObjectIdHelper.id_to_int(end)
    return (pos_number - start_number) / (end_number - start_number)

  def fraction_to_position(self, fraction, start, end):
    start_number = _ObjectIdHelper.id_to_int(start)
    end_number = _ObjectIdHelper.id_to_int(end)
    total = end_number - start_number
    pos = int(total * fraction + start_number)
    # make sure the split position is larger than the start position and
    # smaller than the end position.
    if pos <= start_number:
      return _ObjectIdHelper.increment_id(start, 1)
    if pos >= end_number:
      return _ObjectIdHelper.increment_id(end, -1)
    return _ObjectIdHelper.int_to_id(pos)
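

# Worked example (illustrative): with start == _ObjectIdHelper.int_to_id(0)
# and end == _ObjectIdHelper.int_to_id(1000), a claimed position of
# _ObjectIdHelper.int_to_id(250) maps to fraction (250 - 0) / (1000 - 0)
# == 0.25, and a requested split fraction of 0.5 maps back to
# _ObjectIdHelper.int_to_id(500). Fractions that land on or outside the
# endpoints are nudged one ObjectId inward by the two guards in
# fraction_to_position above.
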
@experimental()
class WriteToMongoDB(PTransform):
"""WriteToMongoDB is a ``PTransform`` that writes a ``PCollection`` of
mongodb document to the configured MongoDB server.
In order to make the document writes idempotent so that the bundles are
retry-able without creating duplicates, the PTransform added 2 transformations
before final write stage:
a ``GenerateId`` transform and a ``Reshuffle`` transform.::
-----------------------------------------------
Pipeline --> |GenerateId --> Reshuffle --> WriteToMongoSink|
-----------------------------------------------
(WriteToMongoDB)
The ``GenerateId`` transform adds a random and unique*_id* field to the
documents if they don't already have one, it uses the same format as MongoDB
default. The ``Reshuffle`` transform makes sure that no fusion happens between
``GenerateId`` and the final write stage transform,so that the set of
documents and their unique IDs are not regenerated if final write step is
retried due to a failure. This prevents duplicate writes of the same document
with different unique IDs.
"""
def __init__(
self,
uri='mongodb://localhost:27017',
db=None,
coll=None,
batch_size=100,
extra_client_params=None):
"""
Args:
uri (str): The MongoDB connection string following the URI format
db (str): The MongoDB database name
coll (str): The MongoDB collection name
batch_size(int): Number of documents per bulk_write to MongoDB,
default to 100
extra_client_params(dict): Optional `MongoClient
<https://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_
parameters as keyword arguments
Returns:
:class:`~apache_beam.transforms.ptransform.PTransform`
"""
if extra_client_params is None:
extra_client_params = {}
if not isinstance(db, str):
raise ValueError('WriteToMongoDB db param must be specified as a string')
if not isinstance(coll, str):
raise ValueError(
'WriteToMongoDB coll param must be specified as a '
'string')
self._uri = uri
self._db = db
self._coll = coll
self._batch_size = batch_size
self._spec = extra_client_params
[docs] def expand(self, pcoll):
return pcoll \
| beam.ParDo(_GenerateObjectIdFn()) \
| Reshuffle() \
| beam.ParDo(_WriteMongoFn(self._uri, self._db, self._coll,
self._batch_size, self._spec))
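

# Illustrative only: documents that already carry an _id are replaced in place
# (upsert), so re-running the same pipeline over the same input is effectively
# idempotent. Database, collection, and field names are placeholders.
#
#   docs = [{'_id': 1, 'name': 'alice'}, {'_id': 2, 'name': 'bob'}]
#   (pipeline
#    | beam.Create(docs)
#    | WriteToMongoDB(uri='mongodb://localhost:27017',
#                     db='testdb',
#                     coll='output',
#                     batch_size=10))
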
class _GenerateObjectIdFn(DoFn):
  def process(self, element, *args, **kwargs):
    # if the _id field already exists we keep it as it is, otherwise the
    # ptransform generates a new _id field to achieve an idempotent write to
    # mongodb.
    if '_id' not in element:
      # objectid.ObjectId() generates a unique identifier that follows the
      # mongodb default format; if _id is not present in a document, the
      # mongodb server generates it with this same function upon write.
      # However, the uniqueness of the generated id may not be guaranteed if
      # the workload is distributed across too many processes. See more on the
      # ObjectId format at
      # https://docs.mongodb.com/manual/reference/bson-types/#objectid.
      element['_id'] = objectid.ObjectId()

    yield element
class _WriteMongoFn(DoFn):
  def __init__(
      self, uri=None, db=None, coll=None, batch_size=100, extra_params=None):
    if extra_params is None:
      extra_params = {}
    self.uri = uri
    self.db = db
    self.coll = coll
    self.spec = extra_params
    self.batch_size = batch_size
    self.batch = []

  def finish_bundle(self):
    self._flush()

  def process(self, element, *args, **kwargs):
    self.batch.append(element)
    if len(self.batch) >= self.batch_size:
      self._flush()

  def _flush(self):
    if len(self.batch) == 0:
      return
    with _MongoSink(self.uri, self.db, self.coll, self.spec) as sink:
      sink.write(self.batch)
      self.batch = []

  def display_data(self):
    res = super(_WriteMongoFn, self).display_data()
    res['uri'] = _mask_uri_password(self.uri)
    res['database'] = self.db
    res['collection'] = self.coll
    res['mongo_client_params'] = json.dumps(_mask_spec_password(self.spec))
    res['batch_size'] = self.batch_size
    return res
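

# Worked example (illustrative): with batch_size=100 and a bundle of 250
# elements, _WriteMongoFn issues a bulk_write of 100 documents twice from
# process() and writes the remaining 50 from finish_bundle(), so every element
# of the bundle reaches MongoDB once per (possibly retried) bundle.
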
class _MongoSink(object):
  def __init__(self, uri=None, db=None, coll=None, extra_params=None):
    if extra_params is None:
      extra_params = {}
    self.uri = uri
    self.db = db
    self.coll = coll
    self.spec = extra_params
    self.client = None

  def write(self, documents):
    if self.client is None:
      self.client = MongoClient(host=self.uri, **self.spec)
    requests = []
    for doc in documents:
      # match the document based on the _id field; if not found in the current
      # collection, insert a new one, otherwise overwrite it.
      requests.append(
          ReplaceOne(
              filter={'_id': doc.get('_id', None)},
              replacement=doc,
              upsert=True))
    resp = self.client[self.db][self.coll].bulk_write(requests)
    _LOGGER.debug(
        'BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, '
        'nMatched:%d, Errors:%s' % (
            resp.modified_count,
            resp.upserted_count,
            resp.matched_count,
            resp.bulk_api_result.get('writeErrors')))

  def __enter__(self):
    if self.client is None:
      self.client = MongoClient(host=self.uri, **self.spec)
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    if self.client is not None:
      self.client.close()
def _mask_uri_password(uri):
  # Masks the password in the uri if present
  if uri:
    components = urllib.parse.urlsplit(uri)
    if components.password:
      replaced = components._replace(
          netloc='{}:{}@{}'.format(
              components.username, '******', components.hostname))
      uri = replaced.geturl()
  return uri


def _mask_spec_password(spec):
  # Masks the password in spec if present
  if spec and 'password' in spec:
    spec = spec.copy()
    spec['password'] = '******'
  return spec
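

# For illustration, the maskers only touch credentials and leave other values
# intact (note that _mask_uri_password rebuilds the netloc without the port):
#
#   _mask_uri_password('mongodb://user:secret@localhost:27017')
#   == 'mongodb://user:******@localhost'
#   _mask_spec_password({'password': 'secret', 'maxPoolSize': 10})
#   == {'password': '******', 'maxPoolSize': 10}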