#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Dataflow native sources and sinks.
For internal use only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
from __future__ import absolute_import
import logging
from builtins import object
from typing import TYPE_CHECKING
from typing import Optional
from apache_beam import pvalue
from apache_beam.io import iobase
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import HasDisplayData
if TYPE_CHECKING:
from apache_beam import coders
_LOGGER = logging.getLogger(__name__)
def _dict_printable_fields(dict_object, skip_fields):
"""Returns a list of strings for the interesting fields of a dict."""
return [
'%s=%r' % (name, value) for name,
value in dict_object.items()
# want to output value 0 but not None nor []
if (value or value == 0) and name not in skip_fields
]
_minor_fields = [
'coder',
'key_coder',
'value_coder',
'config_bytes',
'elements',
'append_trailing_newlines',
'strip_trailing_newlines',
'compression_type'
]
[docs]class NativeSource(iobase.SourceBase):
"""A source implemented by Dataflow service.
This class is to be only inherited by sources natively implemented by Cloud
Dataflow service, hence should not be sub-classed by users.
This class is deprecated and should not be used to define new sources.
"""
coder = None # type: Optional[coders.Coder]
[docs] def reader(self):
"""Returns a NativeSourceReader instance associated with this source."""
raise NotImplementedError
[docs] def is_bounded(self):
return True
def __repr__(self):
return '<{name} {vals}>'.format(
name=self.__class__.__name__,
vals=', '.join(_dict_printable_fields(self.__dict__, _minor_fields)))
[docs]class NativeSourceReader(object):
"""A reader for a source implemented by Dataflow service."""
def __enter__(self):
"""Opens everything necessary for a reader to function properly."""
raise NotImplementedError
def __exit__(self, exception_type, exception_value, traceback):
"""Cleans up after a reader executed."""
raise NotImplementedError
def __iter__(self):
"""Returns an iterator over all the records of the source."""
raise NotImplementedError
@property
def returns_windowed_values(self):
"""Returns whether this reader returns windowed values."""
return False
[docs] def get_progress(self):
"""Returns a representation of how far the reader has read.
Returns:
A SourceReaderProgress object that gives the current progress of the
reader.
"""
[docs] def request_dynamic_split(self, dynamic_split_request):
"""Attempts to split the input in two parts.
The two parts are named the "primary" part and the "residual" part. The
current 'NativeSourceReader' keeps processing the primary part, while the
residual part will be processed elsewhere (e.g. perhaps on a different
worker).
The primary and residual parts, if concatenated, must represent the
same input as the current input of this 'NativeSourceReader' before this
call.
The boundary between the primary part and the residual part is
specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
if the framework supports the notion of positions, it might be a
position at which the input is asked to split itself (which is not
necessarily the same position at which it *will* split itself); it
might be an approximate fraction of input, or something else.
This function returns a 'DynamicSplitResult', which encodes, in a
framework-specific way, the information sufficient to construct a
description of the resulting primary and residual inputs. For example, it
might, again, be a position demarcating these parts, or it might be a pair
of fully-specified input descriptions, or something else.
After a successful call to 'request_dynamic_split()', subsequent calls
should be interpreted relative to the new primary.
Args:
dynamic_split_request: A 'DynamicSplitRequest' describing the split
request.
Returns:
'None' if the 'DynamicSplitRequest' cannot be honored (in that
case the input represented by this 'NativeSourceReader' stays the same),
or a 'DynamicSplitResult' describing how the input was split into a
primary and residual part.
"""
_LOGGER.debug(
'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
'split request: %r',
self,
dynamic_split_request)
[docs]class ReaderProgress(object):
"""A representation of how far a NativeSourceReader has read."""
def __init__(
self,
position=None,
percent_complete=None,
remaining_time=None,
consumed_split_points=None,
remaining_split_points=None):
self._position = position
if percent_complete is not None:
percent_complete = float(percent_complete)
if percent_complete < 0 or percent_complete > 1:
raise ValueError(
'The percent_complete argument was %f. Must be in range [0, 1].' %
percent_complete)
self._percent_complete = percent_complete
self._remaining_time = remaining_time
self._consumed_split_points = consumed_split_points
self._remaining_split_points = remaining_split_points
@property
def position(self):
"""Returns progress, represented as a ReaderPosition object."""
return self._position
@property
def percent_complete(self):
"""Returns progress, represented as a percentage of total work.
Progress range from 0.0 (beginning, nothing complete) to 1.0 (end of the
work range, entire WorkItem complete).
Returns:
Progress represented as a percentage of total work.
"""
return self._percent_complete
@property
def remaining_time(self):
"""Returns progress, represented as an estimated time remaining."""
return self._remaining_time
@property
def consumed_split_points(self):
return self._consumed_split_points
@property
def remaining_split_points(self):
return self._remaining_split_points
[docs]class ReaderPosition(object):
"""A representation of position in an iteration of a 'NativeSourceReader'."""
def __init__(
self,
end=None,
key=None,
byte_offset=None,
record_index=None,
shuffle_position=None,
concat_position=None):
"""Initializes ReaderPosition.
A ReaderPosition may get instantiated for one of these position types. Only
one of these should be specified.
Args:
end: position is past all other positions. For example, this may be used
to represent the end position of an unbounded range.
key: position is a string key.
byte_offset: position is a byte offset.
record_index: position is a record index
shuffle_position: position is a base64 encoded shuffle position.
concat_position: position is a 'ConcatPosition'.
"""
self.end = end
self.key = key
self.byte_offset = byte_offset
self.record_index = record_index
self.shuffle_position = shuffle_position
if concat_position is not None:
assert isinstance(concat_position, ConcatPosition)
self.concat_position = concat_position
[docs]class ConcatPosition(object):
"""A position that encapsulate an inner position and an index.
This is used to represent the position of a source that encapsulate several
other sources.
"""
def __init__(self, index, position):
"""Initializes ConcatPosition.
Args:
index: index of the source currently being read.
position: inner position within the source currently being read.
"""
if position is not None:
assert isinstance(position, ReaderPosition)
self.index = index
self.position = position
[docs]class DynamicSplitRequest(object):
"""Specifies how 'NativeSourceReader.request_dynamic_split' should split.
"""
def __init__(self, progress):
assert isinstance(progress, ReaderProgress)
self.progress = progress
[docs]class DynamicSplitResult(object):
pass
[docs]class DynamicSplitResultWithPosition(DynamicSplitResult):
def __init__(self, stop_position):
assert isinstance(stop_position, ReaderPosition)
self.stop_position = stop_position
[docs]class NativeSink(HasDisplayData):
"""A sink implemented by Dataflow service.
This class is to be only inherited by sinks natively implemented by Cloud
Dataflow service, hence should not be sub-classed by users.
"""
[docs] def writer(self):
"""Returns a SinkWriter for this source."""
raise NotImplementedError
def __repr__(self):
return '<{name} {vals}>'.format(
name=self.__class__.__name__,
vals=_dict_printable_fields(self.__dict__, _minor_fields))
[docs]class NativeSinkWriter(object):
"""A writer for a sink implemented by Dataflow service."""
def __enter__(self):
"""Opens everything necessary for a writer to function properly."""
raise NotImplementedError
def __exit__(self, exception_type, exception_value, traceback):
"""Cleans up after a writer executed."""
raise NotImplementedError
@property
def takes_windowed_values(self):
"""Returns whether this writer takes windowed values."""
return False
[docs] def Write(self, o): # pylint: disable=invalid-name
"""Writes a record to the sink associated with this writer."""
raise NotImplementedError
class _NativeWrite(ptransform.PTransform):
"""A PTransform for writing to a Dataflow native sink.
These are sinks that are implemented natively by the Dataflow service
and hence should not be updated by users. These sinks are processed
using a Dataflow native write transform.
Applying this transform results in a ``pvalue.PDone``.
"""
def __init__(self, sink):
"""Initializes a Write transform.
Args:
sink: Sink to use for the write
"""
super(_NativeWrite, self).__init__()
self.sink = sink
def expand(self, pcoll):
self._check_pcollection(pcoll)
return pvalue.PDone(pcoll.pipeline)