#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""`iobase.RestrictionTracker` implementations provided with Apache Beam."""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import division
from builtins import object
from typing import Tuple
from apache_beam.io.iobase import RestrictionProgress
from apache_beam.io.iobase import RestrictionTracker
from apache_beam.io.range_trackers import OffsetRangeTracker
[docs]class OffsetRange(object):
def __init__(self, start, stop):
if start > stop:
raise ValueError(
'Start offset must be not be larger than the stop offset. '
'Received %d and %d respectively.' % (start, stop))
self.start = start
self.stop = stop
def __eq__(self, other):
if not isinstance(other, OffsetRange):
return False
return self.start == other.start and self.stop == other.stop
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash((type(self), self.start, self.stop))
def __repr__(self):
return 'OffsetRange(start=%s, stop=%s)' % (self.start, self.stop)
[docs] def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1):
current_split_start = self.start
max_split_size = max(
desired_num_offsets_per_split, min_num_offsets_per_split)
while current_split_start < self.stop:
current_split_stop = min(current_split_start + max_split_size, self.stop)
remaining = self.stop - current_split_stop
# Avoiding a small split at the end.
if (remaining < desired_num_offsets_per_split // 4 or
remaining < min_num_offsets_per_split):
current_split_stop = self.stop
yield OffsetRange(current_split_start, current_split_stop)
current_split_start = current_split_stop
[docs] def split_at(self, split_pos):
# type: (...) -> Tuple[OffsetRange, OffsetRange]
return OffsetRange(self.start, split_pos), OffsetRange(split_pos, self.stop)
[docs] def new_tracker(self):
return OffsetRangeTracker(self.start, self.stop)
[docs] def size(self):
return self.stop - self.start
[docs]class OffsetRestrictionTracker(RestrictionTracker):
"""An `iobase.RestrictionTracker` implementations for an offset range.
Offset range is represented as OffsetRange.
"""
def __init__(self, offset_range):
# type: (OffsetRange) -> None
assert isinstance(offset_range, OffsetRange), offset_range
self._range = offset_range
self._current_position = None
self._last_claim_attempt = None
self._checkpointed = False
[docs] def check_done(self):
if (self._range.start != self._range.stop and
(self._last_claim_attempt is None or
self._last_claim_attempt < self._range.stop - 1)):
raise ValueError(
'OffsetRestrictionTracker is not done since work in range [%s, %s) '
'has not been claimed.' % (
self._last_claim_attempt
if self._last_claim_attempt is not None else self._range.start,
self._range.stop))
[docs] def current_restriction(self):
return self._range
[docs] def current_progress(self):
# type: () -> RestrictionProgress
if self._current_position is None:
fraction = 0.0
elif self._range.stop == self._range.start:
# If self._current_position is not None, we must be done.
fraction = 1.0
else:
fraction = (
float(self._current_position - self._range.start) /
(self._range.stop - self._range.start))
return RestrictionProgress(fraction=fraction)
[docs] def start_position(self):
return self._range.start
[docs] def stop_position(self):
return self._range.stop
[docs] def try_claim(self, position):
if (self._last_claim_attempt is not None and
position <= self._last_claim_attempt):
raise ValueError(
'Positions claimed should strictly increase. Trying to claim '
'position %d while last claim attempt was %d.' %
(position, self._last_claim_attempt))
self._last_claim_attempt = position
if position < self._range.start:
raise ValueError(
'Position to be claimed cannot be smaller than the start position '
'of the range. Tried to claim position %r for the range [%r, %r)' %
(position, self._range.start, self._range.stop))
if self._range.start <= position < self._range.stop:
self._current_position = position
return True
return False
[docs] def try_split(self, fraction_of_remainder):
if not self._checkpointed:
if self._last_claim_attempt is None:
cur = self._range.start - 1
else:
cur = self._last_claim_attempt
split_point = (
cur + int(max(1, (self._range.stop - cur) * fraction_of_remainder)))
if split_point < self._range.stop:
if fraction_of_remainder == 0:
self._checkpointed = True
self._range, residual_range = self._range.split_at(split_point)
return self._range, residual_range
[docs] def is_bounded(self):
return True
[docs]class UnsplittableRestrictionTracker(RestrictionTracker):
"""An `iobase.RestrictionTracker` that wraps another but does not split."""
def __init__(self, underling_tracker):
self._underling_tracker = underling_tracker
[docs] def try_split(self, fraction_of_remainder):
return False
# __getattribute__ is used rather than __getattr__ to override the
# stubs in the baseclass.
def __getattribute__(self, name):
if name.startswith('_') or name in ('try_split', ):
return super(UnsplittableRestrictionTracker, self).__getattribute__(name)
else:
return getattr(self._underling_tracker, name)