Class OffsetBasedSource.OffsetBasedReader<T>
- All Implemented Interfaces:
AutoCloseable
- Direct Known Subclasses:
FileBasedSource.FileBasedReader
- Enclosing class:
OffsetBasedSource<T>
Source.Reader that implements code common to readers of all OffsetBasedSources.
Subclasses have to implement:
- The methods
startImpl()andadvanceImpl()for reading the first or subsequent records. - The methods
Source.Reader.getCurrent(),getCurrentOffset(), and optionallyisAtSplitPoint()andBoundedSource.BoundedReader.getCurrentTimestamp()to access properties of the last record successfully read bystartImpl()oradvanceImpl().
-
Field Summary
Fields inherited from class org.apache.beam.sdk.io.BoundedSource.BoundedReader
SPLIT_POINTS_UNKNOWN -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionfinal booleanadvance()Advances the reader to the next valid record.protected abstract booleanAdvances to the next record and returnstrue, or returns false if there is no next record.booleanWhether this reader should allow dynamic splitting of the offset ranges.protected abstract longReturns the starting offset of thecurrent record, which has been read by the last successfulSource.Reader.start()orSource.Reader.advance()call.Returns aSourcedescribing the same input that thisReadercurrently reads (including items already read).Returns a value in [0, 1] representing approximately what fraction of thecurrent sourcethis reader has read so far, ornullif such an estimate is not available.longReturns the total amount of parallelism in the consumed (returned and processed) range of this reader's currentBoundedSource(as would be returned byBoundedSource.BoundedReader.getCurrentSource()).longReturns the total amount of parallelism in the unprocessed part of this reader's currentBoundedSource(as would be returned byBoundedSource.BoundedReader.getCurrentSource()).protected booleanReturns whether the current record is at a split point (i.e., whether the current record would be the first record to be read by a source with a specified start offset ofgetCurrentOffset()).final booleanisDone()final booleanReturns true if there has been a call tostart().final OffsetBasedSource<T> splitAtFraction(double fraction) Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.final booleanstart()Initializes the reader and advances the reader to the first record.protected abstract booleanInitializes theOffsetBasedSource.OffsetBasedReaderand advances to the first record, returningtrueif there is a record available to be read.Methods inherited from class org.apache.beam.sdk.io.BoundedSource.BoundedReader
getCurrentTimestampMethods inherited from class org.apache.beam.sdk.io.Source.Reader
close, getCurrent
-
Constructor Details
-
OffsetBasedReader
- Parameters:
source- theOffsetBasedSourceto be read by the current reader.
-
-
Method Details
-
isDone
public final boolean isDone() -
isStarted
public final boolean isStarted()Returns true if there has been a call tostart(). -
getCurrentOffset
Returns the starting offset of thecurrent record, which has been read by the last successfulSource.Reader.start()orSource.Reader.advance()call.If no such call has been made yet, the return value is unspecified.
See
RangeTrackerfor description of offset semantics.- Throws:
NoSuchElementException
-
isAtSplitPoint
Returns whether the current record is at a split point (i.e., whether the current record would be the first record to be read by a source with a specified start offset ofgetCurrentOffset()).See detailed documentation about split points in
RangeTracker.- Throws:
NoSuchElementException
-
start
Description copied from class:Source.ReaderInitializes the reader and advances the reader to the first record.This method should be called exactly once. The invocation should occur prior to calling
Source.Reader.advance()orSource.Reader.getCurrent(). This method may perform expensive operations that are needed to initialize the reader.- Specified by:
startin classSource.Reader<T>- Returns:
trueif a record was read,falseif there is no more input available.- Throws:
IOException
-
advance
Description copied from class:Source.ReaderAdvances the reader to the next valid record.It is an error to call this without having called
Source.Reader.start()first.- Specified by:
advancein classSource.Reader<T>- Returns:
trueif a record was read,falseif there is no more input available.- Throws:
IOException
-
startImpl
Initializes theOffsetBasedSource.OffsetBasedReaderand advances to the first record, returningtrueif there is a record available to be read. This method will be invoked exactly once and may perform expensive setup operations that are needed to initialize the reader.This function is the
OffsetBasedReaderimplementation ofSource.Reader.start(). The key difference is that the implementor can ignore the possibility that it should no longer produce the first record, either because it has exceeded the originalendOffsetassigned to the reader, or because a concurrent call tosplitAtFraction(double)has changed the source to shrink the offset range being read.- Throws:
IOException- See Also:
-
advanceImpl
Advances to the next record and returnstrue, or returns false if there is no next record.This function is the
OffsetBasedReaderimplementation ofSource.Reader.advance(). The key difference is that the implementor can ignore the possibility that it should no longer produce the next record, either because it has exceeded the originalendOffsetassigned to the reader, or because a concurrent call tosplitAtFraction(double)has changed the source to shrink the offset range being read.- Throws:
IOException- See Also:
-
getCurrentSource
Description copied from class:BoundedSource.BoundedReaderReturns aSourcedescribing the same input that thisReadercurrently reads (including items already read).Usage
Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.
The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful
BoundedSource.BoundedReader.splitAtFraction(double)call.Mutability and thread safety
Remember that
Sourceobjects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, happening asynchronously viaBoundedSource.BoundedReader.splitAtFraction(double), meaning it can return a differentSourceobject. However, the returned object itself will still itself be immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).Implementation
For convenience, subclasses should usually return the most concrete subclass of
Sourcepossible. In practice, the implementation of this method should nearly always be one of the following:- Source that inherits from a base class that already implements
BoundedSource.BoundedReader.getCurrentSource(): delegate to base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.public FooReader(FooSource<T> source) { super(source); } public FooSource<T> getCurrentSource() { return (FooSource<T>)super.getCurrentSource(); } - Source that does not support dynamic work rebalancing: return a private final variable.
private final FooSource<T> source; public FooReader(FooSource<T> source) { this.source = source; } public FooSource<T> getCurrentSource() { return source; } BoundedSource.BoundedReaderthat explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.private FooSource<T> source; public FooReader(FooSource<T> source) { this.source = source; } public synchronized FooSource<T> getCurrentSource() { return source; } public synchronized FooSource<T> splitAtFraction(double fraction) { ... FooSource<T> primary = ...; FooSource<T> residual = ...; this.source = primary; return residual; }
- Specified by:
getCurrentSourcein classBoundedSource.BoundedReader<T>
- Source that inherits from a base class that already implements
-
getFractionConsumed
Description copied from class:BoundedSource.BoundedReaderReturns a value in [0, 1] representing approximately what fraction of thecurrent sourcethis reader has read so far, ornullif such an estimate is not available.It is recommended that this method should satisfy the following properties:
- Should return 0 before the
Source.Reader.start()call. - Should return 1 after a
Source.Reader.start()orSource.Reader.advance()call that returns false. - The returned values should be non-decreasing (though they don't have to be unique).
By default, returns null to indicate that this cannot be estimated.
Thread safety
IfBoundedSource.BoundedReader.splitAtFraction(double)is implemented, this method can be called concurrently to other methods (including itself), and it is therefore critical for it to be implemented in a thread-safe way.- Overrides:
getFractionConsumedin classBoundedSource.BoundedReader<T>
- Should return 0 before the
-
getSplitPointsConsumed
public long getSplitPointsConsumed()Description copied from class:BoundedSource.BoundedReaderReturns the total amount of parallelism in the consumed (returned and processed) range of this reader's currentBoundedSource(as would be returned byBoundedSource.BoundedReader.getCurrentSource()). This corresponds to all split point records (seeRangeTracker) returned by this reader, excluding the last split point returned if the reader is not finished.Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as
CountingSource.upTo(long), is called "perfectly splittable". (2) a "block-compressed" file format such as, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.invalid reference
AvroIO- Any
readerthat is unstarted (aka, has never had a call toSource.Reader.start()) has a consumed parallelism of 0. This condition holds independent of whether the input is splittable. - Any
readerthat has only returned its first element (aka, has never had a call toSource.Reader.advance()) has a consumed parallelism of 0: the first element is the current element and is still being processed. This condition holds independent of whether the input is splittable. - For an empty reader (in which the call to
Source.Reader.start()returned false), the consumed parallelism is 0. This condition holds independent of whether the input is splittable. - For a non-empty, finished reader (in which the call to
Source.Reader.start()returned true and a call toSource.Reader.advance()has returned false), the value returned must be at least 1 and should equal the total parallelism in the source. - For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 29. When finished, the consumed parallelism should be 50.
- For example (2): In a block-compressed value consisting of 5 blocks, the value should stay at 0 until the first record of the second block is returned; stay at 1 until the first record of the third block is returned, etc. Only once the end-of-file is reached then the fifth block has been consumed and the value should stay at 5.
- For example (3): For any non-empty unsplittable input, the consumed parallelism is 0
until the reader is finished (because the last call to
Source.Reader.advance()returned false, at which point it becomes 1.
A reader that is implemented using a
RangeTrackeris encouraged to use the range tracker's ability to count split points to implement this method. SeeOffsetBasedSource.OffsetBasedReaderandOffsetRangeTrackerfor an example.Defaults to
BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.Thread safety
See the javadoc onBoundedSource.BoundedReaderfor information about thread safety.- Overrides:
getSplitPointsConsumedin classBoundedSource.BoundedReader<T>- See Also:
- Any
-
getSplitPointsRemaining
public long getSplitPointsRemaining()Description copied from class:BoundedSource.BoundedReaderReturns the total amount of parallelism in the unprocessed part of this reader's currentBoundedSource(as would be returned byBoundedSource.BoundedReader.getCurrentSource()). This corresponds to all unprocessed split point records (seeRangeTracker), including the last split point returned, in the remainder part of the source.This function should be implemented only in addition to
BoundedSource.BoundedReader.getSplitPointsConsumed()and only if an exact value can be returned.Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as
CountingSource.upTo(long), is called "perfectly splittable". (2) a "block-compressed" file format such as, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.invalid reference
AvroIOAssume for examples (1) and (2) that the number of records or blocks remaining is known:
- Any
readerfor which the last call toSource.Reader.start()orSource.Reader.advance()has returned true should should not return 0, because this reader itself represents parallelism at least 1. This condition holds independent of whether the input is splittable. - A finished reader (for which
Source.Reader.start()orSource.Reader.advance()) has returned false should return a value of 0. This condition holds independent of whether the input is splittable. - For example 1: After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total number of records is known.
- For example 2: After returning a record in block 3 in a block-compressed file consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in parallel by new readers produced via dynamic work rebalancing, while the current reader continues processing block 3) if the total number of blocks is known.
- For example (3): a reader for any non-empty unsplittable input, should return 1 until it is finished, at which point it should return 0.
- For any reader: After returning the last split point in a file (e.g., the last record in example (1), the first record in the last block for example (2), or the first record in the file for example (3), this value should be 1: apart from the current task, no additional remainder can be split off.
Defaults to
BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.Thread safety
See the javadoc onBoundedSource.BoundedReaderfor information about thread safety.- Overrides:
getSplitPointsRemainingin classBoundedSource.BoundedReader<T>- See Also:
- Any
-
allowsDynamicSplitting
public boolean allowsDynamicSplitting()Whether this reader should allow dynamic splitting of the offset ranges.True by default. Override this to return false if the reader cannot support dynamic splitting correctly. If this returns false,
splitAtFraction(double)will refuse all split requests. -
splitAtFraction
Description copied from class:BoundedSource.BoundedReaderTells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.Returns a
BoundedSourcerepresenting the remainder.Detailed description
Assuming the following sequence of calls:BoundedSource<T> initial = reader.getCurrentSource(); BoundedSource<T> residual = reader.splitAtFraction(fraction); BoundedSource<T> primary = reader.getCurrentSource();- The "primary" and "residual" sources, when read, should together cover the same set of records as "initial".
- The current reader should continue to be in a valid state, and continuing to read from it should, together with the records it already read, yield the same records as would have been read by "primary".
- The amount of data read by "primary" should ideally represent approximately the given fraction of the amount of data read by "initial".
This method should return
nullif the split cannot be performed for this fraction while satisfying the semantics above. E.g., a reader that reads a range of offsets in a file should returnnullif it is already past the position in its range corresponding to the given fraction. In this case, the method MUST have no effect (the reader must behave as if the method hadn't been called at all).Statefulness
Since this method (if successful) affects the reader's source, in subsequent invocations "fraction" should be interpreted relative to the new current source.Thread safety and blocking
This method will be called concurrently to other methods (however there will not be multiple concurrent invocations of this method itself), and it is critical for it to be implemented in a thread-safe way (otherwise data loss is possible).It is also very important that this method always completes quickly. In particular, it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating this requirement may stall completion of the work item or even cause it to fail.
It is incorrect to make both this method and
Source.Reader.start()/Source.Reader.advance()synchronized, because those methods can perform blocking operations, and then this method would have to wait for those calls to complete.RangeTrackermakes it easy to implement this method safely and correctly.By default, returns null to indicate that splitting is not possible.
- Overrides:
splitAtFractionin classBoundedSource.BoundedReader<T>
-