public abstract static class FileBasedSource.FileBasedReader<T> extends OffsetBasedSource.OffsetBasedReader<T>
A reader that implements code common to readers of FileBasedSources.
This reader uses a ReadableByteChannel created for the file represented by the corresponding source to efficiently move to the correct starting position defined in the source. Subclasses of this reader should implement startReading(java.nio.channels.ReadableByteChannel) to get access to this channel. If the source corresponding to the reader is for a subrange of a file, the ReadableByteChannel provided is guaranteed to be an instance of the type SeekableByteChannel, which the subclass may use to move backwards in the channel to determine the correct starting position.
Sequential reading is implemented using readNextRecord().

FileBasedReader then implements "reading a range [A, B)" in the following way:

1. OffsetBasedSource.OffsetBasedReader.start() opens the file.
2. OffsetBasedSource.OffsetBasedReader.start() seeks the SeekableByteChannel to A (reading offset ranges for non-seekable files is not supported) and calls startReading().
3. OffsetBasedSource.OffsetBasedReader.start() calls OffsetBasedSource.OffsetBasedReader.advance() once, which, via readNextRecord(), locates the first record that is at a split point AND whose offset is at or after A. If this record is at or after B, OffsetBasedSource.OffsetBasedReader.advance() returns false and reading is finished.
4. If the previous advance() call returned true, sequential reading starts and advance() will be called repeatedly.

advance() calls readNextRecord() on the subclass, and stops (returns false) if the new record is at a split point AND the offset of the new record is at or after B.
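The range-reading steps above can be illustrated without any Beam dependency. The following is only a sketch under stated assumptions: records are newline-terminated lines, every record start counts as a split point, and the names RangeReadSketch, readRange and writeTemp are hypothetical and not part of the Beam API. Its startReading and readNextRecord methods mirror the roles of the Beam methods of the same name, but take simplified signatures.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Beam-free sketch: records are '\n'-terminated lines and every
// record start is treated as a split point.
class RangeReadSketch {
  private final SeekableByteChannel channel;
  private final ByteBuffer oneByte = ByteBuffer.allocate(1);
  private long nextOffset;     // offset of the next unread byte
  private long currentOffset;  // start offset of the current record
  private String current;      // contents of the current record

  RangeReadSketch(SeekableByteChannel channel) {
    this.channel = channel;
  }

  // Analogue of startReading(): arrange for the first readNextRecord() call to
  // return the first record that starts at or after offset a.
  void startReading(long a) throws IOException {
    long from = Math.max(0, a - 1);
    channel.position(from);
    nextOffset = from;
    if (a > 0) {
      // Back up one byte and discard through the next '\n'. If byte a-1 is a
      // '\n', a record starts exactly at a and only that byte is discarded.
      readNextRecord();
    }
  }

  // Analogue of readNextRecord(): returns false once the channel is exhausted.
  boolean readNextRecord() throws IOException {
    long recordStart = nextOffset;
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    boolean sawByte = false;
    while (true) {
      oneByte.clear();
      int n = channel.read(oneByte);
      if (n < 0) break;      // end of channel
      if (n == 0) continue;  // nothing read yet, retry
      nextOffset++;
      sawByte = true;
      byte b = oneByte.get(0);
      if (b == '\n') break;  // end of record
      bytes.write(b);
    }
    if (!sawByte) return false;
    currentOffset = recordStart;
    current = new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    return true;
  }

  // Reads every record whose start offset lies in [a, b).
  static List<String> readRange(Path file, long a, long b) {
    List<String> out = new ArrayList<>();
    try (SeekableByteChannel ch = Files.newByteChannel(file)) {
      RangeReadSketch reader = new RangeReadSketch(ch);
      reader.startReading(a);
      // Stop at the first record that starts at or after b.
      while (reader.readNextRecord() && reader.currentOffset < b) {
        out.add(reader.current);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out;
  }

  // Helper for the demo: writes the given text to a temporary file.
  static Path writeTemp(String text) {
    try {
      Path p = Files.createTempFile("range-sketch", ".txt");
      Files.write(p, text.getBytes(StandardCharsets.UTF_8));
      return p;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path f = writeTemp("aa\nbbb\nc\ndddd\n");
    System.out.println(readRange(f, 0, 4));
    System.out.println(readRange(f, 4, 9));
    System.out.println(readRange(f, 9, 14));
  }
}
```

In this sketch the ranges [0, 4), [4, 9) and [9, 14) yield the records "aa" and "bbb", then "c", then "dddd". A record belongs to the range that contains its start offset, so adjacent ranges neither drop nor duplicate a record even when a range boundary falls in the middle of one.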
Since this class implements Source.Reader, it guarantees thread safety. Abstract methods defined here will not be accessed by more than one thread concurrently.
Fields inherited from class BoundedSource.BoundedReader: SPLIT_POINTS_UNKNOWN
Constructor and Description |
---|
FileBasedReader(FileBasedSource<T> source) Subclasses should not perform IO operations at the constructor. |
Modifier and Type | Method and Description |
---|---|
protected boolean | advanceImpl() Advances to the next record and returns true, or returns false if there is no next record. |
boolean | allowsDynamicSplitting() Whether this reader should allow dynamic splitting of the offset ranges. |
void | close() Closes any ReadableByteChannel created for the current reader. |
FileBasedSource<T> | getCurrentSource() Returns a Source describing the same input that this Reader currently reads (including items already read). |
protected abstract boolean | readNextRecord() Reads the next record from the channel provided by startReading(java.nio.channels.ReadableByteChannel). |
protected boolean | startImpl() Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. |
protected abstract void | startReading(java.nio.channels.ReadableByteChannel channel) Performs any initialization of the subclass of FileBasedReader that involves IO operations. |
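Methods inherited from class OffsetBasedSource.OffsetBasedReader: advance, getCurrentOffset, getFractionConsumed, getSplitPointsConsumed, getSplitPointsRemaining, isAtSplitPoint, isDone, isStarted, splitAtFraction, start
Methods inherited from class BoundedSource.BoundedReader: getCurrentTimestamp
Methods inherited from class Source.Reader: getCurrent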
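public FileBasedReader(FileBasedSource<T> source)
Subclasses should not perform IO operations at the constructor. All IO operations should be delayed until the startReading(java.nio.channels.ReadableByteChannel) method is invoked.

public FileBasedSource<T> getCurrentSource()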
Description copied from class: BoundedSource.BoundedReader
Returns a Source describing the same input that this Reader currently reads (including items already read).
Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.
The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful BoundedSource.BoundedReader.splitAtFraction(double) call.

Remember that Source objects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, happening asynchronously via BoundedSource.BoundedReader.splitAtFraction(double), meaning it can return a different Source object. The returned object itself will still be immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).
For convenience, subclasses should usually return the most concrete subclass of Source possible. In practice, the implementation of this method should nearly always be one of the following:

- A reader that inherits from a base class that already implements BoundedSource.BoundedReader.getCurrentSource(): delegate to the base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.

      public FooReader(FooSource<T> source) {
        super(source);
      }

      @Override
      public FooSource<T> getCurrentSource() {
        // Narrow the base class's return type to the concrete source type.
        return (FooSource<T>) super.getCurrentSource();
      }

- A reader that does not support dynamic work rebalancing: return a private final variable.

      private final FooSource<T> source;

      public FooReader(FooSource<T> source) {
        this.source = source;
      }

      @Override
      public FooSource<T> getCurrentSource() {
        return source;
      }

- A BoundedSource.BoundedReader that explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.

      private FooSource<T> source;

      public FooReader(FooSource<T> source) {
        this.source = source;
      }

      @Override
      public synchronized FooSource<T> getCurrentSource() {
        return source;
      }

      @Override
      public synchronized FooSource<T> splitAtFraction(double fraction) {
        ...
        FooSource<T> primary = ...;
        FooSource<T> residual = ...;
        // Swap in the primary; the old source object is never mutated.
        this.source = primary;
        return residual;
      }
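To make the synchronized pattern concrete, here is a minimal, Beam-free sketch. It assumes a source is just a half-open offset range, and that a split is refused by returning null when the split offset is not strictly between the current position and the end of the range. RangeSourceSketch and SplittableReaderSketch are hypothetical names used only for illustration, and the split arithmetic is one plausible choice rather than the logic Beam uses.

```java
// Hypothetical immutable source: a half-open offset range [start, end).
final class RangeSourceSketch {
  final long start;
  final long end;

  RangeSourceSketch(long start, long end) {
    this.start = start;
    this.end = end;
  }
}

// Hypothetical reader that swaps its source reference under synchronization.
class SplittableReaderSketch {
  private RangeSourceSketch source;  // replaced on split, never mutated
  private final long currentOffset;  // position already reached by the reader

  SplittableReaderSketch(RangeSourceSketch source, long currentOffset) {
    this.source = source;
    this.currentOffset = currentOffset;
  }

  public synchronized RangeSourceSketch getCurrentSource() {
    return source;
  }

  // Returns the residual range, or null if the split is refused (an
  // assumption of this sketch).
  public synchronized RangeSourceSketch splitAtFraction(double fraction) {
    long length = source.end - source.start;
    long splitOffset = source.start + (long) Math.ceil(fraction * length);
    if (splitOffset <= currentOffset || splitOffset >= source.end) {
      return null;  // split point already consumed, or nothing left to give away
    }
    RangeSourceSketch primary = new RangeSourceSketch(source.start, splitOffset);
    RangeSourceSketch residual = new RangeSourceSketch(splitOffset, source.end);
    this.source = primary;  // callers of getCurrentSource() now see the primary
    return residual;
  }

  public static void main(String[] args) {
    SplittableReaderSketch reader =
        new SplittableReaderSketch(new RangeSourceSketch(0, 100), 10);
    RangeSourceSketch residual = reader.splitAtFraction(0.5);
    System.out.println(residual.start + ".." + residual.end);
    System.out.println(reader.getCurrentSource().end);
  }
}
```

Because both methods hold the same lock, a caller of getCurrentSource() sees either the source from before a split or the primary from after it, never a partially updated one. This is also why a caller should not cache the end offset it reads from the returned source.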
Overrides: getCurrentSource in class OffsetBasedSource.OffsetBasedReader<T>
protected final boolean startImpl() throws java.io.IOException
Description copied from class: OffsetBasedSource.OffsetBasedReader
Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. This method will be invoked exactly once and may perform expensive setup operations that are needed to initialize the reader.

This function is the OffsetBasedReader implementation of BoundedReader#start. The key difference is that the implementor can ignore the possibility that it should no longer produce the first record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to OffsetBasedSource.OffsetBasedReader.splitAtFraction(double) has changed the source to shrink the offset range being read.

Specified by: startImpl in class OffsetBasedSource.OffsetBasedReader<T>
Throws: java.io.IOException
See Also: BoundedReader#start
protected final boolean advanceImpl() throws java.io.IOException
Description copied from class: OffsetBasedSource.OffsetBasedReader
Advances to the next record and returns true, or returns false if there is no next record.

This function is the OffsetBasedReader implementation of BoundedReader#advance. The key difference is that the implementor can ignore the possibility that it should no longer produce the next record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to OffsetBasedSource.OffsetBasedReader.splitAtFraction(double) has changed the source to shrink the offset range being read.

Specified by: advanceImpl in class OffsetBasedSource.OffsetBasedReader<T>
Throws: java.io.IOException
See Also: BoundedReader#advance
public void close() throws java.io.IOException
Closes any ReadableByteChannel created for the current reader. This implementation is idempotent. Any close() method introduced by a subclass must be idempotent and must call the close() method in the FileBasedReader.

Specified by: close in interface java.lang.AutoCloseable
Specified by: close in class Source.Reader<T>
Throws: java.io.IOException
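The close() contract (idempotent, and always delegating to the base class) might look like the following sketch. BaseReaderSketch stands in for FileBasedReader and ParsingReaderSketch for a subclass; both are hypothetical, Beam-free classes, and the parserReleases counter exists only so the behaviour can be observed.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Hypothetical stand-in for a base reader that owns and closes the channel.
class BaseReaderSketch implements AutoCloseable {
  private ReadableByteChannel channel;

  BaseReaderSketch(ReadableByteChannel channel) {
    this.channel = channel;
  }

  @Override
  public void close() throws IOException {
    if (channel != null) {  // a second call finds null and does nothing
      channel.close();
      channel = null;
    }
  }
}

// Hypothetical subclass with one extra resource of its own to release.
class ParsingReaderSketch extends BaseReaderSketch {
  private boolean parserOpen = true;
  int parserReleases = 0;  // counts how often the subclass resource was released

  ParsingReaderSketch(ReadableByteChannel channel) {
    super(channel);
  }

  @Override
  public void close() throws IOException {
    if (parserOpen) {  // release subclass state at most once
      parserOpen = false;
      parserReleases++;
    }
    super.close();     // always delegate so the channel is closed
  }

  // Closes a reader twice and reports how often the subclass resource was released.
  static int closeTwice(ReadableByteChannel channel) {
    ParsingReaderSketch reader = new ParsingReaderSketch(channel);
    try {
      reader.close();
      reader.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return reader.parserReleases;
  }

  public static void main(String[] args) {
    ReadableByteChannel ch =
        Channels.newChannel(new ByteArrayInputStream(new byte[] {1, 2, 3}));
    System.out.println(closeTwice(ch));
    System.out.println(ch.isOpen());
  }
}
```

Guarding the subclass cleanup with a flag and calling super.close() unconditionally keeps both halves of the contract: repeated calls are harmless, and the channel owned by the base class is always released.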
public boolean allowsDynamicSplitting()
Description copied from class: OffsetBasedSource.OffsetBasedReader
Whether this reader should allow dynamic splitting of the offset ranges.

True by default. Override this to return false if the reader cannot support dynamic splitting correctly. If this returns false, OffsetBasedSource.OffsetBasedReader.splitAtFraction(double) will refuse all split requests.

Overrides: allowsDynamicSplitting in class OffsetBasedSource.OffsetBasedReader<T>
protected abstract void startReading(java.nio.channels.ReadableByteChannel channel) throws java.io.IOException
Performs any initialization of the subclass of FileBasedReader that involves IO operations. Will only be invoked once, and before that invocation the base class will seek the channel to the source's starting offset.

The provided ReadableByteChannel is for the file represented by the source of this reader. The subclass may use the channel to build a higher-level IO abstraction, e.g., a BufferedReader or an XML parser.

If the corresponding source is for a subrange of a file, channel is guaranteed to be an instance of the type SeekableByteChannel.

After this method is invoked, the base class will not read data from the channel or adjust the position of the channel. The base class remains responsible for properly closing the channel.

Parameters: channel - a byte channel representing the file backing the reader.
Throws: java.io.IOException
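As a rough illustration of building a higher-level abstraction on the channel, the sketch below wraps a ReadableByteChannel in a BufferedReader using the standard java.nio.channels.Channels helpers. LineReaderSketch and its methods are hypothetical, Beam-free stand-ins that only mirror the shape of startReading and readNextRecord, and it assumes UTF-8 text with one record per line.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical line-oriented reader; not a Beam class.
class LineReaderSketch {
  private BufferedReader lines;
  private boolean seekable;
  private String current;

  // Mirrors the shape of startReading(ReadableByteChannel): all IO setup
  // happens here rather than in the constructor.
  void startReading(ReadableByteChannel channel) {
    // For a subrange of a file the channel would be a SeekableByteChannel.
    seekable = channel instanceof SeekableByteChannel;
    lines = new BufferedReader(
        Channels.newReader(channel, StandardCharsets.UTF_8.name()));
  }

  // Mirrors readNextRecord(): returns false at the end of the channel.
  boolean readNextRecord() {
    try {
      current = lines.readLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return current != null;
  }

  String getCurrent() {
    return current;
  }

  boolean isSeekable() {
    return seekable;
  }

  // Helper for the demo: an in-memory, non-seekable channel over the given text.
  static ReadableByteChannel channelOf(String text) {
    return Channels.newChannel(
        new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
  }

  public static void main(String[] args) {
    LineReaderSketch reader = new LineReaderSketch();
    reader.startReading(channelOf("x\ny\n"));
    while (reader.readNextRecord()) {
      System.out.println(reader.getCurrent());
    }
  }
}
```

One design caveat: a BufferedReader reads ahead of the record it returns, so the channel position no longer matches the record offset. A subclass that must report offsets has to count bytes itself instead of querying the channel.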
protected abstract boolean readNextRecord() throws java.io.IOException
Reads the next record from the channel provided by startReading(java.nio.channels.ReadableByteChannel). Methods Source.Reader.getCurrent(), OffsetBasedSource.OffsetBasedReader.getCurrentOffset(), and OffsetBasedSource.OffsetBasedReader.isAtSplitPoint() should return the corresponding information about the record read by the last invocation of this method.

Note that this method will be called the same way for reading the first record in the source (file or offset range in the file) and for reading subsequent records. It is up to the subclass to do anything special for locating and reading the first record, if necessary.

Returns: true if a record was successfully read, false if the end of the channel was reached before successfully reading a new record.
Throws: java.io.IOException