Using the Apache Flink Runner

The old Flink Runner will eventually be replaced by the Portable Runner, which enables running pipelines written in languages other than Java. Please see the Portability page for the latest state.

The Apache Flink Runner can be used to execute Beam pipelines using Apache Flink. When using the Flink Runner you will create a jar file containing your job that can be executed on a regular Flink cluster. It’s also possible to execute a Beam pipeline using Flink’s local execution mode without setting up a cluster. This is helpful for development and debugging of your pipeline.

The Flink Runner and Flink are suitable for large-scale, continuous jobs, and provide:

The Beam Capability Matrix documents the supported capabilities of the Flink Runner.

If you want to use the local execution mode with the Flink Runner, you don’t have to complete any setup: simply run your Beam pipeline, making sure to set the runner to FlinkRunner.
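For example, a local run only needs the runner flag. The sketch below assembles the command-line arguments, assuming your main method builds its options from the command line as the WordCount example does (for instance via PipelineOptionsFactory.fromArgs(args)). The helper class LocalFlinkArgs is hypothetical. Passing --flinkMaster=[local] is optional; it makes the choice of the embedded cluster explicit instead of relying on the default [auto].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that assembles command-line arguments for a local Flink run. */
class LocalFlinkArgs {

  /** Returns the runner flags followed by any pipeline-specific arguments. */
  static String[] build(String... pipelineArgs) {
    List<String> args = new ArrayList<>();
    args.add("--runner=FlinkRunner");   // select the Flink Runner
    args.add("--flinkMaster=[local]");  // start an embedded Flink cluster in this JVM
    args.addAll(Arrays.asList(pipelineArgs));
    return args.toArray(new String[0]);
  }

  public static void main(String[] ignored) {
    String[] args = build("--inputFile=/path/to/pom.xml", "--output=/path/to/counts");
    // In a Beam program these would typically be passed to
    // PipelineOptionsFactory.fromArgs(args).withValidation().as(YourOptions.class),
    // where YourOptions stands for your pipeline's own options interface.
    System.out.println(String.join(" ", args));
  }
}
```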

To use the Flink Runner for executing on a cluster, you have to set up a Flink cluster by following the Flink Setup Quickstart.

Version Compatibility

The Flink cluster version has to match the minor version used by the FlinkRunner. The minor version is the first two numbers in the version string, e.g. in 1.7.0 the minor version is 1.7.
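The rule can be checked mechanically. A minimal sketch, assuming plain dotted version strings such as 1.7.0; the class FlinkVersionCheck is hypothetical and not part of Beam.

```java
/** Hypothetical helper for the "same minor version" rule. */
class FlinkVersionCheck {

  /** Returns the minor version, i.e. the first two dot-separated numbers ("1.7.0" -> "1.7"). */
  static String minorVersion(String version) {
    String[] parts = version.trim().split("\\.");
    if (parts.length < 2) {
      throw new IllegalArgumentException("Expected at least major.minor: " + version);
    }
    return parts[0] + "." + parts[1];
  }

  /** True when the cluster and the Flink version used by the runner share a minor version. */
  static boolean isCompatible(String clusterVersion, String runnerFlinkVersion) {
    return minorVersion(clusterVersion).equals(minorVersion(runnerFlinkVersion));
  }

  public static void main(String[] args) {
    System.out.println(minorVersion("1.7.0"));           // 1.7
    System.out.println(isCompatible("1.7.2", "1.7.0"));  // true: same minor version
    System.out.println(isCompatible("1.6.3", "1.7.0"));  // false: minor versions differ
  }
}
```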

We try to track the latest version of Apache Flink at the time of the Beam release. A Flink version is supported by Beam for as long as it is supported by the Flink community, which typically supports the last two minor versions. When support for a Flink version is dropped, it may also be deprecated and removed from Beam, with the exception of Beam LTS releases. LTS releases continue to receive bug fixes for as long as the LTS support period lasts.

To find out which version of Flink is compatible with Beam please see the table below:

Beam Version                 Flink Version            Artifact Id
2.10.0                       1.5.x                    beam-runners-flink_2.11
2.10.0                       1.6.x                    beam-runners-flink-1.6
2.10.0                       1.7.x                    beam-runners-flink-1.7
2.9.0, 2.8.0, 2.7.0, 2.6.0   1.5.x                    beam-runners-flink_2.11
2.5.0, 2.4.0, 2.3.0          1.4.x with Scala 2.11    beam-runners-flink_2.11
2.2.0, 2.1.x                 1.3.x with Scala 2.10    beam-runners-flink_2.10
2.0.0                        1.2.x with Scala 2.10    beam-runners-flink_2.10
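To look up an artifact id programmatically, the compatibility table can be encoded as a map keyed by the Beam and Flink minor versions. This is a hypothetical sketch that covers only the rows listed here; keying on minor versions lets an entry such as Beam 2.1.x match any 2.1 patch release.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical lookup that encodes the Beam/Flink compatibility table. */
class FlinkArtifactTable {
  private static final Map<String, String> ARTIFACTS = new HashMap<>();

  static {
    put("2.10", "1.5", "beam-runners-flink_2.11");
    put("2.10", "1.6", "beam-runners-flink-1.6");
    put("2.10", "1.7", "beam-runners-flink-1.7");
    for (String beam : new String[] {"2.9", "2.8", "2.7", "2.6"}) {
      put(beam, "1.5", "beam-runners-flink_2.11");
    }
    for (String beam : new String[] {"2.5", "2.4", "2.3"}) {
      put(beam, "1.4", "beam-runners-flink_2.11");  // Flink 1.4.x with Scala 2.11
    }
    put("2.2", "1.3", "beam-runners-flink_2.10");   // Flink 1.3.x with Scala 2.10
    put("2.1", "1.3", "beam-runners-flink_2.10");
    put("2.0", "1.2", "beam-runners-flink_2.10");   // Flink 1.2.x with Scala 2.10
  }

  private static void put(String beamMinor, String flinkMinor, String artifactId) {
    ARTIFACTS.put(beamMinor + "|" + flinkMinor, artifactId);
  }

  /** First two dot-separated components, e.g. "2.10.0" -> "2.10". */
  private static String minor(String version) {
    String[] parts = version.split("\\.");
    return parts[0] + "." + parts[1];
  }

  /** Returns the artifact id for a Beam release and Flink version, if the table lists one. */
  static Optional<String> artifactId(String beamVersion, String flinkVersion) {
    return Optional.ofNullable(ARTIFACTS.get(minor(beamVersion) + "|" + minor(flinkVersion)));
  }

  public static void main(String[] args) {
    System.out.println(artifactId("2.10.0", "1.6.3").orElse("not listed")); // beam-runners-flink-1.6
    System.out.println(artifactId("2.9.0", "1.7.0").orElse("not listed"));  // not listed
  }
}
```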

For retrieving the right Flink version, see the Flink downloads page.

For more information, the Flink Documentation can be helpful.

Specify your dependency

When using Java, you must specify your dependency on the Flink Runner in your pom.xml.

Use the Beam version and the artifact id from the above table. For example:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.6</artifactId>
  <version>2.10.0</version>
</dependency>

This section is not applicable to the Beam SDK for Python.

To execute a pipeline on a Flink cluster, you need to package your program along with all its dependencies in a so-called fat jar. How you do this depends on your build system; if you follow the Beam Quickstart, this is the command you need to run:

$ mvn package -Pflink-runner

The Beam Quickstart Maven project is set up to use the Maven Shade plugin to create a fat jar, and the -Pflink-runner argument ensures that the dependency on the Flink Runner is included.

To actually run the pipeline, use this command:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"

If you have a Flink JobManager running on your local machine, you can provide localhost:8081 for flinkMaster. Otherwise, an embedded Flink cluster will be started for the WordCount job.
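The accepted flinkMaster values can be summarized as a small classifier. This is a hypothetical sketch based only on the option descriptions on this page ("host:port" or the special values [local], [auto] and [collection]); it is not Beam's own parsing code.

```java
/** Hypothetical classifier for flinkMaster values, following this page's option descriptions. */
class FlinkMasterSetting {
  enum Kind { LOCAL, AUTO, COLLECTION, REMOTE }

  final Kind kind;
  final String host; // null unless kind == REMOTE
  final int port;    // -1 unless kind == REMOTE

  private FlinkMasterSetting(Kind kind, String host, int port) {
    this.kind = kind;
    this.host = host;
    this.port = port;
  }

  static FlinkMasterSetting parse(String value) {
    switch (value) {
      case "[local]":
        return new FlinkMasterSetting(Kind.LOCAL, null, -1);      // embedded cluster in this JVM
      case "[auto]":
        return new FlinkMasterSetting(Kind.AUTO, null, -1);       // environment decides (default)
      case "[collection]":
        return new FlinkMasterSetting(Kind.COLLECTION, null, -1);
      default:
        int colon = value.lastIndexOf(':');
        if (colon <= 0 || colon == value.length() - 1) {
          throw new IllegalArgumentException("Expected host:port but got: " + value);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) if not numeric.
        int port = Integer.parseInt(value.substring(colon + 1));
        if (port < 1 || port > 65535) {
          throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new FlinkMasterSetting(Kind.REMOTE, value.substring(0, colon), port);
    }
  }

  public static void main(String[] args) {
    FlinkMasterSetting s = parse("localhost:8081");
    System.out.println(s.kind + " " + s.host + " " + s.port); // REMOTE localhost 8081
    System.out.println(parse("[auto]").kind);                 // AUTO
  }
}
```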

Additional information and caveats

Monitoring your job

You can monitor a running Flink job using the Flink JobManager Dashboard or its REST interface. By default, both are available on port 8081 of the JobManager node; for a Flink installation on your local machine, that is http://localhost:8081. Note: In [local] mode, an embedded Flink cluster is started, which does not provide a dashboard.
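The REST interface can also be queried from code. Below is a minimal sketch using the JDK 11 java.net.http client, assuming a JobManager at localhost:8081 and assuming the /jobs/overview endpoint of Flink's REST API; check the endpoint against the REST API documentation for your Flink version. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical helper that queries a JobManager's REST interface for a job overview. */
class FlinkJobMonitor {

  /** Builds the URI of the (assumed) jobs overview endpoint. */
  static URI jobsOverviewUri(String host, int port) {
    return URI.create("http://" + host + ":" + port + "/jobs/overview");
  }

  /** Fetches the overview as raw JSON; requires a running JobManager. */
  static String fetchJobsOverview(String host, int port) throws IOException, InterruptedException {
    HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
    HttpRequest request = HttpRequest.newBuilder(jobsOverviewUri(host, port)).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() != 200) {
      throw new IOException("Unexpected HTTP status " + response.statusCode());
    }
    return response.body();
  }

  public static void main(String[] args) throws InterruptedException {
    try {
      System.out.println(fetchJobsOverview("localhost", 8081));
    } catch (IOException e) {
      // For example, no JobManager is listening; [local] mode provides no dashboard or REST endpoint.
      System.out.println("JobManager not reachable: " + e);
    }
  }
}
```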

Streaming Execution

If your pipeline uses an unbounded data source or sink, the Flink Runner will automatically switch to streaming mode. You can enforce streaming mode by using the streaming setting mentioned below.

Note: The Runner prints a warning message when unbounded sources are used and checkpointing is not enabled. Many sources, such as PubSubIO, rely on their checkpoints being acknowledged, which can only happen when checkpointing is enabled for the FlinkRunner. To enable checkpointing, set checkpointingInterval (Java) or checkpointing_interval (Python) to the desired checkpointing interval in milliseconds.
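The two rules above (an unbounded source switches the runner to streaming mode, and an unbounded source without checkpointing triggers a warning) can be expressed as a small pre-flight check. This is a hypothetical sketch, not the runner's code; its warning text is invented for illustration, and the 60-second interval in main is an illustrative value.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight check mirroring the streaming and checkpointing behaviour. */
class StreamingPreflight {

  /** Streaming mode applies when it is requested explicitly or when any source is unbounded. */
  static boolean effectiveStreaming(boolean streamingFlag, boolean hasUnboundedSource) {
    return streamingFlag || hasUnboundedSource;
  }

  /**
   * Collects warnings for a planned run. The default interval of -1 means checkpointing is
   * disabled; this sketch treats any value below 1 as "not enabled".
   */
  static List<String> warnings(boolean hasUnboundedSource, long checkpointingIntervalMillis) {
    List<String> warnings = new ArrayList<>();
    if (hasUnboundedSource && checkpointingIntervalMillis < 1) {
      warnings.add("Unbounded source without checkpointing: sources that acknowledge data on "
          + "checkpoints (for example PubSubIO) cannot acknowledge it. Set --checkpointingInterval.");
    }
    return warnings;
  }

  public static void main(String[] args) {
    System.out.println(effectiveStreaming(false, true)); // true: an unbounded source forces streaming
    System.out.println(warnings(true, -1L).size());      // 1: checkpointing is disabled by default
    // Flags that enable checkpointing every 60 seconds.
    String[] flinkArgs = {"--runner=FlinkRunner", "--streaming=true", "--checkpointingInterval=60000"};
    System.out.println(String.join(" ", flinkArgs));
  }
}
```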

When executing your pipeline with the Flink Runner, you can set the following pipeline options.

See the reference documentation for the FlinkPipelineOptions (Java) or PipelineOptions (Python) interface (and its subinterfaces) for the complete list of pipeline configuration options.

Java options (FlinkPipelineOptions). Each entry gives the option name, its description, and, where documented, its default value:

runner: The pipeline runner to use. This option allows you to determine the pipeline runner at runtime. Set to FlinkRunner to run using Flink.
streaming: Whether streaming mode is enabled or disabled; true if enabled. Set to true if running pipelines with unbounded PCollections. Default: false
flinkMaster: The URL of the Flink JobManager on which to execute pipelines. This can either be the address of a cluster JobManager, in the form "host:port", or one of the special strings "[local]" or "[auto]". "[local]" starts a local Flink cluster in the JVM, while "[auto]" lets the system decide where to execute the pipeline based on the environment. Default: [auto]
filesToStage: Jar files to send to all workers and put on the classpath. Here you have to put the fat jar that contains your program along with all dependencies. Default: empty
parallelism: The degree of parallelism to be used when distributing operations onto workers. Default: for local execution, the number of available CPU cores; for remote execution, the default parallelism configured on the remote cluster; otherwise, 1
maxParallelism: The pipeline-wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state. Default: -1L, meaning the same as the parallelism
checkpointingInterval: The interval in milliseconds between consecutive checkpoints (i.e. snapshots of the current pipeline state used for fault tolerance). Default: -1L, i.e. disabled
checkpointMode: The checkpointing mode that defines the consistency guarantee. Default: EXACTLY_ONCE
checkpointTimeoutMillis: The maximum time in milliseconds that a checkpoint may take before being discarded. Default: -1, the cluster default
minPauseBetweenCheckpoints: The minimal pause in milliseconds before the next checkpoint is triggered. Default: -1, the cluster default
failOnCheckpointingErrors: Sets the expected behaviour for tasks in case they encounter an error in their checkpointing procedure. If this is set to true, the task fails on a checkpointing error. If this is set to false, the task only declines the checkpoint and continues running. Default: true
numberOfExecutionRetries: Sets the number of times that failed tasks are re-executed. A value of 0 effectively disables fault tolerance. A value of -1 indicates that the system default value (as defined in the configuration) should be used. Default: -1
executionRetryDelay: Sets the delay between executions. A value of -1 indicates that the default value should be used. Default: -1
objectReuse: Sets the behavior of reusing objects. Default: false, i.e. no object reuse
stateBackend: Sets the state backend to use in streaming mode. Default: empty, i.e. the setting is read from the Flink config
enableMetrics: Enable/disable Beam metrics in the Flink Runner. Default: true
externalizedCheckpointsEnabled: Enables or disables externalized checkpoints. Works in conjunction with checkpointingInterval. Default: false
retainExternalizedCheckpointsOnCancellation: Sets the behavior of externalized checkpoints on cancellation. Default: false
maxBundleSize: The maximum number of elements in a bundle. Default: 1000
maxBundleTimeMills: The maximum time to wait before finalising a bundle (in milliseconds). Default: 1000
shutdownSourcesOnFinalWatermark: If set, shut down sources when their watermark reaches +Inf. Default: false
latencyTrackingInterval: Interval in milliseconds for sending latency tracking marks from the sources to the sinks. An interval value <= 0 disables the feature. Default: 0
autoWatermarkInterval: The interval in milliseconds for automatic watermark emission.
executionModeForBatch: Flink mode for data exchange of batch pipelines (see org.apache.flink.api.common.ExecutionMode). Set this to BATCH_FORCED if pipelines get blocked; see https://issues.apache.org/jira/browse/FLINK-10672. Default: PIPELINED
savepointPath: Savepoint restore path. If specified, restores the streaming pipeline from the provided path. Default: None
allowNonRestoredState: Flag indicating whether non-restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline. Default: false

Python options:

files_to_stage: Jar files to send to all workers and put on the classpath. Default: all files from the classpath
flink_master: Address of the Flink Master where the pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto]
parallelism: The degree of parallelism to be used when distributing operations onto workers. If the parallelism is not set, the configured Flink default is used, or 1 if none can be found. Default: -1
max_parallelism: The pipeline-wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state. Default: -1
checkpointing_interval: The interval in milliseconds at which to trigger checkpoints of the running pipeline. Default: -1, i.e. no checkpointing
checkpointing_mode: The checkpointing mode that defines the consistency guarantee. Default: EXACTLY_ONCE
checkpoint_timeout_millis: The maximum time in milliseconds that a checkpoint may take before being discarded. Default: -1
min_pause_between_checkpoints: The minimal pause in milliseconds before the next checkpoint is triggered. Default: -1
fail_on_checkpointing_errors: Sets the expected behaviour for tasks in case they encounter an error in their checkpointing procedure. If this is set to true, the task fails on a checkpointing error. If this is set to false, the task only declines the checkpoint and continues running. Default: true
number_of_execution_retries: Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of -1 indicates that the system default value (as defined in the configuration) should be used. Default: -1
execution_retry_delay: Sets the delay in milliseconds between executions. A value of -1 indicates that the default value should be used. Default: -1
object_reuse: Sets the behavior of reusing objects. Default: false
state_backend: Sets the state backend to use in streaming mode. If unset, the default is read from the Flink config.
enable_metrics: Enable/disable Beam metrics in the Flink Runner. Default: true
externalized_checkpoints_enabled: Enables or disables externalized checkpoints. Works in conjunction with checkpointing_interval. Default: false
retain_externalized_checkpoints_on_cancellation: Sets the behavior of externalized checkpoints on cancellation. Default: false
max_bundle_size: The maximum number of elements in a bundle. Default: 1000
max_bundle_time_mills: The maximum time to wait before finalising a bundle (in milliseconds). Default: 1000
shutdown_sources_on_final_watermark: If set, shut down sources when their watermark reaches +Inf. Default: false
latency_tracking_interval: Interval in milliseconds for sending latency tracking marks from the sources to the sinks. An interval value <= 0 disables the feature. Default: 0
auto_watermark_interval: The interval in milliseconds for automatic watermark emission.
execution_mode_for_batch: Flink mode for data exchange of batch pipelines (see org.apache.flink.api.common.ExecutionMode). Set this to BATCH_FORCED if pipelines get blocked; see https://issues.apache.org/jira/browse/FLINK-10672. Default: PIPELINED
savepoint_path: Savepoint restore path. If specified, restores the streaming pipeline from the provided path.
allow_non_restored_state: Flag indicating whether non-restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline. Default: false
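As a rough illustration of how the Java option names above become command-line flags, the hypothetical helper below turns an ordered map of option names and values into --name=value arguments. The values (a JobManager at localhost:8081, parallelism 4, a 60-second checkpointing interval) are illustrative assumptions; the jar path is the one from the WordCount command earlier on this page.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical builder that turns Java option names into command-line flags. */
class FlinkOptionArgs {

  /** Converts each entry into a "--name=value" argument, preserving the map's iteration order. */
  static String[] toArgs(Map<String, String> options) {
    List<String> args = new ArrayList<>();
    for (Map.Entry<String, String> e : options.entrySet()) {
      args.add("--" + e.getKey() + "=" + e.getValue());
    }
    return args.toArray(new String[0]);
  }

  public static void main(String[] unused) {
    Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
    options.put("runner", "FlinkRunner");
    options.put("flinkMaster", "localhost:8081");
    options.put("filesToStage", "target/word-count-beam-bundled-0.1.jar");
    options.put("parallelism", "4");
    options.put("checkpointingInterval", "60000");        // milliseconds
    options.put("checkpointMode", "EXACTLY_ONCE");
    System.out.println(String.join(" ", toArgs(options)));
  }
}
```

The same settings for a Python pipeline would use the snake_case names from the Python list (for example flink_master and checkpointing_interval); note that the names do not always convert mechanically, as checkpointMode corresponds to checkpointing_mode.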