public abstract static class OffsetBasedSource.OffsetBasedReader<T> extends BoundedSource.BoundedReader<T>
A Source.Reader that implements code common to readers of all OffsetBasedSources.
Subclasses have to implement:
- startImpl() and advanceImpl() for reading the first or subsequent records.
- Source.Reader.getCurrent(), getCurrentOffset(), and optionally isAtSplitPoint() and BoundedSource.BoundedReader.getCurrentTimestamp() to access properties of the last record successfully read by startImpl() or advanceImpl().
Fields inherited from class BoundedSource.BoundedReader: SPLIT_POINTS_UNKNOWN
Constructor and Description |
---|
OffsetBasedReader(OffsetBasedSource<T> source) |
Modifier and Type | Method and Description |
---|---|
boolean | advance() Advances the reader to the next valid record. |
protected abstract boolean | advanceImpl() Advances to the next record and returns true, or returns false if there is no next record. |
boolean | allowsDynamicSplitting() Whether this reader should allow dynamic splitting of the offset ranges. |
protected abstract long | getCurrentOffset() Returns the starting offset of the current record, which has been read by the last successful Source.Reader.start() or Source.Reader.advance() call. |
OffsetBasedSource<T> | getCurrentSource() Returns a Source describing the same input that this Reader currently reads (including items already read). |
java.lang.Double | getFractionConsumed() Returns a value in [0, 1] representing approximately what fraction of the current source this reader has read so far, or null if such an estimate is not available. |
long | getSplitPointsConsumed() Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). |
long | getSplitPointsRemaining() Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). |
protected boolean | isAtSplitPoint() Returns whether the current record is at a split point (i.e., whether the current record would be the first record to be read by a source with a specified start offset of getCurrentOffset()). |
boolean | isDone() |
boolean | isStarted() Returns true if there has been a call to start(). |
OffsetBasedSource<T> | splitAtFraction(double fraction) Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range. |
boolean | start() Initializes the reader and advances the reader to the first record. |
protected abstract boolean | startImpl() Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. |
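Methods inherited from class BoundedSource.BoundedReader: getCurrentTimestamp
Methods inherited from class Source.Reader: close, getCurrent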
public OffsetBasedReader(OffsetBasedSource<T> source)
Parameters:
source - the OffsetBasedSource to be read by the current reader.
public final boolean isDone()
public final boolean isStarted()
Returns true if there has been a call to start().
protected abstract long getCurrentOffset() throws java.util.NoSuchElementException
Returns the starting offset of the current record, which has been read by the last successful Source.Reader.start() or Source.Reader.advance() call.
If no such call has been made yet, the return value is unspecified.
See RangeTracker for description of offset semantics.
Throws:
java.util.NoSuchElementException
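protected boolean isAtSplitPoint() throws java.util.NoSuchElementException
Returns whether the current record is at a split point (i.e., whether the current record would be the first record to be read by a source with a specified start offset of getCurrentOffset()).
See detailed documentation about split points in RangeTracker.
Throws:
java.util.NoSuchElementException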
public final boolean start() throws java.io.IOException
Description copied from class: Source.Reader
Initializes the reader and advances the reader to the first record.
This method should be called exactly once. The invocation should occur prior to calling Source.Reader.advance() or Source.Reader.getCurrent(). This method may perform expensive operations that are needed to initialize the reader.
Specified by:
start in class Source.Reader<T>
Returns:
true if a record was read, false if there is no more input available.
Throws:
java.io.IOException
public final boolean advance() throws java.io.IOException
Description copied from class: Source.Reader
Advances the reader to the next valid record.
It is an error to call this without having called Source.Reader.start() first.
Specified by:
advance in class Source.Reader<T>
Returns:
true if a record was read, false if there is no more input available.
Throws:
java.io.IOException
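The calling contract for start() and advance() is usually exercised with a single loop. The sketch below illustrates that loop with a hypothetical in-memory ListReader. It is not an SDK class and only mimics the start()/advance()/getCurrent() contract described above; the class names and the exceptions it throws on misuse are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in that mimics the start()/advance()/getCurrent() contract.
final class ListReader {
    private final List<String> records;
    private int index = -1;          // meaningful only after start()
    private boolean started = false;

    ListReader(List<String> records) {
        this.records = records;
    }

    // Must be called exactly once, before advance() or getCurrent().
    boolean start() {
        if (started) {
            throw new IllegalStateException("start() called more than once");
        }
        started = true;
        index = 0;
        return index < records.size();
    }

    // It is an error to call this before start().
    boolean advance() {
        if (!started) {
            throw new IllegalStateException("advance() called before start()");
        }
        index++;
        return index < records.size();
    }

    String getCurrent() {
        if (!started || index >= records.size()) {
            throw new NoSuchElementException();
        }
        return records.get(index);
    }
}

final class ReadLoopSketch {
    // Canonical loop: start() once, then advance() until it returns false.
    static List<String> readAll(ListReader reader) {
        List<String> out = new ArrayList<>();
        for (boolean more = reader.start(); more; more = reader.advance()) {
            out.add(reader.getCurrent());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(new ListReader(Arrays.asList("a", "b", "c"))));
    }
}
```

The for-loop form keeps the "start() exactly once, then advance()" ordering in one place, so getCurrent() is only reached after a call that returned true.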
protected abstract boolean startImpl() throws java.io.IOException
Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. This method will be invoked exactly once and may perform expensive setup operations that are needed to initialize the reader.
This function is the OffsetBasedReader implementation of BoundedReader#start. The key difference is that the implementor can ignore the possibility that it should no longer produce the first record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to splitAtFraction(double) has changed the source to shrink the offset range being read.
Throws:
java.io.IOException
See Also:
BoundedReader#start
protected abstract boolean advanceImpl() throws java.io.IOException
Advances to the next record and returns true, or returns false if there is no next record.
This function is the OffsetBasedReader implementation of BoundedReader#advance. The key difference is that the implementor can ignore the possibility that it should no longer produce the next record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to splitAtFraction(double) has changed the source to shrink the offset range being read.
Throws:
java.io.IOException
See Also:
BoundedReader#advance
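The division of labour between start()/advance() and startImpl()/advanceImpl() can be sketched as a template-method pattern. The following is a simplified, single-threaded illustration with hypothetical classes (MiniOffsetReader, CountingReader); it is not the SDK implementation and leaves out range tracking and concurrent splitting. It only shows why a subclass does not need to look at the end offset itself.

```java
// Simplified, hypothetical stand-in for the start()/startImpl() split; not the SDK class.
abstract class MiniOffsetReader {
    private final long endOffset;    // exclusive end of the assigned range
    private boolean done = false;

    MiniOffsetReader(long endOffset) {
        this.endOffset = endOffset;
    }

    // Subclass hooks: read the next record without doing any range checks.
    protected abstract boolean startImpl();
    protected abstract boolean advanceImpl();
    protected abstract long getCurrentOffset();

    // Assumption for this sketch: every record is a split point unless overridden.
    protected boolean isAtSplitPoint() {
        return true;
    }

    public final boolean start() {
        return checkRange(startImpl());
    }

    public final boolean advance() {
        return !done && checkRange(advanceImpl());
    }

    public final boolean isDone() {
        return done;
    }

    // The base class, not the subclass, decides whether the record is still in range.
    private boolean checkRange(boolean hasRecord) {
        if (!hasRecord || (isAtSplitPoint() && getCurrentOffset() >= endOffset)) {
            done = true;
            return false;
        }
        return true;
    }
}

// Produces the offsets start, start + 1, ... and never consults the end offset.
final class CountingReader extends MiniOffsetReader {
    private final long startOffset;
    private long current;

    CountingReader(long startOffset, long endOffset) {
        super(endOffset);
        this.startOffset = startOffset;
    }

    @Override protected boolean startImpl() { current = startOffset; return true; }
    @Override protected boolean advanceImpl() { current++; return true; }
    @Override protected long getCurrentOffset() { return current; }

    // Counts how many records the wrapped start()/advance() calls let through.
    static long countRecords(long start, long end) {
        CountingReader reader = new CountingReader(start, end);
        long n = 0;
        for (boolean more = reader.start(); more; more = reader.advance()) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(countRecords(10, 15));
    }
}
```

Because start() and advance() are final in this sketch, the range check cannot be bypassed by a subclass, which mirrors the final modifiers on the corresponding methods documented here.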
public OffsetBasedSource<T> getCurrentSource()
Description copied from class: BoundedSource.BoundedReader
Returns a Source describing the same input that this Reader currently reads (including items already read).
Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.
The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful BoundedSource.BoundedReader.splitAtFraction(double) call.
Remember that Source objects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, happening asynchronously via BoundedSource.BoundedReader.splitAtFraction(double), meaning it can return a different Source object. The returned object itself will still be immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).
For convenience, subclasses should usually return the most concrete subclass of Source possible. In practice, the implementation of this method should nearly always be one of the following:
- A reader that inherits from a base class that already implements BoundedSource.BoundedReader.getCurrentSource(): delegate to the base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.
    public FooReader(FooSource<T> source) {
      super(source);
    }
    public FooSource<T> getCurrentSource() {
      return (FooSource<T>) super.getCurrentSource();
    }
- A reader that does not support dynamic work rebalancing: return a private final variable.
    private final FooSource<T> source;
    public FooReader(FooSource<T> source) {
      this.source = source;
    }
    public FooSource<T> getCurrentSource() {
      return source;
    }
- A BoundedSource.BoundedReader that explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.
    private FooSource<T> source;
    public FooReader(FooSource<T> source) {
      this.source = source;
    }
    public synchronized FooSource<T> getCurrentSource() {
      return source;
    }
    public synchronized FooSource<T> splitAtFraction(double fraction) {
      ...
      FooSource<T> primary = ...;
      FooSource<T> residual = ...;
      this.source = primary;
      return residual;
    }
Specified by:
getCurrentSource in class BoundedSource.BoundedReader<T>
public java.lang.Double getFractionConsumed()
Description copied from class: BoundedSource.BoundedReader
Returns a value in [0, 1] representing approximately what fraction of the current source this reader has read so far, or null if such an estimate is not available.
This method should satisfy the following properties:
- It should return 0 before the Source.Reader.start() call.
- It should return 1 after a Source.Reader.start() or Source.Reader.advance() call that returns false.
By default, returns null to indicate that this cannot be estimated.
Thread safety: if BoundedSource.BoundedReader.splitAtFraction(double) is implemented, this method can be called concurrently to other methods (including itself), and it is therefore critical for it to be implemented in a thread-safe way.
Overrides:
getFractionConsumed in class BoundedSource.BoundedReader<T>
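For an offset range, one plausible way to satisfy these properties is to interpolate the offset of the last returned record between the start and end offsets. The sketch below assumes a hypothetical FractionSketch helper with a known, non-empty range; it is an illustration under those assumptions, not the SDK's range tracker. All methods are synchronized because the estimate may be requested from another thread.

```java
// Hypothetical offset-based progress estimate; a sketch, not the SDK's range tracker.
final class FractionSketch {
    private final long startOffset;
    private final long endOffset;       // exclusive
    private long lastRecordStart = -1;  // -1 until the first record is returned
    private boolean done = false;

    FractionSketch(long startOffset, long endOffset) {
        if (endOffset <= startOffset) {
            throw new IllegalArgumentException("sketch assumes a non-empty range");
        }
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    // Called by the reading thread after a record at this offset was returned.
    synchronized void recordReturnedAt(long offset) {
        lastRecordStart = offset;
    }

    // Called by the reading thread once start() or advance() has returned false.
    synchronized void markDone() {
        done = true;
    }

    // Safe to call concurrently with the two methods above.
    synchronized double getFractionConsumed() {
        if (done) {
            return 1.0;                 // finished reader
        }
        if (lastRecordStart < 0) {
            return 0.0;                 // nothing returned yet
        }
        double fraction =
            (double) (lastRecordStart - startOffset) / (endOffset - startOffset);
        return Math.max(0.0, Math.min(1.0, fraction));
    }

    public static void main(String[] args) {
        FractionSketch progress = new FractionSketch(100, 200);
        progress.recordReturnedAt(150);
        System.out.println(progress.getFractionConsumed());
    }
}
```

Clamping to [0, 1] guards against records whose offset lies slightly outside the nominal range, for example a non-split-point record read past the end offset.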
public long getSplitPointsConsumed()
Description copied from class: BoundedSource.BoundedReader
Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). This corresponds to all split point records (see RangeTracker) returned by this reader, excluding the last split point returned if the reader is not finished.
Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.
- Any reader that is unstarted (i.e., has never had a call to Source.Reader.start()) has a consumed parallelism of 0. This condition holds independent of whether the input is splittable.
- Any reader that has only returned its first element (i.e., has never had a call to Source.Reader.advance()) has a consumed parallelism of 0: the first element is the current element and is still being processed. This condition holds independent of whether the input is splittable.
- For an empty reader (in which the call to Source.Reader.start() returned false), the consumed parallelism is 0. This condition holds independent of whether the input is splittable.
- For a non-empty, finished reader (in which the call to Source.Reader.start() returned true and a call to Source.Reader.advance() has returned false), the value returned must be at least 1 and should equal the total parallelism in the source.
- For example (3): for any non-empty unsplittable input, the consumed parallelism is 0 until the reader is finished (because the last call to Source.Reader.advance() returned false), at which point it becomes 1.
A reader that is implemented using a RangeTracker is encouraged to use the range tracker's ability to count split points to implement this method. See OffsetBasedSource.OffsetBasedReader and OffsetRangeTracker for an example.
Defaults to BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
See BoundedSource.BoundedReader for information about thread safety.
Overrides:
getSplitPointsConsumed in class BoundedSource.BoundedReader<T>
See Also:
BoundedSource.BoundedReader.getSplitPointsRemaining()
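The rule "all split points returned, excluding the last one while the reader is unfinished" can be sketched with a small counter. SplitPointCounter below is a hypothetical helper written for illustration, not an SDK class; it assumes the reader reports each returned record together with whether that record is a split point.

```java
// Hypothetical split point counter illustrating the "consumed" rule; not an SDK class.
final class SplitPointCounter {
    private long splitPointsSeen = 0;
    private boolean done = false;

    // Called after each record returned by start() or advance().
    synchronized void recordReturned(boolean isAtSplitPoint) {
        if (isAtSplitPoint) {
            splitPointsSeen++;
        }
    }

    // Called once start() or advance() has returned false.
    synchronized void markDone() {
        done = true;
    }

    synchronized long getSplitPointsConsumed() {
        if (splitPointsSeen == 0) {
            return 0;                   // unstarted or empty reader
        }
        // The most recent split point is still being processed until the reader finishes.
        return done ? splitPointsSeen : splitPointsSeen - 1;
    }

    public static void main(String[] args) {
        // Two blocks of three records each; only the first record of a block is a split point.
        boolean[] atSplitPoint = {true, false, false, true, false, false};
        SplitPointCounter counter = new SplitPointCounter();
        for (boolean flag : atSplitPoint) {
            counter.recordReturned(flag);
            System.out.println(counter.getSplitPointsConsumed());
        }
        counter.markDone();
        System.out.println(counter.getSplitPointsConsumed());
    }
}
```

In the two-block run inside main, the value stays at 0 for the whole first block, becomes 1 when the first record of the second block is returned, and reaches 2 only after the reader is marked done.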
public long getSplitPointsRemaining()
Description copied from class: BoundedSource.BoundedReader
Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). This corresponds to all unprocessed split point records (see RangeTracker), including the last split point returned, in the remainder part of the source.
This function should be implemented only in addition to BoundedSource.BoundedReader.getSplitPointsConsumed() and only if an exact value can be returned.
Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.
Assume for examples (1) and (2) that the number of records or blocks remaining is known:
- Any reader for which the last call to Source.Reader.start() or Source.Reader.advance() has returned true should not return 0, because this reader itself represents parallelism at least 1. This condition holds independent of whether the input is splittable.
- A finished reader (for which Source.Reader.start() or Source.Reader.advance() has returned false) should return a value of 0. This condition holds independent of whether the input is splittable.
Defaults to BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
See BoundedSource.BoundedReader for information about thread safety.
Overrides:
getSplitPointsRemaining in class BoundedSource.BoundedReader<T>
See Also:
BoundedSource.BoundedReader.getSplitPointsConsumed()
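For a perfectly splittable input in which every offset in the range is one record, an exact remaining count follows directly from the offsets. The helper below is a hypothetical sketch under that assumption (RemainingSketch and its parameters are illustrative names, not SDK API). It counts the current record as remaining, in line with "including the last split point returned".

```java
// Hypothetical helper for a perfectly splittable range [startOffset, endOffset); not SDK API.
final class RemainingSketch {
    // Assumes one record per offset, so each offset is exactly one split point.
    static long splitPointsRemaining(
            boolean started, boolean done, long startOffset, long currentOffset, long endOffset) {
        if (done) {
            return 0;                          // finished reader
        }
        if (!started) {
            return endOffset - startOffset;    // nothing returned yet, whole range remains
        }
        // The current record is still being processed, so it counts as remaining.
        return endOffset - currentOffset;
    }

    public static void main(String[] args) {
        // 50-record input with offsets 0..49; the reader is currently at offset 29.
        System.out.println(splitPointsRemaining(true, false, 0, 29, 50));
    }
}
```

With a current offset strictly below the end offset, the result is at least 1 for any started, unfinished reader, which matches the first condition in the list above.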
public boolean allowsDynamicSplitting()
Whether this reader should allow dynamic splitting of the offset ranges.
True by default. Override this to return false if the reader cannot support dynamic splitting correctly. If this returns false, splitAtFraction(double) will refuse all split requests.
public final OffsetBasedSource<T> splitAtFraction(double fraction)
Description copied from class: BoundedSource.BoundedReader
Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.
Returns a BoundedSource representing the remainder.
Consider the following sequence of calls:
    BoundedSource<T> initial = reader.getCurrentSource();
    BoundedSource<T> residual = reader.splitAtFraction(fraction);
    BoundedSource<T> primary = reader.getCurrentSource();
This method should return null if the split cannot be performed for this fraction while satisfying the semantics above. E.g., a reader that reads a range of offsets in a file should return null if it is already past the position in its range corresponding to the given fraction. In this case, the method MUST have no effect (the reader must behave as if the method hadn't been called at all).
It is also very important that this method always completes quickly. In particular, it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating this requirement may stall completion of the work item or even cause it to fail.
It is incorrect to make both this method and Source.Reader.start()/Source.Reader.advance() synchronized, because those methods can perform blocking operations, and then this method would have to wait for those calls to complete.
RangeTracker makes it easy to implement this method safely and correctly.
By default, returns null to indicate that splitting is not possible.
Overrides:
splitAtFraction in class BoundedSource.BoundedReader<T>
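The refusal rules and the "no effect on failure" requirement can be sketched on a plain offset range. SplitSketch below is a hypothetical illustration, not the SDK implementation: the rounding of the split offset with Math.floor and the refusal of splits before the first record are assumptions of this sketch. Only short in-memory state updates happen under the lock, so a split request never waits on I/O.

```java
// Hypothetical split logic over an offset range; a sketch, not the SDK implementation.
final class SplitSketch {
    private final long startOffset;
    private long endOffset;              // exclusive; shrinks on a successful split
    private long lastRecordStart = -1;   // -1 until the first record is returned
    private final boolean allowsDynamicSplitting;

    SplitSketch(long startOffset, long endOffset, boolean allowsDynamicSplitting) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.allowsDynamicSplitting = allowsDynamicSplitting;
    }

    // Called by the reading thread after the blocking read has completed.
    synchronized void recordReturnedAt(long offset) {
        lastRecordStart = offset;
    }

    synchronized long getEndOffset() {
        return endOffset;
    }

    // Returns the residual range {from, to}, or null if the split is refused.
    // A refused split leaves the reader state untouched.
    synchronized long[] splitAtFraction(double fraction) {
        if (!allowsDynamicSplitting || lastRecordStart < 0) {
            return null;                 // splitting disabled, or reader not started
        }
        long splitOffset =
            startOffset + (long) Math.floor(fraction * (endOffset - startOffset));
        if (splitOffset <= lastRecordStart || splitOffset >= endOffset) {
            return null;                 // already past the split position, or nothing to give up
        }
        long[] residual = {splitOffset, endOffset};
        endOffset = splitOffset;         // the primary range is now [startOffset, splitOffset)
        return residual;
    }

    public static void main(String[] args) {
        SplitSketch reader = new SplitSketch(0, 100, true);
        reader.recordReturnedAt(10);
        long[] residual = reader.splitAtFraction(0.5);
        System.out.println(residual[0] + " " + residual[1] + " " + reader.getEndOffset());
    }
}
```

After a successful split, the primary range and the residual range together cover exactly the range the reader held before the call, and the reader keeps its current position.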