#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This module implements IO classes to read and write data on MongoDB.
Read from MongoDB
-----------------
:class:`ReadFromMongoDB` is a ``PTransform`` that reads from a configured
MongoDB source and returns a ``PCollection`` of dict representing MongoDB
documents.
To configure the MongoDB source, the URI to connect to the MongoDB server, the
database name, and the collection name need to be provided.
Example usage::
  pipeline | ReadFromMongoDB(uri='mongodb://localhost:27017',
                             db='testdb',
                             coll='input')
To read from MongoDB Atlas, use the ``bucket_auto`` option to enable the
``$bucketAuto`` MongoDB aggregation instead of the ``splitVector``
command, which is a high-privilege function that cannot be assigned
to any user in Atlas.
Example usage::
  pipeline | ReadFromMongoDB(uri='mongodb+srv://user:pwd@cluster0.mongodb.net',
                             db='testdb',
                             coll='input',
                             bucket_auto=True)
Write to MongoDB:
-----------------
:class:`WriteToMongoDB` is a ``PTransform`` that writes MongoDB documents to
configured sink, and the write is conducted through a mongodb bulk_write of
``ReplaceOne`` operations. If the document's _id field already exists in the
MongoDB collection, it results in an overwrite; otherwise, a new document
will be inserted.
Example usage::
  pipeline | WriteToMongoDB(uri='mongodb://localhost:27017',
                            db='testdb',
                            coll='output',
                            batch_size=10)
