#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from apache_beam.io.gcp.pubsublite.external import _ReadExternal
from apache_beam.io.gcp.pubsublite.external import _WriteExternal
from apache_beam.transforms import Map
from apache_beam.transforms import PTransform
try:
  from google.cloud import pubsublite
except ImportError:
  pubsublite = None
class ReadFromPubSubLite(PTransform):
  """
  A ``PTransform`` for reading from Pub/Sub Lite.

  Produces a PCollection of google.cloud.pubsublite.SequencedMessage

  Experimental; no backwards-compatibility guarantees.
  """
  def __init__(
      self,
      subscription_path,
      min_bundle_timeout=None,
      deduplicate=None,
      expansion_service=None,
  ):
    """Initializes ``ReadFromPubSubLite``.

    Args:
      subscription_path: Pub/Sub Lite Subscription in the form
          projects/<project>/locations/<location>/subscriptions/<subscription>
      min_bundle_timeout: The minimum wall time to pass before allowing
          bundle closure. Setting this to too small of a value will result in
          increased compute costs and lower throughput per byte. Immediate
          timeouts (0) may be useful for testing.
      deduplicate: Whether to deduplicate messages based on the value of
          the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False.
      expansion_service: Forwarded unchanged to the underlying
          ``_ReadExternal`` transform.
    """
    super().__init__()
    self._source = _ReadExternal(
        subscription_path=subscription_path,
        min_bundle_timeout=min_bundle_timeout,
        deduplicate=deduplicate,
        expansion_service=expansion_service,
    )

  def expand(self, pvalue):
    """Applies the external read and decodes each element.

    The output of the wrapped ``_ReadExternal`` transform is typed as
    ``bytes``; every element is then passed through
    ``pubsublite.SequencedMessage.deserialize``.

    Args:
      pvalue: The input value; only its ``pipeline`` attribute is used.

    Returns:
      A PCollection whose element type is ``pubsublite.SequencedMessage``.

    Raises:
      ImportError: If the ``google.cloud.pubsublite`` module could not be
          imported when this module was loaded.
    """
    # The module-level import is optional; fail with an actionable message
    # instead of an AttributeError on ``None`` further down.
    if pubsublite is None:
      raise ImportError(
          'google-cloud-pubsublite is required to use ReadFromPubSubLite.')
    raw = pvalue.pipeline | self._source
    raw.element_type = bytes
    messages = raw | Map(pubsublite.SequencedMessage.deserialize)
    messages.element_type = pubsublite.SequencedMessage
    return messages
class WriteToPubSubLite(PTransform):
  """
  A ``PTransform`` for writing to Pub/Sub Lite.

  Consumes a PCollection of google.cloud.pubsublite.PubSubMessage

  Experimental; no backwards-compatibility guarantees.
  """
  def __init__(
      self,
      topic_path,
      add_uuids=None,
      expansion_service=None,
  ):
    """Initializes ``WriteToPubSubLite``.

    Args:
      topic_path: A Pub/Sub Lite Topic path.
      add_uuids: Whether to add uuids to the 'x-goog-pubsublite-dataflow-uuid'
          uuid attribute. Defaults to False.
      expansion_service: Forwarded unchanged to the underlying
          ``_WriteExternal`` transform.
    """
    super().__init__()
    self._source = _WriteExternal(
        topic_path=topic_path,
        add_uuids=add_uuids,
        expansion_service=expansion_service,
    )

  @staticmethod
  def _message_to_proto_str(element: 'pubsublite.PubSubMessage'):
    """Serializes one ``PubSubMessage`` for the external write transform.

    Args:
      element: The message to serialize.

    Returns:
      The result of ``pubsublite.PubSubMessage.serialize(element)``.

    Raises:
      TypeError: If ``element`` is not a ``pubsublite.PubSubMessage``.
    """
    # The annotation is a string on purpose: an eagerly evaluated
    # ``pubsublite.PubSubMessage`` would raise AttributeError while this class
    # is being defined whenever the optional import above fell back to None.
    # NOTE(review): Beam's type-hint inference may treat a string annotation
    # as Any; the isinstance check below still enforces the type at runtime.
    if not isinstance(element, pubsublite.PubSubMessage):
      raise TypeError(
          'Unexpected element. Type: %s (expected: PubSubMessage), '
          'value: %r' % (type(element), element))
    return pubsublite.PubSubMessage.serialize(element)

  def expand(self, pcoll):
    """Serializes each message and applies the external write.

    Args:
      pcoll: The input PCollection; every element is passed through
          ``_message_to_proto_str``.

    Returns:
      The result of applying the wrapped ``_WriteExternal`` transform to the
      serialized (``bytes``-typed) PCollection.

    Raises:
      ImportError: If the ``google.cloud.pubsublite`` module could not be
          imported when this module was loaded.
    """
    # The module-level import is optional; fail with an actionable message
    # at pipeline construction rather than an AttributeError per element.
    if pubsublite is None:
      raise ImportError(
          'google-cloud-pubsublite is required to use WriteToPubSubLite.')
    serialized = pcoll | Map(WriteToPubSubLite._message_to_proto_str)
    serialized.element_type = bytes
    return serialized | self._source