Source code for apache_beam.io.range_trackers

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""iobase.RangeTracker implementations provided with Dataflow SDK.
"""

import logging
import math
import threading

from apache_beam.io import iobase

__all__ = ['OffsetRangeTracker', 'LexicographicKeyRangeTracker',
           'OrderedPositionRangeTracker', 'UnsplittableRangeTracker']


class OffsetRange(object):
  """A range of offsets delimited by ``start`` and ``stop``.

  ``split()`` starts every sub-range where the previous one stopped, so
  ``stop`` acts as an exclusive upper bound.

  Args:
    start: first offset of the range.
    stop: end offset of the range; must be strictly larger than ``start``.

  Raises:
    ValueError: if ``start >= stop``.
  """

  def __init__(self, start, stop):
    if start >= stop:
      # Interpolate the offsets into the message; passing them as extra
      # positional arguments to ValueError would leave the placeholders
      # unformatted and turn the exception args into a 3-tuple.
      raise ValueError(
          'Start offset must be smaller than the stop offset. '
          'Received %s and %s respectively.' % (start, stop))
    self.start = start
    self.stop = stop

  def __eq__(self, other):
    """Two ranges are equal when both their start and stop offsets match."""
    if not isinstance(other, OffsetRange):
      return False

    return self.start == other.start and self.stop == other.stop

  def __ne__(self, other):
    """Exact negation of ``__eq__`` (needed explicitly on Python 2)."""
    return not self == other

  def __hash__(self):
    # Kept consistent with __eq__: equal ranges hash equally.
    return hash((self.start, self.stop))

  def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1):
    """Yields consecutive ``OffsetRange`` objects that cover this range.

    Args:
      desired_num_offsets_per_split: preferred number of offsets per piece.
      min_num_offsets_per_split: lower bound for the size of a piece.

    Yields:
      ``OffsetRange`` pieces in increasing order. Each piece spans
      ``max(desired_num_offsets_per_split, min_num_offsets_per_split)``
      offsets, except the last one, which absorbs a remainder that would
      otherwise form a too-small trailing piece.
    """
    current_split_start = self.start
    max_split_size = max(desired_num_offsets_per_split,
                         min_num_offsets_per_split)
    while current_split_start < self.stop:
      current_split_stop = min(current_split_start + max_split_size, self.stop)
      remaining = self.stop - current_split_stop

      # Avoiding a small split at the end.
      if (remaining < desired_num_offsets_per_split / 4 or
          remaining < min_num_offsets_per_split):
        current_split_stop = self.stop

      yield OffsetRange(current_split_start, current_split_stop)
      current_split_start = current_split_stop

  def new_tracker(self):
    """Returns an ``OffsetRangeTracker`` over ``[start, stop)``."""
    return OffsetRangeTracker(self.start, self.stop)


class OffsetRangeTracker(iobase.RangeTracker):
  """A 'RangeTracker' for non-negative positions of type 'long'.

  The tracker covers the offsets from ``start`` up to, but not including,
  ``end``. All mutable state is guarded by an internal lock.
  """

  # Offset corresponding to infinity. This can only be used as the upper-bound
  # of a range, and indicates reading all of the records until the end without
  # specifying exactly what the end is.
  # Infinite ranges cannot be split because it is impossible to estimate
  # progress within them.
  OFFSET_INFINITY = float('inf')

  def __init__(self, start, end):
    """Initializes the tracker.

    Args:
      start: integer start offset.
      end: integer stop offset, or ``OFFSET_INFINITY``.

    Raises:
      ValueError: if ``start`` or ``end`` is ``None``.
      AssertionError: if an offset is not an integer or ``start > end``.
    """
    super(OffsetRangeTracker, self).__init__()

    if start is None:
      raise ValueError('Start offset must not be \'None\'')
    if end is None:
      raise ValueError('End offset must not be \'None\'')
    assert isinstance(start, (int, long))
    if end != self.OFFSET_INFINITY:
      assert isinstance(end, (int, long))

    assert start <= end

    self._start_offset = start
    self._stop_offset = end

    # -1 marks "nothing returned yet" / "no split point claimed yet".
    self._last_record_start = -1
    self._offset_of_last_split_point = -1
    self._lock = threading.Lock()

    self._split_points_seen = 0
    self._split_points_unclaimed_callback = None

  def start_position(self):
    """Returns the start offset of the range."""
    return self._start_offset

  def stop_position(self):
    """Returns the current stop offset (it shrinks after a split)."""
    return self._stop_offset

  @property
  def last_record_start(self):
    """Start offset of the last record reported, or -1 if none yet."""
    return self._last_record_start

  def _validate_record_start(self, record_start, split_point):
    """Checks that ``record_start`` is a legal next record offset.

    Args:
      record_start: start offset of the record about to be returned.
      split_point: whether the record is at a split point.

    Raises:
      ValueError: if the lock is not held, if the offset moves backwards, if
        a split point repeats the offset of the previous split point, or if
        the very first record is not at a split point.
    """
    # This function must only be called under the lock self._lock.
    if not self._lock.locked():
      raise ValueError(
          'This function must only be called under the lock self.lock.')

    if record_start < self._last_record_start:
      raise ValueError(
          'Trying to return a record [starting at %d] which is before the '
          'last-returned record [starting at %d]' %
          (record_start, self._last_record_start))

    # Each condition is checked exactly once; the original repeated both
    # checks a second time, which could never trigger.
    if split_point:
      if (self._offset_of_last_split_point != -1 and
          record_start == self._offset_of_last_split_point):
        raise ValueError(
            'Record at a split point has same offset as the previous split '
            'point: %d' % record_start)
    elif self._last_record_start == -1:
      raise ValueError(
          'The first record [starting at %d] must be at a split point' %
          record_start)

  def try_claim(self, record_start):
    """Tries to claim the split point at ``record_start``.

    Returns:
      ``False`` if ``record_start`` is at or past the stop offset; otherwise
      records the claim and returns ``True``.

    Raises:
      ValueError: if ``record_start`` fails validation.
    """
    with self._lock:
      self._validate_record_start(record_start, True)
      if record_start >= self.stop_position():
        return False
      self._offset_of_last_split_point = record_start
      self._last_record_start = record_start
      self._split_points_seen += 1
      return True

  def set_current_position(self, record_start):
    """Records the start of a record that is not at a split point.

    Raises:
      ValueError: if ``record_start`` fails validation.
    """
    with self._lock:
      self._validate_record_start(record_start, False)
      self._last_record_start = record_start

  def try_split(self, split_offset):
    """Tries to shrink the range so that it stops at ``split_offset``.

    Returns:
      ``(split_offset, fraction)`` on success, where ``fraction`` is the
      share of the pre-split range that is kept; ``None`` if the split is
      refused (unbounded range, nothing claimed yet, offset already passed,
      or offset outside the range).
    """
    assert isinstance(split_offset, (int, long))
    with self._lock:
      if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY:
        logging.debug('refusing to split %r at %d: stop position unspecified',
                      self, split_offset)
        return
      if self._last_record_start == -1:
        logging.debug('Refusing to split %r at %d: unstarted', self,
                      split_offset)
        return

      if split_offset <= self._last_record_start:
        logging.debug(
            'Refusing to split %r at %d: already past proposed stop offset',
            self, split_offset)
        return
      if (split_offset < self.start_position()
          or split_offset >= self.stop_position()):
        logging.debug(
            'Refusing to split %r at %d: proposed split position out of range',
            self, split_offset)
        return

      logging.debug('Agreeing to split %r at %d', self, split_offset)

      # Computed against the old stop offset, before it is overwritten.
      split_fraction = (float(split_offset - self._start_offset) / (
          self._stop_offset - self._start_offset))
      self._stop_offset = split_offset

      return self._stop_offset, split_fraction

  def fraction_consumed(self):
    """Returns the consumed fraction of the range, clamped to [0, 1]."""
    with self._lock:
      fraction = ((1.0 * (self._last_record_start - self.start_position()) /
                   (self.stop_position() - self.start_position())) if
                  self.stop_position() != self.start_position() else 0.0)

      # self.last_record_start may become larger than self.end_offset when
      # reading the records since any record that starts before the first
      # 'split point' at or after the defined 'stop offset' is considered to
      # be within the range of the OffsetRangeTracker. Hence fraction could be
      # > 1. self.last_record_start is initialized to -1, hence fraction may
      # be < 0. Bounding it to the range [0, 1].
      return max(0.0, min(1.0, fraction))

  def position_at_fraction(self, fraction):
    """Returns the offset located at ``fraction`` of the range, rounded up.

    Raises:
      Exception: if the stop offset is ``OFFSET_INFINITY``.
    """
    if self.stop_position() == OffsetRangeTracker.OFFSET_INFINITY:
      raise Exception(
          'get_position_for_fraction_consumed is not applicable for an '
          'unbounded range')
    return int(math.ceil(self.start_position() + fraction * (
        self.stop_position() - self.start_position())))

  def split_points(self):
    """Returns ``(split_points_consumed, split_points_remaining)``.

    The most recently claimed split point is not counted as consumed; it is
    added to the number reported by the unclaimed-callback instead. Without
    a callback the remaining count is ``SPLIT_POINTS_UNKNOWN``.
    """
    with self._lock:
      split_points_consumed = (
          0 if self._split_points_seen == 0 else self._split_points_seen - 1)
      split_points_unclaimed = (
          self._split_points_unclaimed_callback(self.stop_position())
          if self._split_points_unclaimed_callback
          else iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)
      split_points_remaining = (
          iobase.RangeTracker.SPLIT_POINTS_UNKNOWN if
          split_points_unclaimed == iobase.RangeTracker.SPLIT_POINTS_UNKNOWN
          else (split_points_unclaimed + 1))

      return (split_points_consumed, split_points_remaining)

  def set_split_points_unclaimed_callback(self, callback):
    """Sets the callable that ``split_points()`` invokes with the stop offset."""
    self._split_points_unclaimed_callback = callback
class OrderedPositionRangeTracker(iobase.RangeTracker):
  """Abstract range tracker for positions that can be compared.

  Concrete subclasses supply ``position_to_fraction`` and
  ``fraction_to_position``, i.e. the mapping between position ranges and
  the closed interval [0, 1].
  """

  # Sentinel stored in _last_claim until the first successful claim.
  UNSTARTED = object()

  def __init__(self, start_position=None, stop_position=None):
    self._lock = threading.Lock()
    self._last_claim = self.UNSTARTED
    self._start_position = start_position
    self._stop_position = stop_position

  def start_position(self):
    """Returns the start position given at construction."""
    return self._start_position

  def stop_position(self):
    """Returns the current stop position, read under the lock."""
    with self._lock:
      return self._stop_position

  def try_claim(self, position):
    """Claims ``position`` when it lies before the stop position.

    Returns:
      ``True`` if the claim was recorded, ``False`` if ``position`` is at or
      past the stop position.

    Raises:
      ValueError: if ``position`` is smaller than the previous claim or
        smaller than the start position.
    """
    with self._lock:
      if self._last_claim is not self.UNSTARTED and position < self._last_claim:
        raise ValueError(
            "Positions must be claimed in order: "
            "claim '%s' attempted after claim '%s'" % (
                position, self._last_claim))
      if self._start_position is not None and position < self._start_position:
        raise ValueError("Claim '%s' is before start '%s'" % (
            position, self._start_position))

      # An unset stop position means the range is open-ended.
      if self._stop_position is not None and not position < self._stop_position:
        return False
      self._last_claim = position
      return True

  def position_at_fraction(self, fraction):
    """Maps ``fraction`` to a position between the start and the stop."""
    return self.fraction_to_position(
        fraction, self._start_position, self._stop_position)

  def try_split(self, position):
    """Moves the stop position to ``position`` if it is still unclaimed.

    Returns:
      ``(position, fraction)`` when the split is accepted, where ``fraction``
      is computed against the range before it is shrunk; ``None`` when the
      last claim is already at or past ``position``.

    Raises:
      ValueError: if ``position`` is not strictly inside the range.
    """
    with self._lock:
      beyond_stop = (self._stop_position is not None
                     and position >= self._stop_position)
      if beyond_stop or (self._start_position is not None
                         and position <= self._start_position):
        raise ValueError("Split at '%s' not in range %s" % (
            position, [self._start_position, self._stop_position]))

      if not (self._last_claim is self.UNSTARTED
              or self._last_claim < position):
        return None

      kept_fraction = self.position_to_fraction(
          position, start=self._start_position, end=self._stop_position)
      self._stop_position = position
      return position, kept_fraction

  def fraction_consumed(self):
    """Returns the fraction of the range covered by the last claim."""
    if self._last_claim is self.UNSTARTED:
      return 0
    return self.position_to_fraction(
        self._last_claim, self._start_position, self._stop_position)

  def position_to_fraction(self, pos, start, end):
    """
    Converts a position `pos` between `start` and `end` (inclusive) to a
    fraction between 0 and 1. Must be provided by subclasses.
    """
    raise NotImplementedError

  def fraction_to_position(self, fraction, start, end):
    """
    Converts a fraction between 0 and 1 to a position between start and end.
    Must be provided by subclasses.
    """
    raise NotImplementedError
class UnsplittableRangeTracker(iobase.RangeTracker):
  """A RangeTracker that always ignores split requests.

  Wrapping a :class:`~apache_beam.io.iobase.RangeTracker` in this class makes
  it unsplittable: :meth:`.try_split()` always refuses and
  :meth:`.split_points()` reports a single split point. Every other call is
  forwarded to the wrapped tracker.
  """

  def __init__(self, range_tracker):
    """Initializes UnsplittableRangeTracker.

    Args:
      range_tracker (~apache_beam.io.iobase.RangeTracker): a
        :class:`~apache_beam.io.iobase.RangeTracker` to which all method
        calls except calls to :meth:`.try_split()` will be delegated.
    """
    assert isinstance(range_tracker, iobase.RangeTracker)
    self._range_tracker = range_tracker

  def start_position(self):
    """Forwards to the wrapped tracker."""
    wrapped = self._range_tracker
    return wrapped.start_position()

  def stop_position(self):
    """Forwards to the wrapped tracker."""
    wrapped = self._range_tracker
    return wrapped.stop_position()

  def position_at_fraction(self, fraction):
    """Forwards to the wrapped tracker."""
    wrapped = self._range_tracker
    return wrapped.position_at_fraction(fraction)

  def try_claim(self, position):
    """Forwards to the wrapped tracker."""
    wrapped = self._range_tracker
    return wrapped.try_claim(position)

  def try_split(self, position):
    """Refuses every split request; the wrapped tracker is not consulted."""
    return None

  def set_current_position(self, position):
    """Forwards to the wrapped tracker; nothing is returned."""
    wrapped = self._range_tracker
    wrapped.set_current_position(position)

  def fraction_consumed(self):
    """Forwards to the wrapped tracker."""
    wrapped = self._range_tracker
    return wrapped.fraction_consumed()

  def split_points(self):
    """Returns the fixed pair ``(0, 1)``."""
    # An unsplittable range only contains a single split point.
    return (0, 1)

  def set_split_points_unclaimed_callback(self, callback):
    """Forwards to the wrapped tracker; nothing is returned."""
    wrapped = self._range_tracker
    wrapped.set_split_points_unclaimed_callback(callback)
class LexicographicKeyRangeTracker(OrderedPositionRangeTracker):
  """
  Range tracker reporting progress through a keyspace of strings that is
  ordered lexicographically.
  """

  @classmethod
  def fraction_to_position(cls, fraction, start=None, end=None):
    """
    Linearly interpolates a key that lies lexicographically `fraction` of
    the way from start to end. An unset start means the empty key; an empty
    or unset end means the end of the keyspace.
    """
    assert 0 <= fraction <= 1, fraction
    if start is None:
      start = ''
    if fraction == 1:
      return end
    if fraction == 0:
      return start

    if not end:
      # Open-ended range: only the leading 0xFF bytes of start are shared
      # with the (implicit) upper bound.
      shared_len = len(start) - len(start.lstrip('\xFF'))
    else:
      # Index of the first differing byte, or the shorter length when one
      # key is a prefix of the other.
      shared_len = next(
          (idx for idx, (lo, hi) in enumerate(zip(start, end)) if lo != hi),
          min(len(start), len(end)))

    # Convert the relative precision of fraction (~53 bits) to an absolute
    # precision needed to represent values between start and end distinctly.
    prec = shared_len + int(-math.log(fraction, 256)) + 7
    lower = cls._string_to_int(start, prec)
    if end:
      upper = cls._string_to_int(end, prec)
    else:
      upper = 1 << (prec * 8)
    interpolated = lower + int((upper - lower) * fraction)

    # Rounding may land exactly on a boundary. Nudge the value so that the
    # actual start or end is only ever returned for a fraction of exactly
    # 0 or 1.
    if interpolated == lower:
      interpolated += 1
    elif interpolated == upper:
      interpolated -= 1
    return cls._string_from_int(interpolated, prec).rstrip('\0')

  @classmethod
  def position_to_fraction(cls, key, start=None, end=None):
    """
    Returns the fraction of keys in the range [start, end) that
    are less than the given key. An empty key yields 0.
    """
    if not key:
      return 0
    if start is None:
      start = ''

    prec = len(start) + 7
    if key.startswith(start):
      # Higher absolute precision needed for very small values of fixed
      # relative position.
      significant_tail = key[len(start):].strip('\0')
      prec = max(prec, len(key) - len(significant_tail) + 7)

    lower = cls._string_to_int(start, prec)
    current = cls._string_to_int(key, prec)
    if end:
      upper = cls._string_to_int(end, prec)
    else:
      upper = 1 << (prec * 8)
    return float(current - lower) / (upper - lower)

  @staticmethod
  def _string_to_int(s, prec):
    """
    Returns int(256**prec * f) where f is the fraction
    represented by interpreting '.' + s as a base-256
    floating point number.
    """
    if not s:
      return 0
    # Pad with NUL bytes or truncate so that exactly `prec` bytes remain.
    fixed_width = s.ljust(prec, '\0')[:prec]
    return int(fixed_width.encode('hex'), 16)

  @staticmethod
  def _string_from_int(i, prec):
    """
    Inverse of _string_to_int.
    """
    # Left-pad the hex digits to 2 * prec characters before decoding.
    hex_digits = ('%x' % i).rjust(2 * prec, '0')
    return hex_digits.decode('hex')