No backward compatibility guarantees. Everything in this module is experimental.
"""
# pytype: skip-file
import itertools
import json
import logging
import math
import struct
from typing import Union
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import LexicographicKeyRangeTracker
from apache_beam.io.range_trackers import OffsetRangeTracker
from apache_beam.io.range_trackers import OrderedPositionRangeTracker
from apache_beam.transforms import DoFn
from apache_beam.transforms import PTransform
from apache_beam.transforms import Reshuffle
_LOGGER = logging.getLogger(__name__)
try:
  # Mongodb has its own bundled bson, which is not compatible with bson package.
  # (https://github.com/py-bson/bson/issues/82). Try to import objectid and if
  # it fails because bson package is installed, MongoDB IO will not work but at
  # least rest of the SDK will work.
  from bson import json_util
  from bson import objectid
  from bson.objectid import ObjectId
  # pymongo also internally depends on bson.
  from pymongo import ASCENDING
  from pymongo import DESCENDING
  from pymongo import MongoClient
  from pymongo import ReplaceOne
except ImportError:
  objectid = None
  json_util = None
  ObjectId = None
  ASCENDING = 1
  DESCENDING = -1
  MongoClient = None
  ReplaceOne = None
  _LOGGER.warning("Could not find a compatible bson package.")
__all__ = ["ReadFromMongoDB", "WriteToMongoDB"]
class ReadFromMongoDB(PTransform):
  """A ``PTransform`` to read MongoDB documents into a ``PCollection``."""
  def __init__(
      self,
      uri="mongodb://localhost:27017",
      db=None,
      coll=None,
      filter=None,
      projection=None,
      extra_client_params=None,
      bucket_auto=False,
  ):
    """Initialize a :class:`ReadFromMongoDB`

    Args:
      uri (str): The MongoDB connection string following the URI format.
      db (str): The MongoDB database name.
      coll (str): The MongoDB collection name.
      filter: A `bson.SON
        <https://api.mongodb.com/python/current/api/bson/son.html>`_ object
        specifying elements which must be present for a document to be included
        in the result set.
      projection: A list of field names that should be returned in the result
        set or a dict specifying the fields to include or exclude.
      extra_client_params(dict): Optional `MongoClient
        <https://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_
        parameters.
      bucket_auto (bool): If :data:`True`, use MongoDB `$bucketAuto` aggregation
        to split collection into bundles instead of `splitVector` command,
        which does not work with MongoDB Atlas.
        If :data:`False` (the default), use `splitVector` command for bundling.

    Returns:
      :class:`~apache_beam.transforms.ptransform.PTransform`

    Raises:
      ValueError: if ``db`` or ``coll`` is not a ``str``.
    """
    if extra_client_params is None:
      extra_client_params = {}
    # Fail at construction time rather than when the source first connects.
    if not isinstance(db, str):
      raise ValueError(
          "ReadFromMongoDB db param must be specified as a string")
    if not isinstance(coll, str):
      raise ValueError(
          "ReadFromMongoDB coll param must be specified as a string")
    self._mongo_source = _BoundedMongoSource(
        uri=uri,
        db=db,
        coll=coll,
        filter=filter,
        projection=projection,
        extra_client_params=extra_client_params,
        bucket_auto=bucket_auto,
    )

  def expand(self, pcoll):
    """Apply ``iobase.Read`` over the configured MongoDB source to ``pcoll``."""
    return pcoll | iobase.Read(self._mongo_source)
class _ObjectIdRangeTracker(OrderedPositionRangeTracker):
  """RangeTracker for tracking mongodb _id of bson ObjectId type."""
  def position_to_fraction(
      self,
      pos: ObjectId,
      start: ObjectId,
      end: ObjectId,
  ):
    """Returns the fraction of keys in the range [start, end) that
    are less than the given key.

    All three ids are converted to integers and the fraction is computed as
    ``(pos - start) / (end - start)``; ``start == end`` therefore raises
    ``ZeroDivisionError``.
    """
    pos_number = _ObjectIdHelper.id_to_int(pos)
    start_number = _ObjectIdHelper.id_to_int(start)
    end_number = _ObjectIdHelper.id_to_int(end)
    return (pos_number - start_number) / (end_number - start_number)

  def fraction_to_position(
      self,
      fraction: float,
      start: ObjectId,
      end: ObjectId,
  ):
    """Converts a fraction between 0 and 1
    to a position between start and end.

    The result is always strictly greater than ``start`` and strictly smaller
    than ``end``: values falling on or outside the bounds are replaced by
    ``start + 1`` or ``end - 1`` respectively.
    """
    start_number = _ObjectIdHelper.id_to_int(start)
    end_number = _ObjectIdHelper.id_to_int(end)
    total = end_number - start_number
    # Only the span is scaled in floating point; the (up to 96-bit) start
    # offset is added with exact integer arithmetic. Adding it inside the float
    # expression would round it to 53 significant bits and make narrow ranges
    # collapse onto one of the clamped bounds below.
    pos = start_number + int(total * fraction)
    # make sure split position is larger than start position and smaller than
    # end position.
    if pos <= start_number:
      return _ObjectIdHelper.increment_id(start, 1)
    if pos >= end_number:
      return _ObjectIdHelper.increment_id(end, -1)
    return _ObjectIdHelper.int_to_id(pos)
class _BoundedMongoSource(iobase.BoundedSource):
  """A MongoDB source that reads a finite amount of input records.
  This class defines following operations which can be used to read
  MongoDB source efficiently.
  * Size estimation - method ``estimate_size()`` may return an accurate
    estimation in bytes for the size of the source.
  * Splitting into bundles of a given size - method ``split()`` can be used to
    split the source into a set of sub-sources (bundles) based on a desired
    bundle size.
  * Getting a RangeTracker - method ``get_range_tracker()`` should return a
    ``RangeTracker`` object for a given position range for the position type
    of the records returned by the source.
  * Reading the data - method ``read()`` can be used to read data from the
    source while respecting the boundaries defined by a given
    ``RangeTracker``.
  A runner will perform reading the source in two steps.
  (1) Method ``get_range_tracker()`` will be invoked with start and end
      positions to obtain a ``RangeTracker`` for the range of positions the
      runner intends to read. Source must define a default initial start and end
      position range. These positions must be used if the start and/or end
      positions passed to the method ``get_range_tracker()`` are ``None``
  (2) Method read() will be invoked with the ``RangeTracker`` obtained in the
      previous step.
  **Mutability**
  A ``_BoundedMongoSource`` object should not be mutated while
  its methods (for example, ``read()``) are being invoked by a runner. Runner
  implementations may invoke methods of ``_BoundedMongoSource`` objects through
  multi-threaded and/or reentrant execution modes.
  """
  def __init__(
      self,
      uri=None,
      db=None,
      coll=None,
      filter=None,
      projection=None,
      extra_client_params=None,
      bucket_auto=False,
  ):
    """Record the connection settings and query options on the instance.

    Args:
      uri: connection string handed to ``MongoClient``.
      db: database name.
      coll: collection name.
      filter: query filter; ``None`` is stored as an empty dict.
      projection: projection handed to ``find``.
      extra_client_params: extra ``MongoClient`` keyword arguments; ``None``
        is stored as an empty dict.
      bucket_auto: whether ``split()`` uses ``$bucketAuto`` instead of
        ``splitVector``.
    """
    self.uri = uri
    self.db = db
    self.coll = coll
    self.filter = {} if filter is None else filter
    self.projection = projection
    # Forwarded as ``**kwargs`` to every ``MongoClient`` this source opens.
    self.spec = {} if extra_client_params is None else extra_client_params
    self.bucket_auto = bucket_auto
  def estimate_size(self):
    """Return the ``size`` entry of the collection's ``collstats`` result."""
    with MongoClient(self.uri, **self.spec) as mongo:
      stats = mongo[self.db].command("collstats", self.coll)
      return stats.get("size")
  def _estimate_average_document_size(self):
    """Return the ``avgObjSize`` entry of the collection's ``collstats``."""
    with MongoClient(self.uri, **self.spec) as mongo:
      stats = mongo[self.db].command("collstats", self.coll)
      return stats.get("avgObjSize")
  def split(
      self,
      desired_bundle_size: int,
      start_position: Union[int, str, bytes, ObjectId] = None,
      stop_position: Union[int, str, bytes, ObjectId] = None,
  ):
    """Yield ``SourceBundle`` objects covering ``[start, stop)`` of ``_id``.

    Split points come from ``$bucketAuto`` when ``self.bucket_auto`` is set and
    from the ``splitVector`` command otherwise. Missing positions are replaced
    by the defaults from ``_replace_none_positions``.

    Args:
      desired_bundle_size: the desired size (in bytes) of the bundles returned.
      start_position: if specified, used as the starting position of the
                      first bundle.
      stop_position: if specified, used as the ending position of the last
                     bundle.

    Returns:
      a generator of ``iobase.SourceBundle`` objects.
    """
    # Work in whole megabytes and never go below 1mb, MongoDB's default split
    # size.
    chunk_size_mb = max(desired_bundle_size // 1024 // 1024, 1)
    is_initial_split = start_position is None and stop_position is None
    start_position, stop_position = self._replace_none_positions(
        start_position, stop_position)

    if self.bucket_auto:
      # $bucketAuto: each bucket's upper bound is a split key, its document
      # count the weight.
      buckets = self._get_auto_buckets(
          chunk_size_mb, start_position, stop_position, is_initial_split)
      split_keys = [{"_id": bucket["_id"]["max"]} for bucket in buckets]
      weights = [bucket["count"] for bucket in buckets]
    else:
      # splitVector: every bundle gets the same weight.
      split_keys = self._get_split_keys(
          chunk_size_mb, start_position, stop_position)
      weights = itertools.repeat(chunk_size_mb)

    lower_bound = start_position
    for split_key, bundle_weight in zip(split_keys, weights):
      if bundle_start_reached_stop := lower_bound >= stop_position:
        break
      upper_bound = min(stop_position, split_key["_id"])
      yield iobase.SourceBundle(
          weight=bundle_weight,
          source=self,
          start_position=lower_bound,
          stop_position=upper_bound,
      )
      lower_bound = upper_bound

    # Cover whatever remains between the last split key and stop_position.
    if lower_bound < stop_position:
      # bucket_auto mode can get here when nothing was split (single document).
      tail_weight = 1 if self.bucket_auto else chunk_size_mb
      yield iobase.SourceBundle(
          weight=tail_weight,
          source=self,
          start_position=lower_bound,
          stop_position=stop_position,
      )
  def get_range_tracker(
      self,
      start_position: Union[int, str, ObjectId] = None,
      stop_position: Union[int, str, ObjectId] = None,
  ) -> Union[
      _ObjectIdRangeTracker, OffsetRangeTracker, LexicographicKeyRangeTracker]:
    """Build the range tracker matching the type of the start position.

    Args:
      start_position: starting position of the range; ``None`` is replaced by
                      the default from ``_replace_none_positions``.
      stop_position:  ending position of the range; ``None`` is replaced by
                      the default from ``_replace_none_positions``.

    Returns:
      a ``_ObjectIdRangeTracker`` for ``ObjectId`` positions, an
      ``OffsetRangeTracker`` for ``int`` positions or a
      ``LexicographicKeyRangeTracker`` for ``str`` positions.

    Raises:
      NotImplementedError: if the start position has any other type.
    """
    start_position, stop_position = self._replace_none_positions(
        start_position, stop_position)
    # Checked in this order; the first matching type wins.
    tracker_by_type = (
        (ObjectId, _ObjectIdRangeTracker),
        (int, OffsetRangeTracker),
        (str, LexicographicKeyRangeTracker),
    )
    for position_type, tracker_cls in tracker_by_type:
      if isinstance(start_position, position_type):
        return tracker_cls(start_position, stop_position)
    raise NotImplementedError(
        f"RangeTracker for {type(start_position)} not implemented!")
  def read(self, range_tracker):
    """Yield documents whose ``_id`` lies in the tracker's range.

    The query combines the user filter with an ``_id`` range built from
    ``range_tracker.start_position()`` and ``range_tracker.stop_position()``
    and is sorted by ascending ``_id``. Every document's ``_id`` is offered to
    ``range_tracker.try_claim()``; reading stops at the first refused claim.

    Args:
      range_tracker: the ``RangeTracker`` whose boundaries are respected while
                     reading. It must not be ``None``.

    Returns:
      a generator of the documents read from the collection.
    """
    with MongoClient(self.uri, **self.spec) as mongo:
      bounded_filter = self._merge_id_filter(
          range_tracker.start_position(), range_tracker.stop_position())
      collection = mongo[self.db][self.coll]
      cursor = collection.find(
          filter=bounded_filter, projection=self.projection)
      for document in cursor.sort([("_id", ASCENDING)]):
        claimed = range_tracker.try_claim(document["_id"])
        if not claimed:
          return
        yield document
  def display_data(self):
    """Return the parent's display data extended with this source's settings.

    The filter is rendered as JSON using ``json_util.default`` for values the
    ``json`` module cannot serialize on its own.
    """
    info = super().display_data()
    filter_as_json = json.dumps(self.filter, default=json_util.default)
    info["database"] = self.db
    info["collection"] = self.coll
    info["filter"] = filter_as_json
    info["projection"] = str(self.projection)
    info["bucket_auto"] = self.bucket_auto
    return info
  @staticmethod
  def _range_is_not_splittable(
      start_pos: Union[int, str, ObjectId],
      end_pos: Union[int, str, ObjectId],
  ):
    """Tell whether ``[start_pos, end_pos)`` is too narrow to be split.

    Returns `True` when the range holds at most a single position
    (a single document is not splittable) and `False` otherwise, including
    for position types other than ``ObjectId``, ``int`` and ``str``.
    """
    if isinstance(start_pos, ObjectId):
      return start_pos >= _ObjectIdHelper.increment_id(end_pos, -1)
    if isinstance(start_pos, int):
      return start_pos >= end_pos - 1
    if isinstance(start_pos, str):
      return start_pos >= end_pos
    return False
  def _get_split_keys(
      self,
      desired_chunk_size_in_mb: int,
      start_pos: Union[int, str, ObjectId],
      end_pos: Union[int, str, ObjectId],
  ):
    """Ask MongoDB's `splitVector` command for the ``_id`` split points.

    Returns the ``splitKeys`` entry of the command result, or an empty list
    when the range cannot be split.
    """
    if self._range_is_not_splittable(start_pos, end_pos):
      # single document not splittable
      return []
    name_space = "%s.%s" % (self.db, self.coll)
    with MongoClient(self.uri, **self.spec) as mongo:
      response = mongo[self.db].command(
          "splitVector",
          name_space,
          keyPattern={"_id": 1},  # Ascending index
          min={"_id": start_pos},
          max={"_id": end_pos},
          maxChunkSize=desired_chunk_size_in_mb,
      )
      return response["splitKeys"]
  def _get_auto_buckets(
      self,
      desired_chunk_size_in_mb: int,
      start_pos: Union[int, str, ObjectId],
      end_pos: Union[int, str, ObjectId],
      is_initial_split: bool,
  ) -> list:
    """Use MongoDB `$bucketAuto` aggregation to split collection into bundles
    instead of `splitVector` command, which does not work with MongoDB Atlas.

    Args:
      desired_chunk_size_in_mb: target size of one bucket in megabytes.
      start_pos: inclusive lower ``_id`` bound of the range to bucket.
      end_pos: exclusive upper ``_id`` bound of the range to bucket.
      is_initial_split: whether the whole collection is being split; together
        with an empty filter this allows using the collection size directly.

    Returns:
      the list of bucket documents produced by the aggregation, with the upper
      bound of the last bucket replaced by ``end_pos``; an empty list when the
      range is not splittable or contains no data.
    """
    # single document not splittable
    if self._range_is_not_splittable(start_pos, end_pos):
      return []
    if is_initial_split and not self.filter:
      # total collection size in MB
      size_in_mb = self.estimate_size() / float(1 << 20)
    else:
      # size of documents within start/end id range and possibly filtered
      documents_count = self._count_id_range(start_pos, end_pos)
      if documents_count == 0:
        # Nothing matches: skip the extra collstats round trip and avoid
        # multiplying by a missing average document size.
        return []
      avg_document_size = self._estimate_average_document_size()
      size_in_mb = documents_count * avg_document_size / float(1 << 20)
    if size_in_mb == 0:
      # no documents not splittable (maybe a result of filtering)
      return []
    bucket_count = math.ceil(size_in_mb / desired_chunk_size_in_mb)
    # Use the module-level client name, like every other method of this class.
    with MongoClient(self.uri, **self.spec) as client:
      pipeline = [
          {
              # filter by positions and by the custom filter if any
              "$match": self._merge_id_filter(start_pos, end_pos)
          },
          {
              "$bucketAuto": {
                  "groupBy": "$_id", "buckets": bucket_count
              }
          },
      ]
      buckets = list(
          # Use `allowDiskUse` option to avoid aggregation limit of 100 Mb RAM
          client[self.db][self.coll].aggregate(pipeline, allowDiskUse=True))
      if buckets:
        # Stretch the last bucket so the buckets cover the range up to end_pos.
        buckets[-1]["_id"]["max"] = end_pos
      return buckets
  def _merge_id_filter(
      self,
      start_position: Union[int, str, bytes, ObjectId],
      stop_position: Union[int, str, bytes, ObjectId] = None,
  ) -> dict:
    """Combine the user filter (if any) with an ``_id`` range condition.

    The range uses ``$gte`` for the start position (inclusive) and, when a
    stop position is given, ``$lt`` for the stop position (exclusive), see
    https://docs.mongodb.com/manual/reference/operator/query/gte/ and
    https://docs.mongodb.com/manual/reference/operator/query/lt/

    Returns:
      the ``_id`` condition alone when no user filter is set, otherwise an
      ``$and`` of a shallow copy of the user filter and the ``_id`` condition
      (https://docs.mongodb.com/manual/reference/operator/query/and/).
    """
    id_bounds = {"$gte": start_position}
    if stop_position is not None:
      id_bounds["$lt"] = stop_position
    id_filter = {"_id": id_bounds}
    if not self.filter:
      return id_filter
    return {"$and": [self.filter.copy(), id_filter]}
  def _get_head_document_id(self, sort_order):
    """Return the ``_id`` of the first document when sorted by ``_id``.

    Args:
      sort_order: sort direction applied to ``_id`` before taking one document.

    Raises:
      ValueError: if the query returns no document.
    """
    with MongoClient(self.uri, **self.spec) as mongo:
      collection = mongo[self.db][self.coll]
      unsorted = collection.find(filter={}, projection=[])
      head = unsorted.sort([("_id", sort_order)]).limit(1)
      try:
        return head[0]["_id"]
      except IndexError:
        raise ValueError("Empty Mongodb collection")
  def _replace_none_positions(self, start_position, stop_position):
    """Fill in missing range bounds from the collection's extreme ``_id``s.

    A missing start becomes the smallest ``_id``. A missing stop becomes the
    successor of the largest ``_id`` so that the last document stays inside the
    half-open range; for ``_id`` types other than ``ObjectId``, ``int`` and
    ``str`` the stop position is left as ``None``.

    Returns:
      the ``(start_position, stop_position)`` tuple.
    """
    if start_position is None:
      start_position = self._get_head_document_id(ASCENDING)
    if stop_position is not None:
      return start_position, stop_position

    largest_id = self._get_head_document_id(DESCENDING)
    # Move one step past the largest id so the last document is not excluded.
    if isinstance(largest_id, ObjectId):
      stop_position = _ObjectIdHelper.increment_id(largest_id, 1)
    elif isinstance(largest_id, int):
      stop_position = largest_id + 1
    elif isinstance(largest_id, str):
      stop_position = largest_id + '\x00'
    return start_position, stop_position
  def _count_id_range(self, start_position, stop_position):
    """Count the documents with ``_id`` in ``[start_position, stop_position)``
    that also match the custom filter, if one is set.
    """
    range_filter = self._merge_id_filter(start_position, stop_position)
    with MongoClient(self.uri, **self.spec) as mongo:
      collection = mongo[self.db][self.coll]
      return collection.count_documents(filter=range_filter)
class _ObjectIdHelper:
  """A Utility class to manipulate bson object ids."""
  @classmethod
  def id_to_int(cls, _id: Union[int, ObjectId]) -> int:
    """
    Args:
      _id: ObjectId required for each MongoDB document _id field. An ``int``
        is returned unchanged.

    Returns: Converted integer value of ObjectId's 12 bytes binary value.
    """
    if isinstance(_id, int):
      return _id
    # converts object id binary to integer
    # id object is bytes type with size of 12, read as three big-endian
    # unsigned 32-bit words
    ints = struct.unpack(">III", _id.binary)
    return (ints[0] << 64) + (ints[1] << 32) + ints[2]

  @classmethod
  def int_to_id(cls, number):
    """
    Args:
      number(int): The integer value to be used to convert to ObjectId.

    Returns: The ObjectId that has the 12 bytes binary converted from the
      integer value.

    Raises:
      ValueError: if ``number`` does not fit into 12 bytes.
    """
    # converts integer value to object id. Int value should be less than
    # (2 ^ 96) so it can be convert to 12 bytes required by object id.
    if number < 0 or number >= (1 << 96):
      raise ValueError("number value must be within [0, %s)" % (1 << 96))
    ints = [
        (number & 0xFFFFFFFF0000000000000000) >> 64,
        (number & 0x00000000FFFFFFFF00000000) >> 32,
        number & 0x0000000000000000FFFFFFFF,
    ]
    number_bytes = struct.pack(">III", *ints)
    return ObjectId(number_bytes)

  @classmethod
  def increment_id(
      cls,
      _id: ObjectId,
      inc: int,
  ) -> ObjectId:
    """
    Increment object_id binary value by inc value and return new object id.

    Args:
      _id: The `_id` to change.
      inc(int): The incremental int value to be added to `_id`.

    Returns:
        `_id` incremented by `inc` value

    Raises:
      ValueError: if the incremented value does not fit into 12 bytes.
    """
    id_number = _ObjectIdHelper.id_to_int(_id)
    new_number = id_number + inc
    if new_number < 0 or new_number >= (1 << 96):
      # The parentheses matter: ``1 << 96 - id_number`` would shift by
      # ``96 - id_number`` and report a wrong bound (or fail on a negative
      # shift count) instead of the intended ``2**96 - id_number``.
      raise ValueError(
          "invalid incremental, inc value must be within ["
          "%s, %s)" % (0 - id_number, (1 << 96) - id_number))
    return _ObjectIdHelper.int_to_id(new_number)
class WriteToMongoDB(PTransform):
  """WriteToMongoDB is a ``PTransform`` that writes a ``PCollection`` of
  mongodb document to the configured MongoDB server.

  In order to make the document writes idempotent so that the bundles are
  retry-able without creating duplicates, the PTransform added 2 transformations
  before final write stage:
  a ``GenerateId`` transform and a ``Reshuffle`` transform.::

                  -----------------------------------------------
    Pipeline -->  |GenerateId --> Reshuffle --> WriteToMongoSink|
                  -----------------------------------------------
                                  (WriteToMongoDB)

  The ``GenerateId`` transform adds a random and unique *_id* field to the
  documents if they don't already have one, it uses the same format as MongoDB
  default. The ``Reshuffle`` transform makes sure that no fusion happens between
  ``GenerateId`` and the final write stage transform, so that the set of
  documents and their unique IDs are not regenerated if final write step is
  retried due to a failure. This prevents duplicate writes of the same document
  with different unique IDs.
  """
  def __init__(
      self,
      uri="mongodb://localhost:27017",
      db=None,
      coll=None,
      batch_size=100,
      extra_client_params=None,
  ):
    """
    Args:
      uri (str): The MongoDB connection string following the URI format
      db (str): The MongoDB database name
      coll (str): The MongoDB collection name
      batch_size(int): Number of documents per bulk_write to MongoDB,
        default to 100
      extra_client_params(dict): Optional `MongoClient
       <https://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_
       parameters as keyword arguments

    Returns:
      :class:`~apache_beam.transforms.ptransform.PTransform`

    Raises:
      ValueError: if ``db`` or ``coll`` is not a ``str``.
    """
    if extra_client_params is None:
      extra_client_params = {}
    if not isinstance(db, str):
      raise ValueError("WriteToMongoDB db param must be specified as a string")
    if not isinstance(coll, str):
      raise ValueError(
          "WriteToMongoDB coll param must be specified as a string")
    self._uri = uri
    self._db = db
    self._coll = coll
    self._batch_size = batch_size
    self._spec = extra_client_params

  def expand(self, pcoll):
    """Chain id generation, a reshuffle and the batched write onto ``pcoll``."""
    return (
        pcoll
        | beam.ParDo(_GenerateObjectIdFn())
        | Reshuffle()
        | beam.ParDo(
            _WriteMongoFn(
                self._uri, self._db, self._coll, self._batch_size, self._spec)))
class _GenerateObjectIdFn(DoFn):
  """``DoFn`` that makes sure every document carries an ``_id`` field."""
  def process(self, element, *args, **kwargs):
    """Yield ``element`` with an ``_id``, generating one when it is missing.

    Documents that already have an ``_id`` are passed through untouched.
    Otherwise a new ``dict`` holding the document's items plus a generated
    ``_id`` is emitted; the input element itself is not modified, because a
    ``DoFn`` must not mutate the elements it receives.
    """
    # if _id field already exist we keep it as it is, otherwise the ptransform
    # generates a new _id field to achieve idempotent write to mongodb.
    if "_id" in element:
      yield element
      return
    # object.ObjectId() generates a unique identifier that follows mongodb
    # default format, if _id is not present in document, mongodb server
    # generates it with this same function upon write. However the
    # uniqueness of generated id may not be guaranteed if the work load are
    # distributed across too many processes. See more on the ObjectId format
    # https://docs.mongodb.com/manual/reference/bson-types/#objectid.
    yield {**element, "_id": objectid.ObjectId()}
class _WriteMongoFn(DoFn):
  """``DoFn`` that buffers documents and writes them through ``_MongoSink``
  once ``batch_size`` documents are collected or the bundle finishes.
  """
  def __init__(
      self, uri=None, db=None, coll=None, batch_size=100, extra_params=None):
    """Store the sink settings and start with an empty buffer."""
    self.uri = uri
    self.db = db
    self.coll = coll
    self.spec = {} if extra_params is None else extra_params
    self.batch_size = batch_size
    self.batch = []

  def finish_bundle(self):
    """Write whatever is still buffered when the bundle ends."""
    self._flush()

  def process(self, element, *args, **kwargs):
    """Buffer ``element`` and flush once the buffer reaches ``batch_size``."""
    self.batch.append(element)
    if len(self.batch) < self.batch_size:
      return
    self._flush()

  def _flush(self):
    """Write the buffered documents through a fresh sink and clear the buffer.

    Does nothing when the buffer is empty.
    """
    if not self.batch:
      return
    with _MongoSink(self.uri, self.db, self.coll, self.spec) as mongo_sink:
      mongo_sink.write(self.batch)
      self.batch = []

  def display_data(self):
    """Return the parent's display data plus database, collection and batch
    size.
    """
    info = super().display_data()
    info["database"] = self.db
    info["collection"] = self.coll
    info["batch_size"] = self.batch_size
    return info
class _MongoSink:
  """Context manager that upserts batches of documents into one collection.

  The ``MongoClient`` is created lazily, on ``__enter__`` or on the first
  ``write``, and is closed and dropped on ``__exit__`` so that a later use of
  the same sink opens a fresh client instead of reusing a closed one.
  """
  def __init__(self, uri=None, db=None, coll=None, extra_params=None):
    """
    Args:
      uri: connection string handed to ``MongoClient`` as ``host``.
      db: database name.
      coll: collection name.
      extra_params: extra ``MongoClient`` keyword arguments; ``None`` is
        stored as an empty dict.
    """
    if extra_params is None:
      extra_params = {}
    self.uri = uri
    self.db = db
    self.coll = coll
    self.spec = extra_params
    self.client = None

  def write(self, documents):
    """Upsert ``documents`` with a single ``bulk_write`` of ``ReplaceOne``.

    Args:
      documents: iterable of documents supporting ``get``; each one replaces
        the stored document with the same ``_id`` or is inserted when no such
        document exists.
    """
    if self.client is None:
      self.client = MongoClient(host=self.uri, **self.spec)
    # match document based on _id field, if not found in current collection,
    # insert new one, otherwise overwrite it.
    requests = [
        ReplaceOne(filter={"_id": doc.get("_id")}, replacement=doc, upsert=True)
        for doc in documents
    ]
    resp = self.client[self.db][self.coll].bulk_write(requests)
    # Pass the values as logging arguments so the message is only formatted
    # when debug logging is enabled.
    _LOGGER.debug(
        "BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
        "nMatched:%d, Errors:%s",
        resp.modified_count,
        resp.upserted_count,
        resp.matched_count,
        resp.bulk_api_result.get("writeErrors"),
    )

  def __enter__(self):
    """Open the client if there is none yet and return the sink itself."""
    if self.client is None:
      self.client = MongoClient(host=self.uri, **self.spec)
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the client, if any; exceptions from the block are not
    suppressed.
    """
    if self.client is not None:
      try:
        self.client.close()
      finally:
        # Forget the closed client so a reused sink reconnects.
        self.client = None