@DoFn.UnboundedPerElement public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata,DataChangeRecord> implements java.io.Serializable
The processing of a partition is delegated to the QueryChangeStreamAction.
DoFn.AlwaysFetched, DoFn.BoundedPerElement, DoFn.BundleFinalizer, DoFn.Element, DoFn.FieldAccess, DoFn.FinishBundle, DoFn.FinishBundleContext, DoFn.GetInitialRestriction, DoFn.GetInitialWatermarkEstimatorState, DoFn.GetRestrictionCoder, DoFn.GetSize, DoFn.GetWatermarkEstimatorStateCoder, DoFn.Key, DoFn.MultiOutputReceiver, DoFn.NewTracker, DoFn.NewWatermarkEstimator, DoFn.OnTimer, DoFn.OnTimerContext, DoFn.OnTimerFamily, DoFn.OnWindowExpiration, DoFn.OnWindowExpirationContext, DoFn.OutputReceiver<T>, DoFn.ProcessContext, DoFn.ProcessContinuation, DoFn.ProcessElement, DoFn.RequiresStableInput, DoFn.RequiresTimeSortedInput, DoFn.Restriction, DoFn.Setup, DoFn.SideInput, DoFn.SplitRestriction, DoFn.StartBundle, DoFn.StartBundleContext, DoFn.StateId, DoFn.Teardown, DoFn.TimerFamily, DoFn.TimerId, DoFn.Timestamp, DoFn.TruncateRestriction, DoFn.UnboundedPerElement, DoFn.WatermarkEstimatorState, DoFn.WindowedContext
Constructor and Description |
---|
ReadChangeStreamPartitionDoFn(DaoFactory daoFactory, MapperFactory mapperFactory, ActionFactory actionFactory, ChangeStreamMetrics metrics) This class needs a DaoFactory to build DAOs to access the partition metadata tables and to perform the change streams query. |
getAllowedTimestampSkew, getInputTypeDescriptor, getOutputTypeDescriptor, populateDisplayData, prepareForProcessing
public ReadChangeStreamPartitionDoFn(DaoFactory daoFactory, MapperFactory mapperFactory, ActionFactory actionFactory, ChangeStreamMetrics metrics)
This class needs a DaoFactory to build DAOs to access the partition metadata tables and to perform the change streams query. It uses mappers to transform database rows into the ChangeStreamRecord model. It uses the ActionFactory to construct the action dispatchers, which will perform the change stream query and process each type of record received. It emits metrics for the partition using the ChangeStreamMetrics.
daoFactory - the DaoFactory to construct PartitionMetadataDaos and ChangeStreamDaos
mapperFactory - the MapperFactory to construct ChangeStreamRecordMappers
actionFactory - the ActionFactory to construct actions
metrics - the ChangeStreamMetrics to emit partition-related metrics
@DoFn.GetInitialWatermarkEstimatorState public Instant getInitialWatermarkEstimatorState(@DoFn.Element PartitionMetadata partition)
@DoFn.NewWatermarkEstimator public ManualWatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant watermarkEstimatorState)
@DoFn.GetInitialRestriction public OffsetRange initialRestriction(@DoFn.Element PartitionMetadata partition)
The OffsetRange restriction represents a closed-open interval, while the start / end timestamps represent a closed-closed interval, so we add 1 microsecond to the end timestamp to convert it to closed-open.
In this function we also update the partition state to PartitionMetadata.State#RUNNING.
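The closed-closed to closed-open conversion described above can be illustrated with a minimal, self-contained sketch. It assumes that timestamps are mapped to microseconds since the epoch before being used as range offsets; `ClosedOpenRangeSketch`, `MicrosRange`, `toMicros` and `toClosedOpen` are hypothetical names introduced for illustration and are not part of the connector.

```java
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of the connector. */
final class ClosedOpenRangeSketch {

  /** Half-open range [from, to) expressed in microseconds since the epoch. */
  static final class MicrosRange {
    final long from;
    final long to;

    MicrosRange(long from, long to) {
      this.from = from;
      this.to = to;
    }

    /** True when micros lies in [from, to). */
    boolean contains(long micros) {
      return micros >= from && micros < to;
    }
  }

  /** Converts an Instant to microseconds since the epoch, truncating sub-microsecond digits. */
  static long toMicros(Instant instant) {
    long wholeSeconds = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
    return Math.addExact(wholeSeconds, instant.getNano() / 1_000L);
  }

  /**
   * Turns the closed-closed interval [start, end] into a closed-open range by
   * adding 1 microsecond to the end, so that the end timestamp stays inside it.
   */
  static MicrosRange toClosedOpen(Instant startInclusive, Instant endInclusive) {
    return new MicrosRange(toMicros(startInclusive), Math.addExact(toMicros(endInclusive), 1L));
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2022-01-01T00:00:00Z");
    Instant end = Instant.parse("2022-01-01T00:00:01Z");
    MicrosRange range = toClosedOpen(start, end);
    System.out.println("end is inside: " + range.contains(toMicros(end)));
    System.out.println("end + 1us is inside: " + range.contains(toMicros(end) + 1));
  }
}
```

Without the extra microsecond, a record stamped exactly at the end timestamp would fall outside the half-open range and could not be claimed.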
partition - the partition to be queried
@DoFn.NewTracker public ReadChangeStreamPartitionRangeTracker newTracker(@DoFn.Element PartitionMetadata partition, @DoFn.Restriction OffsetRange offsetRange)
@DoFn.Setup public void setup()
Constructs instances of PartitionMetadataDao, ChangeStreamDao, ChangeStreamRecordMapper, PartitionMetadataMapper, DataChangeRecordAction, HeartbeatRecordAction, ChildPartitionsRecordAction and QueryChangeStreamAction.
@DoFn.ProcessElement public DoFn.ProcessContinuation processElement(@DoFn.Element PartitionMetadata partition, RestrictionTracker<OffsetRange,java.lang.Long> tracker, DoFn.OutputReceiver<DataChangeRecord> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, DoFn.BundleFinalizer bundleFinalizer)
The processing of a partition is delegated to the QueryChangeStreamAction.
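As a rough illustration of this kind of delegation, the sketch below routes each record returned by a query to a handler chosen by record type. Everything in it is a hypothetical stand-in: the `Sketch*` types are simplified placeholders rather than the connector's record classes, and what each branch does (emit, count, or collect child tokens) is an assumption made only so the routing is observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical dispatcher for illustration only; not the connector's implementation. */
final class RecordDispatchSketch {

  /** Simplified stand-in for a record returned by a change stream query. */
  interface SketchRecord {}

  static final class SketchDataChange implements SketchRecord {
    final String payload;

    SketchDataChange(String payload) {
      this.payload = payload;
    }
  }

  static final class SketchHeartbeat implements SketchRecord {}

  static final class SketchChildPartitions implements SketchRecord {
    final List<String> childTokens;

    SketchChildPartitions(List<String> childTokens) {
      this.childTokens = childTokens;
    }
  }

  /** Records what each branch did, so the effect of the routing can be inspected. */
  static final class Sink {
    final List<String> emitted = new ArrayList<>();
    final List<String> childTokens = new ArrayList<>();
    int heartbeats = 0;
  }

  /** Routes one record to the branch for its type (assumed behaviour per branch). */
  static void dispatch(SketchRecord record, Sink sink) {
    if (record instanceof SketchDataChange) {
      sink.emitted.add(((SketchDataChange) record).payload);
    } else if (record instanceof SketchHeartbeat) {
      sink.heartbeats++;
    } else if (record instanceof SketchChildPartitions) {
      sink.childTokens.addAll(((SketchChildPartitions) record).childTokens);
    } else {
      throw new IllegalArgumentException("Unknown record type: " + record.getClass().getName());
    }
  }

  public static void main(String[] args) {
    Sink sink = new Sink();
    List<SketchRecord> records = Arrays.asList(
        new SketchDataChange("row-1"),
        new SketchHeartbeat(),
        new SketchChildPartitions(Arrays.asList("child-a", "child-b")));
    for (SketchRecord record : records) {
      dispatch(record, sink);
    }
    System.out.println(sink.emitted + " " + sink.heartbeats + " " + sink.childTokens);
  }
}
```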
partition - the partition to be queried
tracker - an instance of ReadChangeStreamPartitionRangeTracker
receiver - a DataChangeRecord OutputReceiver
watermarkEstimator - a ManualWatermarkEstimator of Instant
bundleFinalizer - the bundle finalizer
Returns ProcessContinuation#stop() if a record timestamp could not be claimed or if the partition processing has finished
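The two stop conditions can be sketched with a simplified claim loop. This is a minimal illustration under stated assumptions, not the connector's code: `SketchTracker` is a hypothetical stand-in for a restriction tracker over a closed-open range, and `StopReason` is a hypothetical enum used only to tell the two cases apart, since both correspond to ProcessContinuation#stop() in the documented contract.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical claim loop for illustration only; not the connector's implementation. */
final class ClaimLoopSketch {

  /** Why processing stopped; both cases map to a "stop" continuation. */
  enum StopReason { CLAIM_REJECTED, PARTITION_FINISHED }

  /** Simplified tracker over the closed-open range [from, to). */
  static final class SketchTracker {
    private final long from;
    private final long to;

    SketchTracker(long from, long to) {
      if (from > to) {
        throw new IllegalArgumentException("from must not exceed to");
      }
      this.from = from;
      this.to = to;
    }

    /** A position may be processed only if it lies inside the range. */
    boolean tryClaim(long position) {
      return position >= from && position < to;
    }
  }

  /**
   * Emits each record timestamp after claiming it. Stops at the first rejected
   * claim, or once all records of the partition have been consumed.
   */
  static StopReason process(long[] recordTimestamps, SketchTracker tracker, List<Long> emitted) {
    for (long timestamp : recordTimestamps) {
      if (!tracker.tryClaim(timestamp)) {
        return StopReason.CLAIM_REJECTED;
      }
      emitted.add(timestamp);
    }
    return StopReason.PARTITION_FINISHED;
  }

  public static void main(String[] args) {
    List<Long> emitted = new ArrayList<>();
    StopReason reason =
        process(new long[] {10L, 15L, 25L, 17L}, new SketchTracker(10L, 20L), emitted);
    System.out.println(reason + " after emitting " + emitted);
  }
}
```

In this sketch nothing is emitted once a claim is rejected, which mirrors the general rule that a splittable DoFn should only produce output for positions it has successfully claimed.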