Class FlinkSourceReaderBase<T,OutputT>

java.lang.Object
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase<T,OutputT>
Type Parameters:
T - the element type read by the encapsulated Beam Source.Reader.
OutputT - the output element type from the encapsulated Beam sources.
All Implemented Interfaces:
AutoCloseable, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
Direct Known Subclasses:
FlinkBoundedSourceReader, FlinkUnboundedSourceReader

public abstract class FlinkSourceReaderBase<T,OutputT> extends Object implements org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
An abstract implementation of SourceReader which encapsulates Beam Sources for data reading. It provides:
  1. Idle timeout support.
  2. Split addition handling.
  3. Split reader creation and management.
  4. Checkpoint management.

This implementation provides unified logic for both BoundedSource and UnboundedSource. Apart from the abstract hooks isAvailableForAliveReaders, getReaderCheckpoint, and createReader, subclasses are expected to implement only the SourceReader.pollNext(ReaderOutput) method.
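The division of labor described above can be illustrated with a minimal, self-contained sketch. It does not use Beam or Flink: SketchReaderBase, SketchOneAtATimeReader, and SketchStatus (a local stand-in for Flink's InputStatus) are hypothetical names, splits are modeled as plain lists, and the abstract hooks of the real class are omitted. The base class owns split queuing and reader creation; the subclass writes only pollNext.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Local stand-in for Flink's InputStatus; this sketch does not depend on Flink.
enum SketchStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

// Hypothetical base class: owns split bookkeeping and reader creation,
// and leaves only pollNext to subclasses.
abstract class SketchReaderBase<T> {
  private final Deque<List<T>> pendingSplits = new ArrayDeque<>();
  private boolean noMoreSplitsSignaled = false;

  // Mirrors addSplits: newly assigned splits are queued for later reading.
  final void addSplits(List<List<T>> splits) {
    pendingSplits.addAll(splits);
  }

  // Mirrors notifyNoMoreSplits: no further splits will be assigned.
  final void notifyNoMoreSplits() {
    noMoreSplitsSignaled = true;
  }

  // Mirrors createAndTrackNextReader: turns the next pending split into a reader, if any.
  protected final Optional<Iterator<T>> createNextReader() {
    List<T> split = pendingSplits.poll();
    return split == null ? Optional.empty() : Optional.of(split.iterator());
  }

  // True only when no more splits will arrive and none are queued.
  protected final boolean noMoreSplits() {
    return noMoreSplitsSignaled && pendingSplits.isEmpty();
  }

  // The only method a subclass has to write in this sketch.
  abstract SketchStatus pollNext(List<T> output);
}

// Hypothetical subclass that emits one element per pollNext call.
class SketchOneAtATimeReader<T> extends SketchReaderBase<T> {
  private Iterator<T> current;

  @Override
  SketchStatus pollNext(List<T> output) {
    // Skip exhausted (or empty) splits until one has an element to emit.
    while (current == null || !current.hasNext()) {
      Optional<Iterator<T>> next = createNextReader();
      if (!next.isPresent()) {
        return noMoreSplits() ? SketchStatus.END_OF_INPUT : SketchStatus.NOTHING_AVAILABLE;
      }
      current = next.get();
    }
    output.add(current.next());
    return SketchStatus.MORE_AVAILABLE;
  }
}
```

Keeping the split bookkeeping in final base-class methods, as in this sketch, means a subclass cannot bypass it; the subclass only decides how much to emit per call.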

  • Field Details

    • AVAILABLE_NOW

      protected static final CompletableFuture<Void> AVAILABLE_NOW
    • pipelineOptions

      protected final PipelineOptions pipelineOptions
    • timestampExtractor

      protected final @Nullable Function<OutputT,Long> timestampExtractor
    • context

      protected final org.apache.flink.api.connector.source.SourceReaderContext context
    • invocationUtil

      protected final ReaderInvocationUtil<T,Source.Reader<T>> invocationUtil
    • numRecordsInCounter

      protected final org.apache.flink.metrics.Counter numRecordsInCounter
    • idleTimeoutMs

      protected final long idleTimeoutMs
  • Constructor Details

  • Method Details

    • start

      public void start()
      Specified by:
      start in interface org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
    • snapshotState

      public List<FlinkSourceSplit<T>> snapshotState(long checkpointId)
      Specified by:
      snapshotState in interface org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
    • isAvailable

      public CompletableFuture<Void> isAvailable()
      Specified by:
      isAvailable in interface org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
    • notifyNoMoreSplits

      public void notifyNoMoreSplits()
      Specified by:
      notifyNoMoreSplits in interface org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
    • addSplits

      public void addSplits(List<FlinkSourceSplit<T>> splits)
      Specified by:
      addSplits in interface org.apache.flink.api.connector.source.SourceReader<OutputT,FlinkSourceSplit<T>>
    • close

      public void close() throws Exception
      Specified by:
      close in interface AutoCloseable
      Throws:
      Exception
    • isAvailableForAliveReaders

      protected abstract CompletableFuture<Void> isAvailableForAliveReaders()
      Subclasses override this method to report whether data is available while there are still alive readers. For example, an unbounded source may have no split ready to emit data even though all of its sources are still alive, whereas a bounded source always has data available as long as any reader is alive.
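      The bounded/unbounded contrast above can be sketched with plain CompletableFuture objects. All class names are hypothetical, AVAILABLE_NOW is assumed here to be an already-completed future, and the onDataArrived/onDrained callbacks are illustrative stand-ins for whatever signal a real unbounded reader uses.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the two subclass behaviors described above.
abstract class AvailabilitySketch {
  // Assumption: mirrors AVAILABLE_NOW as an already-completed future.
  static final CompletableFuture<Void> AVAILABLE_NOW = CompletableFuture.completedFuture(null);

  abstract CompletableFuture<Void> isAvailableForAliveReaders();
}

// Bounded case: data is always available while any reader is alive.
class BoundedAvailabilitySketch extends AvailabilitySketch {
  @Override
  CompletableFuture<Void> isAvailableForAliveReaders() {
    return AVAILABLE_NOW;
  }
}

// Unbounded case: readers may be alive yet have nothing to emit, so a pending
// future is handed out and completed once some split signals new data.
class UnboundedAvailabilitySketch extends AvailabilitySketch {
  private CompletableFuture<Void> pending = new CompletableFuture<>();
  private boolean dataReady = false;

  @Override
  synchronized CompletableFuture<Void> isAvailableForAliveReaders() {
    if (dataReady) {
      return AVAILABLE_NOW;
    }
    // A completed future cannot be reused to wait again, so replace it.
    if (pending.isDone()) {
      pending = new CompletableFuture<>();
    }
    return pending;
  }

  // Hypothetical callback: a split now has data to emit.
  synchronized void onDataArrived() {
    dataReady = true;
    pending.complete(null);
  }

  // Hypothetical callback: everything that was ready has been emitted.
  synchronized void onDrained() {
    dataReady = false;
  }
}
```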
    • getReaderCheckpoint

      protected abstract FlinkSourceSplit<T> getReaderCheckpoint(int splitId, FlinkSourceReaderBase<T,OutputT>.ReaderAndOutput readerAndOutput) throws IOException
      Creates a FlinkSourceSplit for the given splitId from the current checkpoint of its reader.
      Throws:
      IOException
    • createReader

      protected abstract Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit) throws IOException
      Throws:
      IOException
    • addSplitsToUnfinishedForCheckpoint

      protected void addSplitsToUnfinishedForCheckpoint(long checkpointId, List<FlinkSourceSplit<T>> splits)
      Intended to be overridden by the unbounded reader. Notifies the reader of the splits created for the given checkpointId, which will be part of that checkpoint. These splits are processed during notifyCheckpointComplete to finalize their associated CheckpointMarks.
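      The checkpoint bookkeeping described above can be sketched as an ordered map from checkpoint id to the splits recorded for it. MarkedSplitSketch and CheckpointBookkeepingSketch are hypothetical, and finalizeCheckpointMark only stands in for finalizing a Beam CheckpointMark. The sketch assumes that completing checkpoint N also finalizes any earlier checkpoints still pending, because Flink's checkpoint-complete notifications are best-effort and may be skipped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical split whose CheckpointMark must be finalized once its checkpoint completes.
class MarkedSplitSketch {
  final int splitId;
  boolean finalized = false;

  MarkedSplitSketch(int splitId) {
    this.splitId = splitId;
  }

  // Stand-in for finalizing the associated CheckpointMark.
  void finalizeCheckpointMark() {
    finalized = true;
  }
}

class CheckpointBookkeepingSketch {
  // checkpointId -> splits snapshotted for that checkpoint, ordered by id.
  private final TreeMap<Long, List<MarkedSplitSketch>> unfinished = new TreeMap<>();

  // Mirrors addSplitsToUnfinishedForCheckpoint.
  void addSplitsToUnfinishedForCheckpoint(long checkpointId, List<MarkedSplitSketch> splits) {
    unfinished.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(splits);
  }

  // Mirrors notifyCheckpointComplete: finalizes every pending checkpoint up to
  // and including checkpointId, then forgets them.
  void notifyCheckpointComplete(long checkpointId) {
    Iterator<Map.Entry<Long, List<MarkedSplitSketch>>> it =
        unfinished.headMap(checkpointId, true).entrySet().iterator();
    while (it.hasNext()) {
      for (MarkedSplitSketch split : it.next().getValue()) {
        split.finalizeCheckpointMark();
      }
      it.remove(); // removing through the headMap view also removes from `unfinished`
    }
  }

  int pendingCheckpoints() {
    return unfinished.size();
  }
}
```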
    • createAndTrackNextReader

      protected final Optional<FlinkSourceReaderBase<T,OutputT>.ReaderAndOutput> createAndTrackNextReader() throws IOException
      Throws:
      IOException
    • finishSplit

      protected final void finishSplit(int splitIndex) throws IOException
      Throws:
      IOException
    • checkIdleTimeoutAndMaybeStartCountdown

      protected final boolean checkIdleTimeoutAndMaybeStartCountdown()
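      The class overview lists idle timeout support and idleTimeoutMs holds the timeout, but this method is undocumented. Going only by its name, one plausible reading is that the first call starts a countdown and later calls report whether idleTimeoutMs has elapsed. The hypothetical IdleTimeoutSketch below implements that reading with an injectable clock so it can be tested deterministically; the real class may instead schedule a task with scheduleTask.

```java
import java.util.function.LongSupplier;

// Hypothetical idle-timeout helper; the clock is injected so the logic is testable.
class IdleTimeoutSketch {
  private final long idleTimeoutMs;
  private final LongSupplier clockMs;
  private boolean countdownStarted = false;
  private long countdownStartMs;

  IdleTimeoutSketch(long idleTimeoutMs, LongSupplier clockMs) {
    this.idleTimeoutMs = idleTimeoutMs;
    this.clockMs = clockMs;
  }

  // The first call starts the countdown; returns true once at least
  // idleTimeoutMs has passed since then (immediately for a timeout of 0).
  boolean checkIdleTimeoutAndMaybeStartCountdown() {
    long now = clockMs.getAsLong();
    if (!countdownStarted) {
      countdownStarted = true;
      countdownStartMs = now;
    }
    return now - countdownStartMs >= idleTimeoutMs;
  }
}
```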
    • noMoreSplits

      protected final boolean noMoreSplits()
    • scheduleTask

      protected void scheduleTask(Runnable runnable, long delayMs)
    • scheduleTaskAtFixedRate

      protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs)
    • execute

      protected void execute(Runnable runnable)
    • recordException

      protected void recordException(Throwable e)
    • checkExceptionAndMaybeThrow

      protected void checkExceptionAndMaybeThrow()
    • hasException

      protected boolean hasException()
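      recordException, hasException, and checkExceptionAndMaybeThrow are undocumented. Their names suggest a common pattern: tasks running on other threads (for example via execute or scheduleTask) record failures, and the reader's own thread rethrows them later. The hypothetical ExceptionRelaySketch below shows that pattern with an AtomicReference; it illustrates the idea and is not the class's actual behavior.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical relay that carries failures from background tasks to the reader thread.
class ExceptionRelaySketch {
  private final AtomicReference<Throwable> firstError = new AtomicReference<>();

  // Called from background tasks: keeps the first failure and attaches
  // later, distinct failures to it as suppressed exceptions.
  void recordException(Throwable e) {
    if (!firstError.compareAndSet(null, e)) {
      Throwable first = firstError.get();
      if (first != e) {
        first.addSuppressed(e); // Throwable.addSuppressed is synchronized
      }
    }
  }

  boolean hasException() {
    return firstError.get() != null;
  }

  // Called from the reader's own thread, e.g. at the start of pollNext.
  void checkExceptionAndMaybeThrow() {
    Throwable e = firstError.get();
    if (e != null) {
      throw new RuntimeException("A background task failed", e);
    }
  }
}
```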
    • sourceSplits

      protected Collection<FlinkSourceSplit<T>> sourceSplits()
    • allReaders

    • ignoreReturnValue

      protected static void ignoreReturnValue(Object o)