PositionT
- Type of positions used by the source to define ranges and identify records.public interface RangeTracker<PositionT>
RangeTracker
is a thread-safe helper object for implementing dynamic work rebalancing
in position-based BoundedSource.BoundedReader
subclasses.
RangeTracker
interface should not be used per se - all users should use its
subclasses directly. We declare it here because all subclasses have roughly the same interface
and the same properties, to centralize the documentation. Currently we provide one implementation
- OffsetRangeTracker
.
In case a record occupies a range of positions in the source, the most important thing about the record is the position where it starts.
Defining the semantics of positions for a source is entirely up to the source class, however the chosen definitions have to obey certain properties in order to make it possible to correctly split the source into parts, including dynamic splitting. Two main aspects need to be defined:
[A, B)
.
[A, B)
should not require reading all data before A
.
The sections below explain exactly what properties these definitions must satisfy, and how to
use a RangeTracker
with a properly defined source.
[A, B)
and records from [B, C)
should give the same records as reading from
[A, C)
, where A <= B <= C
. This property ensures that no matter how a range of
positions is split into arbitrarily many sub-ranges, the total set of records described by them
stays the same.
The other important property is how the source's range relates to positions of records in the source. In many sources each record can be identified by a unique starting position. In this case:
[A, B)
must have starting positions in this range.
[A, B)
" as "read from the first record starting at or
after A, up to but not including the first record starting at or after B".
Some examples of such sources include reading lines or CSV from a text file, reading keys and values from a BigTable, etc.
The concept of split points allows to extend the definitions for dealing with sources where some records cannot be identified by a unique starting position.
In all cases, all records returned by a source [A, B)
must start at or after
A
.
Some sources may have records that are not directly addressable. For example, imagine a file format consisting of a sequence of compressed blocks. Each block can be assigned an offset, but records within the block cannot be directly addressed without decompressing the block. Let us refer to this hypothetical format as CBF (Compressed Blocks Format).
Many such formats can still satisfy the associativity property. For example, in CBF, reading
[A, B)
can mean "read all the records in all blocks whose starting offset is in [A, B)
".
To support such complex formats, we introduce the notion of split points. We say that a
record is a split point if there exists a position A
such that the record is the first
one to be returned when reading the range [A, infinity)
. In CBF, the only split points
would be the first records in each block.
Split points allow us to define the meaning of a record's position and a source's range in all cases:
A
such that reading a source with the range [A, infinity)
returns this record;
[A, B)
must return records starting from the first split point
at or after A
, up to but not including the first split point at or after B
.
In particular, this means that the first record returned by a source MUST always be a split
point.
Dynamic splitting can happen only at unconsumed positions. If the reader just returned a record at offset 42 in a file, dynamic splitting can happen only at offset 43 or beyond, as otherwise that record could be read twice (by the current reader and by a reader of the task starting at 43).
OffsetRangeTracker
to support dynamically splitting a
source with integer positions (offsets).
class MyReader implements BoundedReader<Foo> {
private MySource currentSource;
private final OffsetRangeTracker tracker = new OffsetRangeTracker();
...
MyReader(MySource source) {
this.currentSource = source;
this.tracker = new MyRangeTracker<>(source.getStartOffset(), source.getEndOffset())
}
...
boolean start() {
... (general logic for locating the first record) ...
if (!tracker.tryReturnRecordAt(true, recordStartOffset)) return false;
... (any logic that depends on the record being returned, e.g. counting returned records)
return true;
}
boolean advance() {
... (general logic for locating the next record) ...
if (!tracker.tryReturnRecordAt(isAtSplitPoint, recordStartOffset)) return false;
... (any logic that depends on the record being returned, e.g. counting returned records)
return true;
}
double getFractionConsumed() {
return tracker.getFractionConsumed();
}
}
BoundedSource.BoundedReader
,
follow the pattern described above.
When using this class to protect iteration in the hasNext()/next()
model, consider the
record consumed when hasNext()
is about to return true, rather than when next()
is called, because hasNext()
returning true is promising the caller that next()
will have an element to return - so trySplitAtPosition(PositionT)
must not split the range in a way
that would make the record promised by hasNext()
belong to a different range.
Also note that implementations of hasNext()
need to ensure that they call tryReturnRecordAt(boolean, PositionT)
only once even if hasNext()
is called repeatedly, due to the
requirement on uniqueness of split point positions.
Modifier and Type | Method and Description |
---|---|
double |
getFractionConsumed()
Returns the approximate fraction of positions in the source that have been consumed by
successful
tryReturnRecordAt(boolean, PositionT) calls, or 0.0 if no such calls have happened. |
PositionT |
getStartPosition()
Returns the starting position of the current range, inclusive.
|
PositionT |
getStopPosition()
Returns the ending position of the current range, exclusive.
|
boolean |
tryReturnRecordAt(boolean isAtSplitPoint,
PositionT recordStart)
Atomically determines whether a record at the given position can be returned and updates
internal state.
|
boolean |
trySplitAtPosition(PositionT splitPosition)
Atomically splits the current range [
getStartPosition() , getStopPosition() ) into
a "primary" part [getStartPosition() , splitPosition ) and a "residual" part
[splitPosition , getStopPosition() ), assuming the current last-consumed position
is within [getStartPosition() , splitPosition) (i.e., splitPosition has not been
consumed yet). |
PositionT getStartPosition()
PositionT getStopPosition()
boolean tryReturnRecordAt(boolean isAtSplitPoint, PositionT recordStart)
isAtSplitPoint
is true
, and recordStart
is outside the current
range, returns false
;
recordStart
and returns true
.
This method MUST be called on all split point records. It may be called on every record.
boolean trySplitAtPosition(PositionT splitPosition)
getStartPosition()
, getStopPosition()
) into
a "primary" part [getStartPosition()
, splitPosition
) and a "residual" part
[splitPosition
, getStopPosition()
), assuming the current last-consumed position
is within [getStartPosition()
, splitPosition) (i.e., splitPosition
has not been
consumed yet).
Updates the current range to be the primary and returns true
. This means that all
further calls on the current object will interpret their arguments relative to the primary
range.
If the split position has already been consumed, or if no tryReturnRecordAt(boolean, PositionT)
call
was made yet, returns false
. The second condition is to prevent dynamic splitting
during reader start-up.
double getFractionConsumed()
tryReturnRecordAt(boolean, PositionT)
calls, or 0.0 if no such calls have happened.