Class ExecutableStageDoFnOperator<InputT,OutputT>
java.lang.Object
org.apache.flink.streaming.api.operators.AbstractStreamOperator<WindowedValue<OutputT>>
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator<InputT,InputT,OutputT>
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator<InputT,OutputT>
- All Implemented Interfaces:
Serializable,
org.apache.flink.api.common.state.CheckpointListener,
org.apache.flink.streaming.api.operators.Input<WindowedValue<InputT>>,
org.apache.flink.streaming.api.operators.KeyContext,
org.apache.flink.streaming.api.operators.KeyContextHandler,
org.apache.flink.streaming.api.operators.OneInputStreamOperator<WindowedValue<InputT>,WindowedValue<OutputT>>,
org.apache.flink.streaming.api.operators.SetupableStreamOperator<WindowedValue<OutputT>>,
org.apache.flink.streaming.api.operators.StreamOperator<WindowedValue<OutputT>>,
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator,
org.apache.flink.streaming.api.operators.Triggerable<FlinkKey,org.apache.beam.runners.core.TimerInternals.TimerData>,
org.apache.flink.streaming.api.operators.TwoInputStreamOperator<WindowedValue<InputT>,RawUnionValue,WindowedValue<OutputT>>
public class ExecutableStageDoFnOperator<InputT,OutputT>
extends DoFnOperator<InputT,InputT,OutputT>
This operator is the streaming equivalent of the FlinkExecutableStageFunction. It sends all received elements to the SDK harness and emits the elements it receives back from the harness to the downstream operators. It also takes care of handling side inputs and state.
TODO Integrate support for progress updates and metrics
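The round trip described above (ship elements to the SDK harness, then forward whatever comes back) can be sketched in plain Java. This is a minimal illustration, not the operator's actual code: `FakeHarness` and `BundleForwarder` are hypothetical names, and the "harness" is simulated by a thread pool running a user function instead of a separate SDK process connected over the Fn API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the SDK harness: runs the user function on another thread,
// loosely mimicking a harness that processes bundles outside the operator.
class FakeHarness implements AutoCloseable {
  private final ExecutorService pool = Executors.newSingleThreadExecutor();
  private final Function<String, String> userFn;

  FakeHarness(Function<String, String> userFn) {
    this.userFn = userFn;
  }

  CompletableFuture<List<String>> processBundle(List<String> bundle) {
    List<String> copy = new ArrayList<>(bundle); // snapshot so the caller may reuse its buffer
    return CompletableFuture.supplyAsync(() -> {
      List<String> out = new ArrayList<>();
      for (String s : copy) {
        out.add(userFn.apply(s));
      }
      return out;
    }, pool);
  }

  @Override
  public void close() {
    pool.shutdown();
  }
}

// Hypothetical operator-side sketch: buffer received elements, ship them as one bundle,
// and emit the results returned by the harness to the downstream consumer.
class BundleForwarder {
  private final FakeHarness harness;
  private final Consumer<String> downstream;
  private final List<String> buffer = new ArrayList<>();

  BundleForwarder(FakeHarness harness, Consumer<String> downstream) {
    this.harness = harness;
    this.downstream = downstream;
  }

  void processElement(String element) {
    buffer.add(element);
  }

  void finishBundle() {
    List<String> results = harness.processBundle(buffer).join(); // wait for the harness response
    buffer.clear();
    results.forEach(downstream);
  }
}
```

Blocking in `finishBundle` keeps the sketch simple; the point it illustrates is that output appears only after the harness responds, not at the moment an element is received.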
- See Also:
-
Nested Class Summary
Nested classes/interfaces inherited from class org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
DoFnOperator.BufferedOutputManager<OutputT>, DoFnOperator.FlinkStepContext, DoFnOperator.MultiOutputOutputManagerFactory<OutputT>
-
Field Summary
Fields inherited from class org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
additionalOutputTags, bufferingDoFnRunner, doFn, doFnRunner, keyedStateInternals, mainOutputTag, outputManager, outputManagerFactory, pushbackDoFnRunner, serializedOptions, sideInputHandler, sideInputReader, sideInputs, sideInputTagMapping, stepName, timerInternals, timerService, windowingStrategy
Fields inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator
chainingStrategy, config, latencyStats, metrics, output, processingTimeService
-
Constructor Summary
Constructors
Constructor | Description
ExecutableStageDoFnOperator(String stepName, Coder<WindowedValue<InputT>> windowedInputCoder, Map<TupleTag<?>, Coder<?>> outputCoders, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory<OutputT> outputManagerFactory, Map<Integer, PCollectionView<?>> sideInputTagMapping, Collection<PCollectionView<?>> sideInputs, Map<org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds, PipelineOptions options, org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload payload, JobInfo jobInfo, FlinkExecutableStageContextFactory contextFactory, Map<String, TupleTag<?>> outputMap, WindowingStrategy windowingStrategy, Coder keyCoder, org.apache.flink.api.java.functions.KeySelector<WindowedValue<InputT>, ?> keySelector) | Constructor.
-
Method Summary
Modifier and Type | Method | Description
protected void | addSideInputValue(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<RawUnionValue> streamRecord) | Add the side input value.
long | applyInputWatermarkHold(long inputWatermark) | Allows applying a hold to the input watermark.
long | applyOutputWatermarkHold(long currentOutputWatermark, long potentialOutputWatermark) | Allows applying a hold to the output watermark before it is sent out.
void | cleanUp() |
protected org.apache.beam.runners.core.DoFnRunner<InputT,OutputT> | createWrappingDoFnRunner(org.apache.beam.runners.core.DoFnRunner<InputT, OutputT> wrappedRunner, org.apache.beam.runners.core.StepContext stepContext) |
protected void | fireTimerInternal(FlinkKey key, org.apache.beam.runners.core.TimerInternals.TimerData timer) |
void | flushData() |
Object | getCurrentKey() |
protected Lock | getLockToAcquireForStateAccessDuringBundles() | Subclasses may provide a lock to ensure that the state backend is not accessed concurrently during bundle execution.
final void | notifyCheckpointComplete(long checkpointId) |
void | open() |
void | setCurrentKey(Object key) | We don't want to set anything here.
void | setKeyContextElement1(org.apache.flink.streaming.runtime.streamrecord.StreamRecord record) | Note: This is only relevant when we have a stateful DoFn.
Methods inherited from class org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
close, finish, fireTimer, getBundleFinalizer, getCurrentOutputWatermark, getDoFn, getEffectiveInputWatermark, initializeState, invokeFinishBundle, numProcessingTimeTimers, onEventTime, onProcessingTime, prepareSnapshotPreBarrier, preProcess, processElement, processElement1, processElement2, processWatermark, processWatermark1, processWatermark2, scheduleForCurrentProcessingTime, setBundleFinishedCallback, setPreBundleCallback, setup, shoudBundleElements, snapshotState
Methods inherited from class org.apache.flink.streaming.api.operators.AbstractStreamOperator
getChainingStrategy, getContainingTask, getExecutionConfig, getInternalTimerService, getKeyedStateBackend, getKeyedStateStore, getMetricGroup, getOperatorConfig, getOperatorID, getOperatorName, getOperatorStateBackend, getOrCreateKeyedState, getPartitionedState, getPartitionedState, getProcessingTimeService, getRuntimeContext, getTimeServiceManager, getUserCodeClassloader, hasKeyContext1, hasKeyContext2, initializeState, isUsingCustomRawKeyedState, notifyCheckpointAborted, processLatencyMarker, processLatencyMarker1, processLatencyMarker2, processWatermarkStatus, processWatermarkStatus1, processWatermarkStatus2, reportOrForwardLatencyMarker, setChainingStrategy, setKeyContextElement2, setProcessingTimeService, snapshotState
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.flink.api.common.state.CheckpointListener
notifyCheckpointAborted
Methods inherited from interface org.apache.flink.streaming.api.operators.Input
processLatencyMarker, processWatermarkStatus
Methods inherited from interface org.apache.flink.streaming.api.operators.KeyContextHandler
hasKeyContext
Methods inherited from interface org.apache.flink.streaming.api.operators.OneInputStreamOperator
setKeyContextElement
Methods inherited from interface org.apache.flink.streaming.api.operators.StreamOperator
getMetricGroup, getOperatorID, initializeState, setKeyContextElement2, snapshotState
Methods inherited from interface org.apache.flink.streaming.api.operators.TwoInputStreamOperator
processLatencyMarker1, processLatencyMarker2, processWatermarkStatus1, processWatermarkStatus2
-
Constructor Details
-
ExecutableStageDoFnOperator
public ExecutableStageDoFnOperator(String stepName, Coder<WindowedValue<InputT>> windowedInputCoder, Map<TupleTag<?>, Coder<?>> outputCoders, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory<OutputT> outputManagerFactory, Map<Integer, PCollectionView<?>> sideInputTagMapping, Collection<PCollectionView<?>> sideInputs, Map<org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds, PipelineOptions options, org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload payload, JobInfo jobInfo, FlinkExecutableStageContextFactory contextFactory, Map<String, TupleTag<?>> outputMap, WindowingStrategy windowingStrategy, Coder keyCoder, org.apache.flink.api.java.functions.KeySelector<WindowedValue<InputT>, ?> keySelector) Constructor.
-
-
Method Details
-
getLockToAcquireForStateAccessDuringBundles
Description copied from class: DoFnOperator
Subclasses may provide a lock to ensure that the state backend is not accessed concurrently during bundle execution.
- Overrides:
getLockToAcquireForStateAccessDuringBundles
in class DoFnOperator<InputT,InputT,OutputT>
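The lock mentioned above serializes state access between the task thread and threads that serve the harness during a bundle. A minimal sketch, assuming a non-thread-safe map stands in for the state backend; `GuardedState` is a hypothetical class, and only the method name `getLockToAcquireForStateAccessDuringBundles` is taken from this page.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical keyed store guarded by a single lock, mimicking a state backend
// that must not be accessed concurrently.
class GuardedState {
  private final Lock lock = new ReentrantLock();
  private final Map<String, Long> counts = new HashMap<>(); // HashMap is not thread-safe

  // Mirrors the idea of a subclass-provided lock for state access during bundles.
  Lock getLockToAcquireForStateAccessDuringBundles() {
    return lock;
  }

  void increment(String key) {
    Lock l = getLockToAcquireForStateAccessDuringBundles();
    l.lock();
    try {
      counts.merge(key, 1L, Long::sum);
    } finally {
      l.unlock(); // always release, even if the update throws
    }
  }

  long get(String key) {
    Lock l = getLockToAcquireForStateAccessDuringBundles();
    l.lock();
    try {
      return counts.getOrDefault(key, 0L);
    } finally {
      l.unlock();
    }
  }
}
```

Without the lock, two threads calling `increment` concurrently could lose updates or corrupt the map.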
-
open
-
notifyCheckpointComplete
- Specified by:
notifyCheckpointComplete
in interface org.apache.flink.api.common.state.CheckpointListener
- Overrides:
notifyCheckpointComplete
in class DoFnOperator<InputT,InputT,OutputT>
- Throws:
Exception
-
setKeyContextElement1
public void setKeyContextElement1(org.apache.flink.streaming.runtime.streamrecord.StreamRecord record)
Note: This is only relevant when we have a stateful DoFn. We want to control the key of the state backend ourselves, and we must avoid any concurrent setting of the current active key. By overriding this method, we also avoid unnecessary serialization, since the key has to be encoded as a byte array.
- Specified by:
setKeyContextElement1
in interface org.apache.flink.streaming.api.operators.StreamOperator<InputT>
- Overrides:
setKeyContextElement1
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<WindowedValue<OutputT>>
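The point about encoding the key as a byte array can be illustrated with a key selector that encodes once and hands back bytes usable directly as a state key. This is a sketch under stated assumptions: `ByteKeySelector` is hypothetical, and UTF-8 string encoding stands in for the key `Coder` that the constructor above receives as `keyCoder`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical key selector: extracts the user key and encodes it to bytes exactly once,
// so the bytes can serve as the state-backend key without being re-serialized later.
class ByteKeySelector<T> {
  private final Function<T, String> keyExtractor;

  ByteKeySelector(Function<T, String> keyExtractor) {
    this.keyExtractor = keyExtractor;
  }

  // UTF-8 stands in for a real key Coder. ByteBuffer compares by content,
  // so two elements with the same user key yield equal state keys.
  ByteBuffer getKey(T element) {
    return ByteBuffer.wrap(keyExtractor.apply(element).getBytes(StandardCharsets.UTF_8));
  }
}
```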
-
setCurrentKey
We don't want to set anything here. This is due to the asynchronous nature of processing elements in the SDK harness. The Flink runtime sets the current key before calling processElement, but this does not work when elements are sent to the SDK harness, which may process them at an arbitrary later point in time. State for keys is also accessed asynchronously via state requests.
We set the key only where it is required: 1) for state requests and 2) for timers (setting and firing).
- Specified by:
setCurrentKey
in interface org.apache.flink.streaming.api.operators.KeyContext
- Overrides:
setCurrentKey
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<WindowedValue<OutputT>>
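The key-handling policy described above (ignore the key the runtime sets per element, and set it explicitly only when serving a state request or firing a timer) can be sketched as follows. `KeyedBackend` and `LazyKeyOperator` are hypothetical classes; a real state backend and the harness's state-request protocol are considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical keyed backend: reads and writes are scoped to whatever key is "current".
class KeyedBackend {
  private Object currentKey;
  private final Map<Object, Map<String, String>> state = new HashMap<>();

  void setCurrentKey(Object key) {
    currentKey = key;
  }

  void put(String name, String value) {
    state.computeIfAbsent(currentKey, k -> new HashMap<>()).put(name, value);
  }

  String get(String name) {
    Map<String, String> perKey = state.get(currentKey);
    return perKey == null ? null : perKey.get(name);
  }
}

// Hypothetical operator-side policy for asynchronous processing.
class LazyKeyOperator {
  private final KeyedBackend backend = new KeyedBackend();

  // Called by the runtime before each element; intentionally ignored because
  // the element is processed later by the harness, not now.
  void setCurrentKey(Object key) {}

  // Set the key explicitly and act on state atomically; synchronized so that
  // concurrent requests cannot interleave key switches.
  private synchronized <R> R withKey(Object key, Function<KeyedBackend, R> action) {
    backend.setCurrentKey(key);
    return action.apply(backend);
  }

  void handleStateWrite(Object key, String name, String value) {
    withKey(key, b -> { b.put(name, value); return null; });
  }

  String handleStateRead(Object key, String name) {
    return withKey(key, b -> b.get(name));
  }

  // Timers also need the key set before their callback touches state.
  void fireTimer(Object key, Consumer<KeyedBackend> callback) {
    withKey(key, b -> { callback.accept(b); return null; });
  }
}
```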
-
getCurrentKey
- Specified by:
getCurrentKey
in interface org.apache.flink.streaming.api.operators.KeyContext
- Overrides:
getCurrentKey
in class org.apache.flink.streaming.api.operators.AbstractStreamOperator<WindowedValue<OutputT>>
-
fireTimerInternal
protected void fireTimerInternal(FlinkKey key, org.apache.beam.runners.core.TimerInternals.TimerData timer)
- Overrides:
fireTimerInternal
in class DoFnOperator<InputT,InputT,OutputT>
-
flushData
- Throws:
Exception
-
cleanUp
- Throws:
Exception
-
addSideInputValue
protected void addSideInputValue(org.apache.flink.streaming.runtime.streamrecord.StreamRecord<RawUnionValue> streamRecord)
Description copied from class: DoFnOperator
Add the side input value. Here we are assuming that views have already been materialized and are sent over the wire as Iterable. Subclasses may elect to perform materialization in state and receive side input incrementally instead.
- Overrides:
addSideInputValue
in class DoFnOperator<InputT,InputT,OutputT>
- Parameters:
streamRecord
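The "already materialized, sent as an Iterable" assumption above means each incoming side-input record carries the complete contents of one view. A minimal sketch, assuming records are identified by an integer union tag (as in the `sideInputTagMapping` constructor parameter) and ignoring windows entirely; `SideInputStore` is hypothetical and far simpler than Beam's actual side-input handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical side-input store. Each record names a view by union tag and carries
// that view's fully materialized contents, so a new record replaces the old contents
// instead of being appended incrementally. Windows are ignored for simplicity.
class SideInputStore {
  private final Map<Integer, List<Object>> views = new HashMap<>();

  void addSideInputValue(int unionTag, Iterable<?> materialized) {
    List<Object> copy = new ArrayList<>();
    for (Object o : materialized) {
      copy.add(o);
    }
    views.put(unionTag, copy);
  }

  // Elements that need a view can be processed only once the view has arrived.
  boolean isReady(int unionTag) {
    return views.containsKey(unionTag);
  }

  List<Object> get(int unionTag) {
    return views.get(unionTag);
  }
}
```

The alternative the text mentions, materializing in state and receiving side input incrementally, would append to the stored contents instead of replacing them.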
-
createWrappingDoFnRunner
protected org.apache.beam.runners.core.DoFnRunner<InputT,OutputT> createWrappingDoFnRunner(org.apache.beam.runners.core.DoFnRunner<InputT, OutputT> wrappedRunner, org.apache.beam.runners.core.StepContext stepContext)
- Overrides:
createWrappingDoFnRunner
in class DoFnOperator<InputT,InputT,OutputT>
-
applyInputWatermarkHold
public long applyInputWatermarkHold(long inputWatermark)
Description copied from class: DoFnOperator
Allows applying a hold to the input watermark. By default, the input watermark is passed through unchanged.
- Overrides:
applyInputWatermarkHold
in class DoFnOperator<InputT,InputT,OutputT>
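The default pass-through behavior and what a subclass hold might look like are shown in this small sketch. `WatermarkPolicy` and `HeldWatermarkPolicy` are hypothetical names, and the "hold" here is a single configurable cap rather than anything this operator is documented to compute.

```java
// Hypothetical base: by default the input watermark is passed through unchanged.
class WatermarkPolicy {
  long applyInputWatermarkHold(long inputWatermark) {
    return inputWatermark;
  }
}

// Hypothetical subclass: never lets the effective input watermark pass a configured hold.
class HeldWatermarkPolicy extends WatermarkPolicy {
  private long hold = Long.MAX_VALUE; // no hold until one is set

  void setHold(long hold) {
    this.hold = hold;
  }

  @Override
  long applyInputWatermarkHold(long inputWatermark) {
    return Math.min(inputWatermark, hold);
  }
}
```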
-
applyOutputWatermarkHold
public long applyOutputWatermarkHold(long currentOutputWatermark, long potentialOutputWatermark)
Description copied from class: DoFnOperator
Allows applying a hold to the output watermark before it is sent out. Used to hold back the output watermark when processing is delayed (asynchronous or buffered).
- Overrides:
applyOutputWatermarkHold
in class DoFnOperator<InputT,InputT,OutputT>
- Parameters:
currentOutputWatermark - The current output watermark.
potentialOutputWatermark - The potential new output watermark, which can be adjusted if needed. The input watermark hold has already been applied.
- Returns:
The new output watermark which will be emitted.
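For delayed (asynchronous or buffered) processing, one plausible hold keeps the output watermark below the earliest element that is still in flight, and never lets it move backwards. This is a sketch under assumptions, not the operator's actual logic: `OutputWatermarkHold` and its bookkeeping methods are hypothetical, and the `- 1` follows Flink's convention that a watermark `t` promises no further elements with timestamp `<= t`.

```java
import java.util.TreeMap;

// Hypothetical bookkeeping for elements sent to the harness but not yet completed.
class OutputWatermarkHold {
  // timestamp -> number of in-flight elements carrying that timestamp
  private final TreeMap<Long, Integer> inFlight = new TreeMap<>();

  void elementSent(long timestamp) {
    inFlight.merge(timestamp, 1, Integer::sum);
  }

  void elementCompleted(long timestamp) {
    // Returning null removes the entry once its count drops to zero.
    inFlight.computeIfPresent(timestamp, (ts, count) -> count == 1 ? null : count - 1);
  }

  long applyOutputWatermarkHold(long currentOutputWatermark, long potentialOutputWatermark) {
    long held = inFlight.isEmpty()
        ? potentialOutputWatermark
        // Stay strictly below the earliest pending timestamp, since a Flink
        // watermark t promises no further elements with timestamp <= t.
        : Math.min(potentialOutputWatermark, inFlight.firstKey() - 1);
    return Math.max(currentOutputWatermark, held); // watermarks never move backwards
  }
}
```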
-