Nexmark benchmark suite

What it is

Nexmark is a suite of pipelines inspired by the ‘continuous data stream’ queries in the Nexmark research paper.

These are multiple queries over a three-entity model representing an online auction system: Person, Auction, and Bid.
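As a simplified illustration of the three entities (not the actual model classes, which live in org.apache.beam.sdk.nexmark.model and carry more fields plus Beam coders), the model can be sketched as:

```java
// Simplified sketch of the Nexmark three-entity auction model.
// Illustrative only; the real classes have more fields and coders.
class Person {
    final long id;          // unique person id
    final String name;
    Person(long id, String name) { this.id = id; this.name = name; }
}

class Auction {
    final long id;          // unique auction id
    final long seller;      // id of the Person selling the item
    final long expires;     // auction close time (event-time millis)
    Auction(long id, long seller, long expires) {
        this.id = id; this.seller = seller; this.expires = expires;
    }
}

class Bid {
    final long auction;     // id of the Auction being bid on
    final long bidder;      // id of the bidding Person
    final long price;
    Bid(long auction, long bidder, long price) {
        this.auction = auction; this.bidder = bidder; this.price = price;
    }
}
```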

The queries

The queries exercise many aspects of the Beam model.

We have augmented the original queries with five more.

Benchmark workload configuration

Here are some of the knobs of the benchmark workload (see NexmarkConfiguration.java).

These configuration items can be passed to the launch command line.

Events generation (defaults)

Windows (defaults)

Event proportions (defaults)

Technical

Nexmark output

Here is an example output of the Nexmark benchmark run in streaming mode with the SMOKE suite on the (local) direct runner:

Performance:
  Conf       Runtime(sec)         Events(/sec)         Results
  0000                5,5              18138,9          100000
  0001                4,2              23657,4           92000
  0002                2,2              45683,0             351
  0003                3,9              25348,5             444
  0004                1,6               6207,3              40
  0005                5,0              20173,5              12
  0006                0,9              11376,6             401
  0007              121,4                823,5               1
  0008                2,5              40273,9            6000
  0009                0,9              10695,2             298
  0010                4,0              25025,0               1
  0011                4,4              22655,2            1919
  0012                3,5              28208,7            1919
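The Events(/sec) column is simply the number of generated events divided by the runtime. For example, configuration 0000 runs query 0 (a passthrough, so Results equals the 100000 input events): 100000 / 5.5 s ≈ 18182 events/s, matching the reported 18138,9 up to rounding of the runtime. As a one-line helper:

```java
class Throughput {
    // Events-per-second as reported in the Performance table:
    // total generated events divided by wall-clock runtime in seconds.
    static double eventsPerSec(long events, double runtimeSec) {
        return events / runtimeSec;
    }
}
```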

Benchmark launch configuration

We can specify the Beam runner to use with Maven profiles. The available profiles are:

direct-runner
spark-runner
flink-runner
apex-runner

The runner must also be specified, as in any other Beam pipeline, using:

--runner

Test data is deterministically synthesized on demand. The test data may be synthesized in the same pipeline as the query itself, or may be published to Pub/Sub.
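The key property is that every event is a pure function of its index, so any worker can regenerate event i identically, whether the events are consumed in-pipeline or replayed through Pub/Sub. As a sketch of the idea (illustrative only; the actual generator in the nexmark sources is far more elaborate):

```java
import java.util.Random;

class DeterministicGen {
    // Sketch of deterministic event synthesis: each field of event i is
    // derived from i alone, here by seeding a PRNG with the event index,
    // so repeated generation yields the same value every time.
    // (An illustrative assumption, not the real generator's scheme.)
    static long priceForEvent(long eventIndex) {
        return new Random(eventIndex).nextInt(10_000);
    }
}
```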

The query results may be checked against a model implementation or written to various sinks, depending on configuration.

Common configuration parameters

Decide whether to run in batch or streaming mode:

--streaming=true

Number of events generators:

--numEventGenerators=4
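With several generators the event space is partitioned among them so that adding generators scales event production without duplicating events. A simple round-robin split (an assumption for illustration, not necessarily the exact scheme used) looks like:

```java
class GeneratorShard {
    // Round-robin assignment of a global event index to a generator id.
    // Illustrative assumption: each of numGenerators produces the events
    // whose index is congruent to its id modulo numGenerators.
    static int generatorFor(long eventIndex, int numGenerators) {
        return (int) (eventIndex % numGenerators);
    }
}
```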

Run query N:

--query=N

Available Suites

The suite to run can be chosen using this configuration parameter:

--suite=SUITE

Available suites are:

DEFAULT: test the default configuration with query 0.
SMOKE: run the 12 default configurations.
STRESS: like SMOKE but with 1m events.
FULL_THROTTLE: like SMOKE but with 100m events.

Apex runner specific configuration

--manageResources=false --monitorJobs=false

Google Cloud Dataflow runner specific configuration

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false \
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--runner=DataflowRunner \
--stagingLocation=<a gs path for staging> \
--tempLocation=<a gs path for temp files> \
--filesToStage=target/beam-sdks-java-nexmark-2.1.0-SNAPSHOT.jar

Direct runner specific configuration

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false

Flink runner specific configuration

--manageResources=false --monitorJobs=true \
--flinkMaster=local --parallelism=#numcores
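The #numcores placeholder in the Flink flags above stands for the machine's core count, which can be obtained at runtime like this:

```java
class Cores {
    // Returns the number of processors available to the JVM,
    // a sensible value for the --parallelism flag on a local run.
    static int numCores() {
        return Runtime.getRuntime().availableProcessors();
    }
}
```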

Spark runner specific configuration

--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true

Current status

These tables show the status of the query runs on the different runners. Google Cloud Dataflow and Apache Gearpump statuses are yet to come.

Batch / Synthetic / Local

Query  Direct  Spark      Flink  Apex
0      ok      ok         ok     ok
1      ok      ok         ok     ok
2      ok      ok         ok     ok
3      ok      BEAM-1115  ok     BEAM-1114
4      ok      ok         ok     ok
5      ok      ok         ok     ok
6      ok      ok         ok     ok
7      ok      ok         ok     ok
8      ok      ok         ok     ok
9      ok      ok         ok     ok
10     ok      ok         ok     ok
11     ok      ok         ok     ok
12     ok      ok         ok     ok

Streaming / Synthetic / Local

Query  Direct  Spark (BEAM-2847)     Flink  Apex
0      ok      ok                    ok     ok
1      ok      ok                    ok     ok
2      ok      ok                    ok     ok
3      ok      BEAM-1035, BEAM-1115  ok     BEAM-1114
4      ok      ok                    ok     ok
5      ok      ok                    ok     ok
6      ok      ok                    ok     ok
7      ok      BEAM-2112             ok     ok
8      ok      ok                    ok     ok
9      ok      ok                    ok     ok
10     ok      ok                    ok     ok
11     ok      ok                    ok     ok
12     ok      ok                    ok     ok

Batch / Synthetic / Cluster

Yet to come

Streaming / Synthetic / Cluster

Yet to come

Running Nexmark

Running SMOKE suite on the DirectRunner (local)

Batch Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"

Streaming Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"

Running SMOKE suite on the SparkRunner (local)

Batch Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"

Streaming Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"

Running SMOKE suite on the FlinkRunner (local)

Batch Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true  --flinkMaster=local"

Streaming Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true  --flinkMaster=local"

Running SMOKE suite on the ApexRunner (local)

Batch Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"

Streaming Mode:

mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"

Running SMOKE suite on Google Cloud Dataflow

Building package:

mvn clean package -Pdataflow-runner

Submit to the Google Cloud Dataflow service. The first job synthesizes events and publishes them to Pub/Sub:

java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
  org.apache.beam.sdk.nexmark.Main \
  --runner=DataflowRunner \
  --project=<your project> \
  --zone=<your zone> \
  --workerMachineType=n1-highmem-8 \
  --stagingLocation=<a gs path for staging> \
  --streaming=true \
  --sourceType=PUBSUB \
  --pubSubMode=PUBLISH_ONLY \
  --pubsubTopic=<an existing Pubsub topic> \
  --resourceNameMode=VERBATIM \
  --manageResources=false \
  --monitorJobs=false \
  --numEventGenerators=64 \
  --numWorkers=16 \
  --maxNumWorkers=16 \
  --suite=SMOKE \
  --firstEventRate=100000 \
  --nextEventRate=100000 \
  --ratePeriodSec=3600 \
  --isRateLimited=true \
  --avgPersonByteSize=500 \
  --avgAuctionByteSize=500 \
  --avgBidByteSize=500 \
  --probDelayedEvent=0.000001 \
  --occasionalDelaySec=3600 \
  --numEvents=0 \
  --useWallclockEventTime=true \
  --usePubsubPublishTime=true \
  --experiments=enable_custom_pubsub_sink
Then launch the job that reads those events from the Pub/Sub subscription and runs the query:

java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
  org.apache.beam.sdk.nexmark.Main \
  --runner=DataflowRunner \
  --project=<your project> \
  --zone=<your zone> \
  --workerMachineType=n1-highmem-8 \
  --stagingLocation=<a gs path for staging> \
  --streaming=true \
  --sourceType=PUBSUB \
  --pubSubMode=SUBSCRIBE_ONLY \
  --pubsubSubscription=<an existing Pubsub subscription to above topic> \
  --resourceNameMode=VERBATIM \
  --manageResources=false \
  --monitorJobs=false \
  --numWorkers=64 \
  --maxNumWorkers=64 \
  --suite=SMOKE \
  --usePubsubPublishTime=true \
  --outputPath=<a gs path under which log files will be written> \
  --windowSizeSec=600 \
  --occasionalDelaySec=3600 \
  --maxLogEvents=10000 \
  --experiments=enable_custom_pubsub_source

Running query 0 on a Spark cluster with Apache Hadoop YARN

Building package:

mvn clean package -Pspark-runner

Submit to the cluster:

spark-submit --master yarn-client --class org.apache.beam.sdk.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true