Class FlinkSourceReaderBase<T,OutputT>
java.lang.Object
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase<T,OutputT>
- Type Parameters:
OutputT - the output element type from the encapsulated Beam sources.
- All Implemented Interfaces:
AutoCloseable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
- Direct Known Subclasses:
FlinkBoundedSourceReader, FlinkUnboundedSourceReader
public abstract class FlinkSourceReaderBase<T,OutputT>
extends Object
implements org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
An abstract implementation of SourceReader which encapsulates Beam Sources for data reading. It provides:
- Idle timeout support.
- Handling of split additions.
- Split reader creation and management.
- Checkpoint management.
This implementation provides unified logic for both BoundedSource and UnboundedSource. Apart from the abstract methods createReader, getReaderCheckpoint and isAvailableForAliveReaders, subclasses are expected to implement only the SourceReader.pollNext(ReaderOutput) method.
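The split of responsibilities described above resembles the template-method pattern: the base class owns split bookkeeping, and the subclass supplies only the polling logic. The sketch below illustrates that shape without any Flink or Beam dependencies. SketchReaderBase, RangeReader and PollStatus are hypothetical stand-ins (PollStatus loosely mirrors Flink's InputStatus), not the real API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical poll result, loosely modeled on Flink's InputStatus.
enum PollStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

// Hypothetical base class: owns split bookkeeping so subclasses only implement pollNext.
abstract class SketchReaderBase<SplitT, OutT> {
  protected final Deque<SplitT> pendingSplits = new ArrayDeque<>();
  private boolean noMoreSplits = false;

  // Assigned splits are queued until a subclass consumes them.
  public void addSplits(List<SplitT> splits) {
    pendingSplits.addAll(splits);
  }

  // Signals that no further splits will be assigned to this reader.
  public void notifyNoMoreSplits() {
    noMoreSplits = true;
  }

  protected final boolean noMoreSplits() {
    return noMoreSplits;
  }

  // The only method a concrete reader has to supply in this sketch.
  public abstract PollStatus pollNext(Consumer<OutT> output);
}

// Hypothetical bounded reader: each split is an int[]{startInclusive, endExclusive}.
class RangeReader extends SketchReaderBase<int[], Integer> {
  private int[] current;
  private int next;

  @Override
  public PollStatus pollNext(Consumer<Integer> output) {
    // Move to the next queued split once the current one is exhausted (skips empty splits).
    while (current == null || next >= current[1]) {
      current = pendingSplits.poll();
      if (current == null) {
        return noMoreSplits() ? PollStatus.END_OF_INPUT : PollStatus.NOTHING_AVAILABLE;
      }
      next = current[0];
    }
    output.accept(next++);
    return PollStatus.MORE_AVAILABLE;
  }
}
```

The real class passes Flink's ReaderOutput to pollNext and additionally handles idle timeouts and checkpoints, which this sketch leaves out.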
-
Nested Class Summary
Nested Classes
- protected final class FlinkSourceReaderBase.ReaderAndOutput: A wrapper for the reader and its associated information.
Field Summary
Fields
- protected static final CompletableFuture<Void> AVAILABLE_NOW
- protected final org.apache.flink.api.connector.source.SourceReaderContext context
- protected final long idleTimeoutMs
- protected final ReaderInvocationUtil<T, Source.Reader<T>> invocationUtil
- protected final org.apache.flink.metrics.Counter numRecordsInCounter
- protected final PipelineOptions pipelineOptions
Constructor Summary
Constructors
- protected FlinkSourceReaderBase(String stepName, ScheduledExecutorService executor, org.apache.flink.api.connector.source.SourceReaderContext context, PipelineOptions pipelineOptions, @Nullable Function<OutputT, Long> timestampExtractor)
- protected FlinkSourceReaderBase(String stepName, org.apache.flink.api.connector.source.SourceReaderContext context, PipelineOptions pipelineOptions, @Nullable Function<OutputT, Long> timestampExtractor)
Method Summary
- void addSplits(List<FlinkSourceSplit<T>> splits)
- protected void addSplitsToUnfinishedForCheckpoint(long checkpointId, List<FlinkSourceSplit<T>> splits): To be overridden in the unbounded reader.
- protected Map<Integer, FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput> allReaders
- protected void checkExceptionAndMaybeThrow()
- protected final boolean checkIdleTimeoutAndMaybeStartCountdown()
- void close()
- protected final Optional<FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput> createAndTrackNextReader()
- protected abstract Source.Reader<T> createReader(FlinkSourceSplit<T> sourceSplit): Create a Source.Reader for the given FlinkSourceSplit.
- protected void execute
- protected final void finishSplit(int splitIndex)
- protected abstract FlinkSourceSplit<T> getReaderCheckpoint(int splitId, FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput readerAndOutput): Create a FlinkSourceSplit for the given splitId.
- protected boolean hasException()
- protected static void ignoreReturnValue
- CompletableFuture<Void> isAvailable()
- protected abstract CompletableFuture<Void> isAvailableForAliveReaders: Must be overridden by subclasses to determine whether data is available while there are alive readers.
- protected final boolean noMoreSplits()
- void notifyNoMoreSplits()
- protected void recordException
- protected void scheduleTask(Runnable runnable, long delayMs)
- protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs)
- List<FlinkSourceSplit<T>> snapshotState(long checkpointId)
- protected Collection<FlinkSourceSplit<T>> sourceSplits
- void start()
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener
notifyCheckpointAborted
Methods inherited from interface org.apache.flink.api.connector.source.SourceReader
handleSourceEvents, notifyCheckpointComplete, pauseOrResumeSplits, pollNext
-
Field Details
-
AVAILABLE_NOW
-
pipelineOptions
-
timestampExtractor
-
context
protected final org.apache.flink.api.connector.source.SourceReaderContext context
-
invocationUtil
-
numRecordsInCounter
protected final org.apache.flink.metrics.Counter numRecordsInCounter
-
idleTimeoutMs
protected final long idleTimeoutMs
-
-
Constructor Details
-
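FlinkSourceReaderBase
protected FlinkSourceReaderBase(String stepName, org.apache.flink.api.connector.source.SourceReaderContext context, PipelineOptions pipelineOptions, @Nullable Function<OutputT, Long> timestampExtractor)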
-
FlinkSourceReaderBase
protected FlinkSourceReaderBase(String stepName, ScheduledExecutorService executor, org.apache.flink.api.connector.source.SourceReaderContext context, PipelineOptions pipelineOptions, @Nullable Function<OutputT, Long> timestampExtractor)
-
-
Method Details
-
start
public void start()
-
snapshotState
-
isAvailable
-
notifyNoMoreSplits
public void notifyNoMoreSplits()
-
addSplits
-
close
public void close() throws Exception
- Specified by:
close in interface AutoCloseable
- Throws:
Exception
-
isAvailableForAliveReaders
Subclasses must override this method to determine whether data is available while there are alive readers. For example, an unbounded source may have no split ready to emit data even though all of its sources are still alive, whereas a bounded source always has data available as long as there are alive readers.
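To illustrate the difference, the following hypothetical sketch contrasts a bounded reader that always returns an already-completed future (in the spirit of the AVAILABLE_NOW field, whose exact definition is assumed here) with an unbounded one that returns a pending future and completes it once some split signals data. Availability, BoundedAvailability, UnboundedAvailability, signalDataAvailable and markDrained are illustrative names only.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder for an already-completed future, in the spirit of AVAILABLE_NOW.
final class Availability {
  static final CompletableFuture<Void> AVAILABLE_NOW = CompletableFuture.completedFuture(null);

  private Availability() {}
}

// Bounded case: data is always available while readers are alive.
class BoundedAvailability {
  CompletableFuture<Void> isAvailableForAliveReaders() {
    return Availability.AVAILABLE_NOW;
  }
}

// Unbounded case: callers wait on a pending future until some split signals data.
class UnboundedAvailability {
  private CompletableFuture<Void> dataAvailable = new CompletableFuture<>();

  synchronized CompletableFuture<Void> isAvailableForAliveReaders() {
    return dataAvailable;
  }

  // Hypothetical hook: called when any split has records ready to emit.
  synchronized void signalDataAvailable() {
    dataAvailable.complete(null);
  }

  // Hypothetical hook: called once ready data is drained, so later callers wait again.
  synchronized void markDrained() {
    if (dataAvailable.isDone()) {
      dataAvailable = new CompletableFuture<>();
    }
  }
}
```
-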
getReaderCheckpoint
protected abstract FlinkSourceSplit<T> getReaderCheckpoint(int splitId, FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput readerAndOutput) throws IOException
Create a FlinkSourceSplit for the given splitId.
- Throws:
IOException
-
createReader
protected abstract Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit) throws IOException
Create a Source.Reader for the given FlinkSourceSplit.
- Throws:
IOException
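createReader and getReaderCheckpoint form a round trip: one opens a reader for a split, the other captures a reader's progress as a new split from which reading can later resume. A minimal sketch, assuming an in-memory list as the data and an integer offset as the resumable position (in Beam that state would come from the reader itself, for example via a checkpoint mark). OffsetSplit, ListReader, SplitReaderHooks and InMemoryHooks are hypothetical.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical split: a split ID plus the offset to resume reading from.
final class OffsetSplit {
  final int splitId;
  final int offset;

  OffsetSplit(int splitId, int offset) {
    this.splitId = splitId;
    this.offset = offset;
  }
}

// Hypothetical stand-in for Source.Reader<T>: reads an in-memory list from a position.
final class ListReader {
  private final List<String> data;
  private int position;

  ListReader(List<String> data, int position) {
    this.data = data;
    this.position = position;
  }

  // Returns the next element, or null when the data is exhausted.
  String next() {
    return position < data.size() ? data.get(position++) : null;
  }

  int position() {
    return position;
  }
}

// Hypothetical counterparts of the two abstract hooks.
abstract class SplitReaderHooks {
  // Like createReader(FlinkSourceSplit<T>): open a reader positioned for the split.
  abstract ListReader createReader(OffsetSplit split) throws IOException;

  // Like getReaderCheckpoint(int, ReaderAndOutput): capture a reader's progress as a split.
  abstract OffsetSplit getReaderCheckpoint(int splitId, ListReader reader) throws IOException;
}

class InMemoryHooks extends SplitReaderHooks {
  private final List<String> data;

  InMemoryHooks(List<String> data) {
    this.data = data;
  }

  @Override
  ListReader createReader(OffsetSplit split) {
    return new ListReader(data, split.offset);
  }

  @Override
  OffsetSplit getReaderCheckpoint(int splitId, ListReader reader) {
    return new OffsetSplit(splitId, reader.position());
  }
}
```

Restoring from the captured split continues exactly where the snapshotted reader stopped, which is the property a checkpointed split has to guarantee.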
-
addSplitsToUnfinishedForCheckpoint
protected void addSplitsToUnfinishedForCheckpoint(long checkpointId, List<FlinkSourceSplit<T>> splits)
To be overridden in the unbounded reader. Notifies the reader of the created splits that will be part of the checkpoint. These splits are processed during notifyCheckpointComplete to finalize the associated CheckpointMarks.
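The bookkeeping described for addSplitsToUnfinishedForCheckpoint can be sketched as a map from checkpoint ID to the splits captured in that checkpoint, drained when the checkpoint completes. This is an assumption-based sketch, not the Beam implementation: PendingCheckpoints is hypothetical, and finalizing every pending checkpoint up to and including the completed ID is a common pattern assumed here because a completion notification for an earlier checkpoint may never arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical tracker for splits captured per checkpoint, keyed by checkpoint ID.
class PendingCheckpoints<SplitT> {
  private final NavigableMap<Long, List<SplitT>> unfinished = new TreeMap<>();
  private final Consumer<SplitT> finalizer;

  // finalizer stands in for finalizing a split's CheckpointMark.
  PendingCheckpoints(Consumer<SplitT> finalizer) {
    this.finalizer = finalizer;
  }

  // Analogue of addSplitsToUnfinishedForCheckpoint(checkpointId, splits).
  void addSplitsToUnfinishedForCheckpoint(long checkpointId, List<SplitT> splits) {
    unfinished.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(splits);
  }

  // Analogue of notifyCheckpointComplete: finalize this checkpoint and any older ones.
  void notifyCheckpointComplete(long checkpointId) {
    NavigableMap<Long, List<SplitT>> done = unfinished.headMap(checkpointId, true);
    for (List<SplitT> splits : done.values()) {
      splits.forEach(finalizer);
    }
    done.clear(); // clearing the view also removes the entries from the backing map
  }

  int pendingCount() {
    return unfinished.size();
  }
}
```
-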
createAndTrackNextReader
protected final Optional<FlinkSourceReaderBase<T, OutputT>.ReaderAndOutput> createAndTrackNextReader() throws IOException
- Throws:
IOException
-
finishSplit
protected final void finishSplit(int splitIndex) throws IOException
- Throws:
IOException
-
checkIdleTimeoutAndMaybeStartCountdown
protected final boolean checkIdleTimeoutAndMaybeStartCountdown()
-
noMoreSplits
protected final boolean noMoreSplits()
-
scheduleTask
-
scheduleTaskAtFixedRate
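scheduleTask and scheduleTaskAtFixedRate take a delay and a period in milliseconds, and one constructor accepts a ScheduledExecutorService. Assuming the helpers delegate to such an executor, a minimal sketch looks like this. TaskScheduling is hypothetical, and unlike the documented void methods it returns the ScheduledFuture so that callers can observe or cancel the task.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: delegates timed work to an injected ScheduledExecutorService.
class TaskScheduling implements AutoCloseable {
  private final ScheduledExecutorService executor;

  TaskScheduling(ScheduledExecutorService executor) {
    this.executor = executor;
  }

  // Runs the task once after delayMs milliseconds.
  ScheduledFuture<?> scheduleTask(Runnable runnable, long delayMs) {
    return executor.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
  }

  // Runs the task after delayMs, then repeatedly every periodMs milliseconds.
  ScheduledFuture<?> scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs) {
    return executor.scheduleAtFixedRate(runnable, delayMs, periodMs, TimeUnit.MILLISECONDS);
  }

  // Halts queued and periodic tasks and interrupts any that are running.
  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

Injecting the executor, as the five-argument constructor does, lets tests or the runtime control the threads that run these timers.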
-
execute
-
recordException
-
checkExceptionAndMaybeThrow
protected void checkExceptionAndMaybeThrow()
-
hasException
protected boolean hasException()
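The names recordException, hasException and checkExceptionAndMaybeThrow suggest a common pattern for work run on background threads (such as tasks passed to scheduleTask): failures are stored and later rethrown on the reader's own thread. The sketch below shows that pattern under this assumption. ExceptionRelay is hypothetical, and keeping only the first exception and wrapping it in a RuntimeException are illustrative choices, not documented behavior.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical relay: background threads record failures, the reader thread rethrows them.
class ExceptionRelay {
  private final AtomicReference<Throwable> firstError = new AtomicReference<>();

  // Safe to call from any thread; only the first recorded failure is kept.
  void recordException(Throwable error) {
    firstError.compareAndSet(null, error);
  }

  boolean hasException() {
    return firstError.get() != null;
  }

  // Called on the reader's own thread, for example before each poll.
  void checkExceptionAndMaybeThrow() {
    Throwable error = firstError.get();
    if (error != null) {
      throw new RuntimeException("Background task failed", error);
    }
  }
}
```
-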
sourceSplits
-
allReaders
-
ignoreReturnValue
-