Overview

The Apache Flink Runner can be used to execute Beam pipelines using Apache Flink. For execution, you can choose between a cluster execution mode (e.g. YARN/Kubernetes/Mesos) and a local embedded execution mode, which is useful for testing pipelines.

The Flink Runner and Flink are suitable for large scale, continuous jobs, and provide:

  * A streaming-first runtime that supports both batch processing and data streaming programs
  * A runtime that supports very high throughput and low event latency at the same time
  * Fault-tolerance with exactly-once processing guarantees
  * Natural back-pressure in streaming programs
  * Custom memory management for efficient and robust switching between in-memory and out-of-core data processing algorithms
  * Integration with YARN and other components of the Apache Hadoop ecosystem

Using the Apache Flink Runner

It is important to understand that the Flink Runner comes in two flavors:

  1. The original classic Runner, which supports only Java (and other JVM-based languages)
  2. The newer portable Runner, which supports Java/Python/Go

You may ask why there are two Runners.

Beam and its Runners originally only supported JVM-based languages (e.g. Java/Scala/Kotlin). Python and Go SDKs were added later on. The architecture of the Runners had to be changed significantly to support executing pipelines written in other languages.

If your applications only use Java, then you should currently go with the classic Runner. Eventually, the portable Runner will replace the classic Runner because it contains the generalized framework for executing Java, Python, Go, and more languages in the future.

If you want to run Python pipelines with Beam on Flink, you want to use the portable Runner. For more information on portability, please visit the Portability page.

Consequently, this guide is split into parts that document the classic and the portable functionality of the Flink Runner. In addition, Python provides convenience wrappers to handle the full lifecycle of the runner, so the Python instructions are further split depending on whether the portability components are managed automatically (recommended) or manually.

Prerequisites and Setup

If you want to use the local execution mode with the Flink Runner, you don’t have to complete any cluster setup. You can simply run your Beam pipeline. Be sure to set the Runner to FlinkRunner (Java) or PortableRunner (Python).

To use the Flink Runner for executing on a cluster, you have to set up a Flink cluster by following the Flink Setup Quickstart.

Dependencies

You must specify your dependency on the Flink Runner in your pom.xml or build.gradle. Use the Beam version and the artifact id from the compatibility table below. For example:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.18</artifactId>
  <version>2.60.0</version>
</dependency>
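If you use Gradle instead, a roughly equivalent declaration in build.gradle (using the same artifact id and version as the Maven example above) would be:

dependencies {
    implementation "org.apache.beam:beam-runners-flink-1.18:2.60.0"
}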

You will need Docker to be installed in your execution environment. To run an embedded Flink cluster, or to use the Flink Runner with Python < 3.6, you will also need Java available in your execution environment.

For executing a pipeline on a Flink cluster you need to package your program along with all dependencies in a so-called fat jar. How you do this depends on your build system, but if you follow the Beam Quickstart, this is the command that you have to run:

$ mvn package -Pflink-runner

Look for the output JAR of this command in the target folder.

The Beam Quickstart Maven project is set up to use the Maven Shade plugin to create a fat jar, and the -Pflink-runner argument makes sure to include the dependency on the Flink Runner.
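To sanity-check that the Flink Runner classes actually made it into the fat jar, you can list the jar contents (assuming the bundled jar name from the Quickstart):

$ jar tf target/word-count-beam-bundled-0.1.jar | grep "runners/flink" | head -n 5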

For running the pipeline, the easiest option is to use the flink command, which is part of Flink:

$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar --runner=FlinkRunner --other-parameters

Alternatively, you can use Maven’s exec command. For example, to execute the WordCount example:

mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"

If you have a Flink JobManager running on your local machine, you can provide localhost:8081 for flinkMaster. Otherwise an embedded Flink cluster will be started for the job.

To run a pipeline on Flink, set the runner to FlinkRunner and flink_master to the master URL of a Flink cluster. In addition, optionally set environment_type to LOOPBACK. For example, after starting up a local Flink cluster, one could run:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
    ...

To run on an embedded Flink cluster, simply omit the flink_master option and an embedded Flink cluster will be automatically started and shut down for the job.
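For example, a minimal sketch of running against the embedded cluster; note that flink_master is deliberately omitted:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# No --flink_master given: an embedded Flink cluster is started and
# shut down automatically for this job.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
    ...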

The optional flink_version option may be required as well for older versions of Python.

Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available on Docker Hub for each supported Flink version, e.g. Flink 1.16, Flink 1.17, and Flink 1.18.

To run a pipeline on an embedded Flink cluster:

(1) Start the JobService endpoint:

$ docker run --net=host apache/beam_flink1.18_job_server:latest

The JobService is the central instance to which you submit your Beam pipeline. It creates a Flink job from your pipeline and executes it. You might encounter an error message like Caused by: java.io.IOException: Insufficient number of network buffers:.... This can be resolved by providing a Flink configuration file that overrides the default settings. You can find an example configuration file here. To start the JobService endpoint with your custom configuration, mount a local directory containing your Flink configuration to the /flink-conf path in the Docker container and pass it as --flink-conf-dir:

$ docker run --net=host -v <your_flink_conf_dir>:/flink-conf apache/beam_flink1.18_job_server:latest --flink-conf-dir /flink-conf
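As an illustration, a flink-conf.yaml that raises Flink's network memory budget could use the standard Flink memory options below; the values are placeholders to tune for your job, not recommendations:

# flink-conf.yaml (illustrative values only)
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 1gb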

(2) Submit the Python pipeline to the above endpoint by using the PortableRunner, with job_endpoint set to localhost:8099 (this is the default address of the JobService). Optionally, set environment_type to LOOPBACK. For example:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
    ...

To run on a separate Flink cluster:

(1) Start a Flink cluster which exposes the REST interface (e.g. localhost:8081 by default).

(2) Start the JobService with the Flink REST endpoint:

$ docker run --net=host apache/beam_flink1.18_job_server:latest --flink-master=localhost:8081

(3) Submit the pipeline as above.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
    ...

Note that environment_type=LOOPBACK is only intended for local testing, and will not work on remote clusters. See here for details.

Additional information and caveats

Monitoring your job

You can monitor a running Flink job using the Flink JobManager Dashboard or its REST interface. By default, this is available at port 8081 of the JobManager node. If you have a Flink installation on your local machine, that would be http://localhost:8081. Note: when you use the [local] mode, an embedded Flink cluster is started which does not make a dashboard available.
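For example, with a JobManager running locally, you can query the REST interface directly using the endpoints documented in Flink's REST API; the job id in the second call is one returned by the first:

$ curl http://localhost:8081/jobs
$ curl http://localhost:8081/jobs/<job-id>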

Streaming Execution

If your pipeline uses an unbounded data source or sink, the Flink Runner will automatically switch to streaming mode. You can enforce streaming mode by using the --streaming flag.

Note: The Runner will print a warning message when unbounded sources are used and checkpointing is not enabled. Many sources, like PubSubIO, rely on their checkpoints being acknowledged, which can only happen when checkpointing is enabled for the FlinkRunner. To enable checkpointing, please set checkpointingInterval (Java) / checkpointing_interval (Python) to the desired checkpointing interval in milliseconds.
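For instance, a minimal Python sketch that enforces streaming mode and enables checkpointing; the 10-second interval is an arbitrary example value:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--streaming",                     # enforce streaming mode
    "--checkpointing_interval=10000",  # checkpoint every 10 s (example value)
])
with beam.Pipeline(options=options) as p:
    ...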

Pipeline options for the Flink Runner

When executing your pipeline with the Flink Runner, you can set these pipeline options.

The following list of Flink-specific pipeline options is generated automatically from the FlinkPipelineOptions reference class:

For the Java SDK:

allowNonRestoredState
  Flag indicating whether non-restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline. Default: false
attachedMode
  Specifies if the pipeline is submitted in attached or detached mode. Default: true
autoBalanceWriteFilesShardingEnabled
  Flag indicating whether auto-balance sharding for the WriteFiles transform should be enabled. This might prove useful in streaming use cases where the pipeline needs to write many events into files, typically divided into N shards. The default behavior on Flink is that some workers receive more shards than others, which causes workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature spreads shards evenly among the available workers, improving throughput and memory usage stability. Default: false
autoWatermarkInterval
  The interval in milliseconds for automatic watermark emission.
checkpointTimeoutMillis
  The maximum time in milliseconds that a checkpoint may take before being discarded. Default: -1
checkpointingInterval
  The interval in milliseconds at which to trigger checkpoints of the running pipeline. Default: -1 (no checkpointing)
checkpointingMode
  The checkpointing mode that defines the consistency guarantee. Default: EXACTLY_ONCE
disableMetrics
  Disable Beam metrics in the Flink Runner. Default: false
enableStableInputDrain
  Allow the drain operation for Flink pipelines that contain a RequiresStableInput operator. Note that at the time of draining, the RequiresStableInput contract might be violated if there are any processing-related failures in the DoFn operator. Default: false
executionModeForBatch
  Flink mode for data exchange of batch pipelines. See org.apache.flink.api.common.ExecutionMode. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672. Default: PIPELINED
executionRetryDelay
  Sets the delay in milliseconds between executions. A value of -1 indicates that the default value should be used. Default: -1
externalizedCheckpointsEnabled
  Enables or disables externalized checkpoints. Works in conjunction with checkpointingInterval. Default: false
failOnCheckpointingErrors
  Sets the expected behaviour for tasks in case they encounter an error in their checkpointing procedure. If set to true, the task will fail on a checkpointing error. If set to false, the task will only decline the checkpoint and continue running. Default: true
fasterCopy
  Remove unneeded deep copies between operators. See https://issues.apache.org/jira/browse/BEAM-11146. Default: false
fileInputSplitMaxSizeMB
  Set the maximum size of an input split when data is read from a filesystem. 0 implies no maximum size. Default: 0
finishBundleBeforeCheckpointing
  If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, checkpointing starts immediately and any remaining bundle output is buffered as part of the checkpoint. The setting may affect checkpoint alignment. Default: false
flinkConfDir
  Directory containing Flink YAML configuration files. These properties will be set on all jobs submitted to Flink and take precedence over configurations in FLINK_CONF_DIR.
flinkMaster
  Address of the Flink Master where the pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto]
forceUnalignedCheckpointEnabled
  Forces unaligned checkpoints, particularly allowing them for iterative jobs. Default: false
jobCheckIntervalInSecs
  Sets the job check interval in seconds used by the waitUntilFinish method in detached mode. Default: 5
latencyTrackingInterval
  Interval in milliseconds for sending latency tracking marks from the sources to the sinks. An interval value <= 0 disables the feature. Default: 0
maxBundleSize
  The maximum number of elements in a bundle. Default values are 1000 for a streaming job and 1,000,000 for batch. Default: MaxBundleSizeFactory
maxBundleTimeMills
  The maximum time to wait before finalising a bundle (in milliseconds). Default values are 1000 for streaming and 10,000 for batch. Default: MaxBundleTimeFactory
maxParallelism
  The pipeline-wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state. Default: -1
minPauseBetweenCheckpoints
  The minimal pause in milliseconds before the next checkpoint is triggered. Default: -1
numConcurrentCheckpoints
  The maximum number of concurrent checkpoints. Default: 1 (no concurrent checkpoints)
numberOfExecutionRetries
  Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of -1 indicates that the system default value (as defined in the configuration) should be used. Default: -1
objectReuse
  Sets the behavior of reusing objects. Default: false
operatorChaining
  Sets the behavior of operator chaining. Default: true
parallelism
  The degree of parallelism to be used when distributing operations onto workers. If the parallelism is not set, the configured Flink default is used, or 1 if none can be found. Default: -1
reIterableGroupByKeyResult
  Flag indicating whether the result of a GroupByKey needs to be re-iterable. A re-iterable result implies that all values for a single key must fit in memory, as we currently do not support spilling to disk. Default: false
reportCheckpointDuration
  If not null, reports the checkpoint duration of each ParDo stage in the provided metric namespace.
retainExternalizedCheckpointsOnCancellation
  Sets the behavior of externalized checkpoints on cancellation. Default: false
savepointPath
  Savepoint restore path. If specified, restores the streaming pipeline from the provided path.
shutdownSourcesAfterIdleMs
  Shuts down sources which have been idle for the configured number of milliseconds. Once a source has been shut down, checkpointing is no longer possible. Shutting down the sources eventually leads to pipeline shutdown (the Flink job finishes) once all input has been processed. Unless explicitly set, this defaults to Long.MAX_VALUE when checkpointing is enabled and to 0 when checkpointing is disabled. See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue. Default: -1
stateBackend
  State backend to store Beam's state. Use 'rocksdb' or 'filesystem'.
stateBackendFactory
  Sets the state backend factory to use in streaming mode. Defaults to the Flink cluster's state.backend configuration.
stateBackendStoragePath
  State backend path to persist state backend data. Used to initialize the state backend.
unalignedCheckpointEnabled
  If set, unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, allowing checkpoint barriers to overtake these buffers. The checkpoint duration thus becomes independent of the current throughput, as checkpoint barriers are effectively no longer embedded in the stream of data. Default: false

For the Python SDK, the same options are available under snake_case names; the descriptions and defaults are identical to the Java options above:

allow_non_restored_state, attached_mode, auto_balance_write_files_sharding_enabled, auto_watermark_interval, checkpoint_timeout_millis, checkpointing_interval, checkpointing_mode, disable_metrics, enable_stable_input_drain, execution_mode_for_batch, execution_retry_delay, externalized_checkpoints_enabled, fail_on_checkpointing_errors, faster_copy, file_input_split_max_size_m_b, finish_bundle_before_checkpointing, flink_conf_dir, flink_master, force_unaligned_checkpoint_enabled, job_check_interval_in_secs, latency_tracking_interval, max_bundle_size, max_bundle_time_mills, max_parallelism, min_pause_between_checkpoints, num_concurrent_checkpoints, number_of_execution_retries, object_reuse, operator_chaining, parallelism, re_iterable_group_by_key_result, report_checkpoint_duration, retain_externalized_checkpoints_on_cancellation, savepoint_path, shutdown_sources_after_idle_ms, state_backend, state_backend_factory, state_backend_storage_path, unaligned_checkpoint_enabled
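To make these options concrete, here is an illustrative Python sketch combining several of them; all values are placeholders and the storage path is hypothetical:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--parallelism=4",                 # placeholder degree of parallelism
    "--checkpointing_interval=60000",  # checkpoint every 60 s
    "--state_backend=rocksdb",         # 'rocksdb' or 'filesystem', per the list above
    "--state_backend_storage_path=file:///tmp/flink-state",  # hypothetical path
])
with beam.Pipeline(options=options) as p:
    ...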

For general Beam pipeline options see the PipelineOptions reference.

The Flink cluster version has to match the minor version used by the FlinkRunner. The minor version is the first two numbers in the version string, e.g. in 1.18.0 the minor version is 1.18.

We try to track the latest version of Apache Flink at the time of the Beam release. A Flink version is supported by Beam for as long as it is supported by the Flink community. The Flink community supports the last two minor versions. When support for a Flink version is dropped, it may be deprecated and removed from Beam as well. To find out which version of Flink is compatible with Beam, please see the table below:

Flink Version             Artifact Id                Supported Beam Versions
1.19.x                    beam-runners-flink-1.19    ≥ 2.61.0
1.18.x                    beam-runners-flink-1.18    ≥ 2.57.0
1.17.x                    beam-runners-flink-1.17    ≥ 2.56.0
1.16.x                    beam-runners-flink-1.16    2.47.0 - 2.60.0
1.15.x                    beam-runners-flink-1.15    2.40.0 - 2.60.0
1.14.x                    beam-runners-flink-1.14    2.38.0 - 2.56.0
1.13.x                    beam-runners-flink-1.13    2.31.0 - 2.55.0
1.12.x                    beam-runners-flink-1.12    2.27.0 - 2.55.0
1.11.x                    beam-runners-flink-1.11    2.25.0 - 2.38.0
1.10.x                    beam-runners-flink-1.10    2.21.0 - 2.30.0
1.9.x                     beam-runners-flink-1.9     2.17.0 - 2.29.0
1.8.x                     beam-runners-flink-1.8     2.13.0 - 2.29.0
1.7.x                     beam-runners-flink-1.7     2.10.0 - 2.20.0
1.6.x                     beam-runners-flink-1.6     2.10.0 - 2.16.0
1.5.x                     beam-runners-flink_2.11    2.6.0 - 2.16.0
1.4.x with Scala 2.11     beam-runners-flink_2.11    2.3.0 - 2.5.0
1.3.x with Scala 2.10     beam-runners-flink_2.10    2.1.x - 2.2.0
1.2.x with Scala 2.10     beam-runners-flink_2.10    2.0.0

For retrieving the right Flink version, see the Flink downloads page.

For more information, the Flink Documentation can be helpful.

Beam Capability

The Beam Capability Matrix documents the capabilities of the classic Flink Runner.

The Portable Capability Matrix documents the capabilities of the portable Flink Runner.