The Hazelcast Jet Runner can be used to execute Beam pipelines using Hazelcast Jet.
The Jet Runner and Jet are suitable for large scale continuous jobs and provide:
- Support for both batch (bounded) and streaming (unbounded) data sets
- A runtime that supports very high throughput and low event latency at the same time
- Natural back-pressure in streaming programs
- A distributed, massively parallel data processing engine with in-memory storage
It’s important to note that the Jet Runner is currently in an EXPERIMENTAL state and cannot make use of many of the capabilities present in Jet:
- Jet has full fault-tolerance support; the Jet Runner does not. If a job fails, it must be restarted.
- Internal performance of Jet is extremely high. The Runner can’t match it as of now because Beam pipeline optimization/surgery has not been fully implemented.
The Beam Capability Matrix documents the supported capabilities of the Jet Runner.
Running WordCount with the Hazelcast Jet Runner
Generating the Beam examples project from SNAPSHOT versions of Beam
Make sure that your Maven settings file (~/.m2/settings.xml) is set up to have access to the Apache Snapshot Repository. It should contain the following:
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
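A quick way to confirm that the settings file mentions the snapshot repository is to search it for the repository URL. The following is only a minimal sketch: the helper name `has_snapshot_repo` is hypothetical, and a plain text match does not prove that the XML is placed in the right section of the file.

```shell
# Hypothetical helper: succeeds if the given Maven settings file mentions
# the Apache snapshot repository URL. Defaults to ~/.m2/settings.xml.
has_snapshot_repo() {
  settings_file="${1:-$HOME/.m2/settings.xml}"
  [ -f "$settings_file" ] &&
    grep -qF 'repository.apache.org/content/repositories/snapshots' "$settings_file"
}

if has_snapshot_repo; then
  echo "snapshot repository found in settings.xml"
else
  echo "snapshot repository not found in settings.xml"
fi
```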
Generate the examples Maven project in the same way as when the archetype is available locally:
$ mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.14.0-SNAPSHOT \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples \
      -DinteractiveMode=false
Generating the Beam examples project from RELEASED versions of Beam
Caution: The released Beam versions don’t contain the Jet Runner yet. The SNAPSHOT version has to be used until a Beam release that includes the Jet Runner is available.
Running WordCount on a Local Jet Cluster
Issue the following command in the Beam examples project to start a new Jet cluster and run the WordCount example on it.
$ mvn package exec:java \
    -DskipTests \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="\
        --runner=JetRunner \
        --jetLocalMode=3 \
        --inputFile=pom.xml \
        --output=counts" \
    -Pjet-runner
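Once the job finishes, the results can be inspected from the shell. The sketch below assumes that the output is split into several files whose names start with the `--output` prefix followed by a dash (for example `counts-00000-of-00003`) and that each line has the form `word: count`. The helper name `top_words` is hypothetical.

```shell
# Hypothetical helper: print the N most frequent words across all output
# files whose names start with "<prefix>-".
# Assumes each line looks like "word: count".
top_words() {
  prefix="$1"
  n="${2:-10}"
  # Sort numerically on the field after the colon, highest count first.
  cat "$prefix"-* | sort -t: -k2,2nr | head -n "$n"
}

# Usage, from the Beam examples project after the run above:
#   top_words counts 5
```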
Running WordCount on a Remote Jet Cluster
Download the latest stable Hazelcast Jet release from the Hazelcast website and start a Jet cluster.
The simplest way is to start Jet cluster members using the jet-start script that comes with the Jet distribution.
The members use the auto-discovery feature to form a cluster. Let’s start up a cluster formed by two members:
$ cd hazelcast-jet
$ bin/jet-start.sh &
$ bin/jet-start.sh &
Check that the cluster is up and running:
$ ./jet.sh cluster
You should see something like:
State: ACTIVE
Version: 3.0
Size: 2

ADDRESS                  UUID
[192.168.0.117]:5701     76bea7ba-f032-4c25-ad04-bdef6782f481
[192.168.0.117]:5702     03ecfaa2-be16-41b6-b5cf-eea584d7fb86
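When this check is scripted, the member count can be read from the `Size:` field of that output. The following is a minimal sketch that assumes the output format shown above. The helper names `cluster_size` and `expect_cluster_size` are hypothetical and are not part of the Jet distribution.

```shell
# Hypothetical helper: read cluster status text on stdin and print the
# number that follows "Size:".
cluster_size() {
  sed -n 's/.*Size: *\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Hypothetical helper: succeed only if the reported size equals $1.
expect_cluster_size() {
  expected="$1"
  actual="$(cluster_size)"
  [ "$actual" = "$expected" ]
}

# Usage, with the status command from above:
#   ./jet.sh cluster | expect_cluster_size 2 && echo "both members joined"
```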
Download Jet Management Center from the same location and use it to monitor your cluster and later executions.
Change directory to the Beam examples project and issue the following command to submit and execute your pipeline on the remote Jet cluster. Make sure to distribute the input file (the file with the words to be counted) to all machines where the cluster runs. Otherwise, the word count job won’t be able to read the data.
$ mvn package exec:java \
    -DskipTests \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="\
        --runner=JetRunner \
        --jetServers=192.168.0.117:5701,192.168.0.117:5702 \
        --codeJarPathname=target/word-count-beam-bundled-0.1.jar \
        --inputFile=<INPUT_FILE_AVAILABLE_ON_ALL_CLUSTER_MEMBERS> \
        --output=/tmp/counts" \
    -Pjet-runner
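If the member addresses change between runs, the argument string can be assembled by a small script rather than edited by hand. This is only a sketch: `join_servers` and `remote_exec_args` are hypothetical helper names, and `/data/words.txt` is a placeholder for an input file that exists on every cluster member.

```shell
# Join host:port arguments into the comma-separated form used by --jetServers.
join_servers() {
  ( IFS=,; printf '%s\n' "$*" )
}

# Hypothetical helper: print a value for -Dexec.args for a remote run.
# Arguments: server list, code jar path, input file, output prefix.
remote_exec_args() {
  printf '%s %s %s %s %s\n' \
    "--runner=JetRunner" \
    "--jetServers=$1" \
    "--codeJarPathname=$2" \
    "--inputFile=$3" \
    "--output=$4"
}

servers="$(join_servers 192.168.0.117:5701 192.168.0.117:5702)"
# /data/words.txt is a placeholder path, assumed present on all members.
remote_exec_args "$servers" target/word-count-beam-bundled-0.1.jar /data/words.txt /tmp/counts
```

The printed string can then be passed as `-Dexec.args="..."` to the `mvn package exec:java` command shown above.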
Pipeline Options for the Jet Runner
||The pipeline runner to use. This option allows you to determine the pipeline runner at runtime.||Set to
||The name of the Hazelcast Group to join, in essence an ID of the Jet Cluster that will be used by the Runner. With groups it is possible to create multiple clusters where each cluster has its own group and doesn't interfere with other clusters.||
||List of the addresses of Jet Cluster members, needed when the Runner doesn't start its own Jet Cluster,
but makes use of an external, independently started one. Takes the form of a comma separated list of ip/hostname-port pairs,
||Also needed only when using external Jet Clusters. Specifies the location of a fat jar
containing all the code that needs to run on the cluster (at least the pipeline and the runner code). The value
is any string that is accepted by
||Has no default value.|
||The number of Jet Cluster members that should be started locally by the Runner. If it's
||Local parallelism of Jet members, the number of processors of each vertex of the DAG that will be created on each Jet Cluster member.||
||Boolean flag specifying whether Jet Processors for DoFns are allowed to be cooperative (i.e. use green threads instead of dedicated OS ones). If set to true, then all such Processors will be cooperative, except when they have no outputs (so they are assumed to be sinks).||
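The member address list described above is a comma-separated list of host-port pairs, so a malformed value can be caught before a job is submitted. The sketch below is an assumption about what a reasonable shape check looks like, not the Runner's own validation. The helper name `valid_jet_servers` is hypothetical, and IPv6 literals are not handled.

```shell
# Hypothetical helper: succeed if $1 looks like "host:port[,host:port...]".
# Only the shape is checked; hosts are not resolved and ports are not probed.
valid_jet_servers() {
  printf '%s\n' "$1" | grep -Eq '^[^,:]+:[0-9]+(,[^,:]+:[0-9]+)*$'
}

if valid_jet_servers "192.168.0.117:5701,192.168.0.117:5702"; then
  echo "address list looks well-formed"
fi
```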