Using the Apache Apex Runner

The Apex Runner executes Apache Beam pipelines using Apache Apex as an underlying engine. The runner has broad support for the Beam model and supports streaming and batch pipelines.

Apache Apex is a stream processing platform and framework for low-latency, high-throughput and fault-tolerant analytics applications on Apache Hadoop. Apex has a unified streaming architecture and can be used for real-time and batch processing.

The following instructions are for running Beam pipelines with Apex on a YARN cluster. They are not required for Apex in embedded mode (see quickstart).

Apex Runner prerequisites

You may set up your own Hadoop cluster. Beam does not require anything extra to launch the pipelines on YARN. An optional Apex installation may be useful for monitoring and troubleshooting. The Apex CLI can be built from source or obtained as a binary build. For more download options, see the distribution information on the Apache Apex website.

Running wordcount with Apex

Typically the build environment is separate from the target YARN cluster. In such a case, it is necessary to build a fat jar that includes all dependencies. Ensure that hadoop.version in pom.xml matches the version of your YARN cluster and then build the jar file:

mvn package -Papex-runner

Copy the resulting target/word-count-beam-bundled-0.1.jar to the cluster and submit the application using:

java -cp word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount --inputFile=/etc/profile --output=/tmp/counts --embeddedExecution=false --configFile=beam-runners-apex.properties --runner=ApexRunner

If the build environment is set up as a cluster client, it is possible to run the example directly:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/etc/profile --output=/tmp/counts --runner=ApexRunner --embeddedExecution=false --configFile=beam-runners-apex.properties" -Papex-runner

The application will run asynchronously. Check its status with:

yarn application -list -appStates ALL
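Once the application ID is known from the list, the report for a single application can be fetched as well. The following is a minimal sketch, assuming the yarn CLI is on the PATH of the cluster client; the function name app_report is hypothetical and used only for illustration.

```shell
# Hypothetical helper (not part of Beam or Apex): print the YARN report
# for one application ID taken from the application list.
app_report() {
  app_id="$1"
  if [ -z "$app_id" ]; then
    echo "usage: app_report <application_id>" >&2
    return 2
  fi
  # "yarn application -status" prints state, final status and tracking URL.
  yarn application -status "$app_id"
}
```

Call it with the ID shown in the first column of the application list. Without an argument, the helper prints a usage message and returns a non-zero status.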

The configuration file is optional; it can be used to influence how Apex operators are deployed into YARN containers. The following example reduces the number of required containers by collocating the operators in the same container and lowers the heap memory per operator, which suits execution in a single-node Hadoop sandbox.

apex.application.*.operator.*.attr.MEMORY_MB=64
apex.stream.*.prop.locality=CONTAINER_LOCAL
apex.application.*.operator.*.attr.TIMEOUT_WINDOW_COUNT=1200
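One way to create this file is sketched below. It assumes the file is written to the current working directory under the name passed to --configFile in the commands above (beam-runners-apex.properties); adjust the path if your setup differs.

```shell
# Write the sample Apex settings to the file referenced by --configFile.
# The quoted 'EOF' keeps the shell from expanding the '*' characters.
cat > beam-runners-apex.properties <<'EOF'
apex.application.*.operator.*.attr.MEMORY_MB=64
apex.stream.*.prop.locality=CONTAINER_LOCAL
apex.application.*.operator.*.attr.TIMEOUT_WINDOW_COUNT=1200
EOF

# Count the settings that were written as a quick sanity check.
grep -c '^apex\.' beam-runners-apex.properties
```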

This example uses local files. To use a distributed file system (HDFS, S3, etc.), the build must be extended to include the respective file system provider.

Monitoring progress of your job

Depending on your installation, you may be able to monitor the progress of your job on the Hadoop cluster. Alternatively, you have the following options:

Check the output of the pipeline:

ls /tmp/counts*
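To look inside the output files rather than only listing them, a small helper along these lines may be convenient. It is a sketch that assumes the output prefix /tmp/counts used in the commands above; the function name preview_output is hypothetical.

```shell
# Hypothetical helper: print the first lines of every file that matches
# an output prefix (defaults to the --output value used above).
preview_output() {
  prefix="${1:-/tmp/counts}"
  for f in "$prefix"*; do
    # Skip the unexpanded pattern when no file matches yet.
    [ -f "$f" ] || continue
    echo "== $f"
    head -n 5 "$f"
  done
}
```

Running preview_output without arguments inspects /tmp/counts*; it prints nothing while the pipeline has not yet written any output.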