Apache Beam WordCount Examples

The WordCount examples demonstrate how to set up a processing pipeline that can read text, tokenize the text lines into individual words, and perform a frequency count on each of those words. The Beam SDKs contain four successively more detailed WordCount examples that build on each other. The input text for all the examples is a set of Shakespeare’s texts.

Each WordCount example introduces different concepts in the Beam programming model. Begin by understanding MinimalWordCount, the simplest of the examples. Once you feel comfortable with the basic principles in building a pipeline, continue on to learn more concepts in the other examples.

MinimalWordCount example

MinimalWordCount demonstrates a simple pipeline that uses the Direct Runner to read from a text file, apply transforms to tokenize and count the words, and write the data to an output text file.

This example hard-codes the locations for its input and output files and doesn’t perform any error checking; it is intended to only show you the “bare bones” of creating a Beam pipeline. This lack of parameterization makes this particular pipeline less portable across different runners than standard Beam pipelines. In later examples, we will parameterize the pipeline’s input and output sources and show other best practices.

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount
python -m apache_beam.examples.wordcount_minimal --input YOUR_INPUT_FILE --output counts
$ go install github.com/apache/beam/sdks/v2/go/examples/minimal_wordcount
$ minimal_wordcount

To view the full code in Java, see MinimalWordCount.

To view the full code in Python, see wordcount_minimal.py.

To view the full code in Go, see minimal_wordcount.go.

Key Concepts:

  * Creating the pipeline
  * Applying pipeline transforms
  * Running the pipeline

The following sections explain these concepts in detail, using the relevant code excerpts from the MinimalWordCount pipeline.

Creating the pipeline

In this example, the code first creates a PipelineOptions object. This object lets us set various options for our pipeline, such as the pipeline runner that will execute our pipeline and any runner-specific configuration required by the chosen runner. In this example we set these options programmatically, but more often, command-line arguments are used to set PipelineOptions.

You can specify a runner for executing your pipeline, such as the DataflowRunner or SparkRunner. If you omit specifying a runner, as in this example, your pipeline executes locally using the DirectRunner. In the next sections, we will specify the pipeline’s runner.

 // Create a PipelineOptions object. This object lets us set various execution
 // options for our pipeline, such as the runner you wish to use. This example
 // will run with the DirectRunner by default, based on the class path configured
 // in its dependencies.
 PipelineOptions options = PipelineOptionsFactory.create();
from apache_beam.options.pipeline_options import PipelineOptions

input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://my-bucket/counts.txt'

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
)
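
More commonly, these settings are passed on the command line rather than hard-coded. A minimal sketch in Python (the script name and flag values below are illustrative, not part of the example):

import sys

from apache_beam.options.pipeline_options import PipelineOptions

# Forward whatever was passed on the command line, e.g.
#   python my_pipeline.py --runner=DirectRunner --temp_location=/tmp/beam
beam_options = PipelineOptions(sys.argv[1:])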

The next step is to create a Pipeline object with the options we’ve just constructed. The Pipeline object builds up the graph of transformations to be executed, associated with that particular pipeline. In the Go SDK, the pipeline also exposes a root scope; scopes allow grouping transforms into composite transforms.

Pipeline p = Pipeline.create(options);
pipeline = beam.Pipeline(options=beam_options)
p := beam.NewPipeline()
s := p.Root()

Applying pipeline transforms

The MinimalWordCount pipeline contains several transforms to read data into the pipeline, manipulate or otherwise transform the data, and write out the results. Transforms can consist of an individual operation, or can contain multiple nested transforms (which is a composite transform).

Each transform takes some kind of input data and produces some output data. The input and output data is often represented by the SDK class PCollection. PCollection is a special class, provided by the Beam SDK, that you can use to represent a dataset of virtually any size, including unbounded datasets.


Figure 1: The MinimalWordCount pipeline data flow.

The MinimalWordCount pipeline contains five transforms:

  1. A text file Read transform is applied to the Pipeline object itself, and produces a PCollection as output. Each element in the output PCollection represents one line of text from the input file. This example uses input data stored in a publicly accessible Google Cloud Storage bucket (“gs://”).
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
pipeline
| beam.io.ReadFromText(input_file)
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/*")
  2. This transform splits the lines in PCollection<String>, where each element is an individual word in Shakespeare’s collected texts. As an alternative, it would have been possible to use a ParDo transform that invokes a DoFn (defined in-line as an anonymous class) on each element that tokenizes the text lines into individual words. The input for this transform is the PCollection of text lines generated by the previous TextIO.Read transform. The ParDo transform outputs a new PCollection, where each element represents an individual word in the text.
    .apply("ExtractWords", FlatMapElements
        .into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
# The Flatmap transform is a simplified version of ParDo.

| 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)
  3. The SDK-provided Count transform is a generic transform that takes a PCollection of any type, and returns a PCollection of key/value pairs. Each key represents a unique element from the input collection, and each value represents the number of times that key appeared in the input collection.

    In this pipeline, the input for Count is the PCollection of individual words generated by the previous ParDo, and the output is a PCollection of key/value pairs where each key represents a unique word in the text and the associated value is the occurrence count for each.

.apply(Count.<String>perElement())
| beam.combiners.Count.PerElement()
counted := stats.Count(s, words)
  4. The next transform formats each of the key/value pairs of unique words and occurrence counts into a printable string suitable for writing to an output file.

    The map transform is a higher-level composite transform that encapsulates a simple ParDo. For each element in the input PCollection, the map transform applies a function that produces exactly one output element.

.apply("FormatResults", MapElements
    .into(TypeDescriptors.strings())
    .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
| beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)
  5. A text file write transform. This transform takes the final PCollection of formatted Strings as input and writes each element to an output text file. Each element in the input PCollection represents one line of text in the resulting output file.
.apply(TextIO.write().to("wordcounts"));
| beam.io.WriteToText(output_path)
textio.Write(s, "wordcounts.txt", formatted)

Note that in the Java SDK the Write transform produces a trivial result value of type PDone, which in this case is ignored. In the Go SDK, the Write transform returns no PCollection.
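
Putting the five transforms together, a minimal Python pipeline equivalent to the steps above might look like the following sketch (modelled on wordcount_minimal; the input and output paths are the placeholders used earlier in this section):

import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as pipeline:
  (
      pipeline
      # 1. Read each line of the input text file.
      | beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
      # 2. Split each line into individual words.
      | 'ExtractWords' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
      # 3. Count the occurrences of each unique word.
      | beam.combiners.Count.PerElement()
      # 4. Format each word/count pair as a printable string.
      | beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
      # 5. Write the formatted strings to the output file(s).
      | beam.io.WriteToText('counts'))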

Running the pipeline

Run the pipeline by calling the run method, which sends your pipeline to be executed by the pipeline runner that you specified in your PipelineOptions. In the Go SDK, you run the pipeline by passing it to a runner.

p.run().waitUntilFinish();
with beam.Pipeline(...) as p:
  [construction]
# p.run() automatically called
direct.Execute(context.Background(), p)

Note that the run method is asynchronous. For a blocking execution, call the waitUntilFinish (Java) or wait_until_finish (Python) method on the result object returned by the call to run.
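
For example, with the Python SDK the non-blocking pattern looks roughly like this (a sketch; pipeline is the Pipeline object constructed earlier):

result = pipeline.run()      # returns a result object without waiting
result.wait_until_finish()   # blocks until the runner reports completion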

Try the full example in Playground

WordCount example

This WordCount example introduces a few recommended programming practices that can make your pipeline easier to read, write, and maintain. While not explicitly required, they can make your pipeline’s execution more flexible, aid in testing your pipeline, and help make your pipeline’s code reusable.

This section assumes that you have a good understanding of the basic concepts in building a pipeline. If you feel that you aren’t at that point yet, read the above section, MinimalWordCount.

To run this example in Java:

Set up your development environment and generate the Maven archetype as described in the Java WordCount quickstart. Then run the pipeline with one of the runners:

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                  --project=YOUR_PROJECT --region=GCE_REGION \
                  --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

To view the full code in Java, see WordCount.

To run this example in Python:

python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts
# Running Beam Python on a distributed Flink cluster requires additional configuration.
# See /documentation/runners/flink/ for more information.
python -m apache_beam.examples.wordcount --input /path/to/inputfile \
                                         --output /path/to/write/counts \
                                         --runner SparkRunner
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --region YOUR_GCP_REGION \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.

To view the full code in Python, see wordcount.py.

To run this example in Go:

$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
$ wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, non-Linux users should install the unix package before running:
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --region your-gcp-region \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache/beam_go_sdk:latest
This runner is not yet available for the Go SDK.

To view the full code in Go, see wordcount.go.

New Concepts:

  * Specifying explicit DoFns
  * Creating composite transforms
  * Using parameterizable PipelineOptions

The following sections explain these key concepts in detail, and break down the pipeline code into smaller sections.

Specifying explicit DoFns

When using ParDo transforms, you need to specify the processing operation that gets applied to each element in the input PCollection. This processing operation is a subclass of the SDK class DoFn. You can create the DoFn subclasses for each ParDo inline, as an anonymous inner class instance, as is done in the previous example (MinimalWordCount). However, it’s often a good idea to define the DoFn at the global level, which makes it easier to unit test and can make the ParDo code more readable.

In the Go SDK, the processing operation is either a named function or a struct with specially-named methods. You can use anonymous functions (but not closures). As in the other SDKs, it’s often a good idea to define the DoFn at the global level, which makes it easier to unit test and can make the ParDo code more readable.

// In this example, ExtractWordsFn is a DoFn that is defined as a static class:

static class ExtractWordsFn extends DoFn<String, String> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
        ...
    }
}
# In this example, the DoFns are defined as classes:


class FormatAsTextFn(beam.DoFn):
  def process(self, element):
    word, count = element
    yield '%s: %s' % (word, count)

formatted = counts | beam.ParDo(FormatAsTextFn())
// In this example, extractFn is a DoFn that is defined as a function:

func extractFn(ctx context.Context, line string, emit func(string)) {
   ...
}

Creating composite transforms

If you have a processing operation that consists of multiple transforms or ParDo steps, you can create it as a subclass of PTransform. Creating a PTransform subclass allows you to encapsulate complex transforms, can make your pipeline’s structure more clear and modular, and makes unit testing easier.

In the Go SDK, a processing operation that consists of multiple transforms or ParDo steps can be encapsulated in a normal Go function. You can furthermore use a named subscope to group them as a composite transform visible for monitoring.

In this example, two transforms are encapsulated as the PTransform subclass CountWords. CountWords contains the ParDo that runs ExtractWordsFn and the SDK-provided Count transform.

In the Go SDK, the same two transforms are encapsulated as a CountWords function.

When CountWords is defined, we specify its ultimate input and output; the input is the PCollection<String> for the extraction operation, and the output is the PCollection<KV<String, Long>> produced by the count operation.

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >>
      beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))

      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement())

counts = lines | CountWords()
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, extractFn, lines)

	// Count the number of times each word occurs.
	return stats.Count(s, col)
}

Using parameterizable PipelineOptions

You can hard-code various execution options when you run your pipeline. However, the more common way is to define your own configuration options via command-line argument parsing. Defining your configuration options via the command-line makes the code more easily portable across different runners.

Add arguments to be processed by the command-line parser, and specify default values for them. You can then access the options values in your pipeline code.

In the Go SDK, you can use the standard flag package for this purpose.

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = pipeline | beam.io.ReadFromText(args.input_file)
var input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

func main() {
    ...
    p := beam.NewPipeline()
    s := p.Root()

    lines := textio.Read(s, *input)
    ...
}

Try the full example in Playground

DebuggingWordCount example

The DebuggingWordCount example demonstrates some best practices for instrumenting your pipeline code.

To run this example in Java:

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SparkRunner --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SamzaRunner --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=JetRunner --jetLocalMode=3 --output=counts

To view the full code in Java, see DebuggingWordCount.

To run this example in Python:

python -m apache_beam.examples.wordcount_debugging --input YOUR_INPUT_FILE --output counts
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.

To view the full code in Python, see wordcount_debugging.py.

To run this example in Go:

$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
$ debugging_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
# As part of the initial setup, non-Linux users should install the unix package before running:
$ go get -u golang.org/x/sys/unix
$ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                      --output gs://<your-gcs-bucket>/counts \
                      --runner dataflow \
                      --project your-gcp-project \
                      --region your-gcp-region \
                      --temp_location gs://<your-gcs-bucket>/tmp/ \
                      --staging_location gs://<your-gcs-bucket>/binaries/ \
                      --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.

To view the full code in Go, see debugging_wordcount.go.

New Concepts:

  * Logging
  * Testing your pipeline with asserts

The following sections explain these key concepts in detail, and break down the pipeline code into smaller sections.

Logging

Each runner may choose to handle logs in its own way.

// This example uses .trace and .debug:

public class DebuggingWordCount {

  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (...) {
        ...
        LOG.debug("Matched: " + c.element().getKey());
      } else {
        ...
        LOG.trace("Did not match: " + c.element().getKey());
      }
    }
  }
}
import logging

from apache_beam.metrics import Metrics

class FilterTextFn(beam.DoFn):
  """A DoFn that filters for a specific key based on a regular expression."""
  def __init__(self, pattern):
    self.pattern = pattern
    # A custom metric can track values in your pipeline as it runs. Create
    # custom metrics matched_words and unmatched_words.
    self.matched_words = Metrics.counter(self.__class__, 'matched_words')
    self.unmatched_words = Metrics.counter(self.__class__, 'unmatched_words')

  def process(self, element):
    word, _ = element
    if re.match(self.pattern, word):
      # Log at INFO level each element we match. When executing this pipeline
      # using the Dataflow service, these log lines will appear in the Cloud
      # Logging UI.
      logging.info('Matched %s', word)

      # Add 1 to the custom metric counter matched_words
      self.matched_words.inc()
      yield element
    else:
      # Log at the "DEBUG" level each element that is not matched. Different
      # log levels can be used to control the verbosity of logging providing
      # an effective mechanism to filter less important information. Note
      # currently only "INFO" and higher level logs are emitted to the Cloud
      # Logger. This log message will not be visible in the Cloud Logger.
      logging.debug('Did not match %s', word)

      # Add 1 to the custom metric counter unmatched_words
      self.unmatched_words.inc()
type filterFn struct {
    ...
}

func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
    if f.re.MatchString(word) {
         // Log at the "INFO" level each element that we match.
         log.Infof(ctx, "Matched: %v", word)
         emit(word, count)
    } else {
        // Log at the "DEBUG" level each element that is not matched.
        log.Debugf(ctx, "Did not match: %v", word)
    }
}

Direct Runner

When executing your pipeline with the DirectRunner, you can print log messages directly to your local console. If you use the Beam SDK for Java, you must add Slf4j to your class path.
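
With the Python SDK, pipeline logging goes through the standard logging module, so one way to see INFO-level messages on your local console when using the DirectRunner is to set the root logger’s level before running the pipeline (a sketch):

import logging

# Emit INFO-level (and higher) messages from the pipeline's DoFns to the console.
logging.getLogger().setLevel(logging.INFO)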

Cloud Dataflow Runner

When executing your pipeline with the DataflowRunner, you can use Stackdriver Logging. Stackdriver Logging aggregates the logs from all of your Cloud Dataflow job’s workers to a single location in the Google Cloud Platform Console. You can use Stackdriver Logging to search and access the logs from all of the workers that Cloud Dataflow has spun up to complete your job. Logging statements in your pipeline’s DoFn instances will appear in Stackdriver Logging as your pipeline runs.

You can also control the worker log levels. Cloud Dataflow workers that execute user code are configured to log to Stackdriver Logging by default at “INFO” log level and higher. You can override log levels for specific logging namespaces by specifying: --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}. For example, by specifying --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} when executing a pipeline using the Cloud Dataflow service, Stackdriver Logging will contain only “DEBUG” or higher level logs for the package in addition to the default “INFO” or higher level logs.

The default Cloud Dataflow worker logging configuration can be overridden by specifying --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>. For example, by specifying --defaultWorkerLogLevel=DEBUG when executing a pipeline with the Cloud Dataflow service, Cloud Logging will contain all “DEBUG” or higher level logs. Note that changing the default worker log level to TRACE or DEBUG significantly increases the amount of logs output.

Apache Spark Runner

Note: This section is yet to be added. There is an open issue for this (Issue 18076).

Apache Flink Runner

Note: This section is yet to be added. There is an open issue for this (Issue 18075).

Apache Nemo Runner

When executing your pipeline with the NemoRunner, most log messages are printed directly to your local console. You should add Slf4j to your class path to make full use of the logs. In order to observe the logs on each of the driver and the executor sides, you should observe the folders created by Apache REEF. For example, when running your pipeline through the local runtime, a folder called REEF_LOCAL_RUNTIME will be created on your work directory, and the logs and the metric information can all be found under the directory.

Testing your pipeline with asserts

PAssert (Java) and assert_that (Python) are sets of convenient PTransforms in the style of Hamcrest’s collection matchers that can be used when writing pipeline level tests to validate the contents of PCollections. Asserts are best used in unit tests with small datasets.

In the Go SDK, the passert package contains convenient PTransforms that can be used when writing pipeline level tests to validate the contents of PCollections. Asserts are best used in unit tests with small datasets.

The following example verifies that the set of filtered words matches our expected counts. The assert does not produce any output, and the pipeline only succeeds if all of the expectations are met.

The following example verifies that two collections contain the same values. The assert does not produce any output, and the pipeline only succeeds if all of the expectations are met.

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

with TestPipeline() as p:
  assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
...
passert.Equals(s, formatted, "Flourish: 3", "stomach: 1")

See DebuggingWordCountTest for an example unit test.

Try the full example in Playground

WindowedWordCount example

The WindowedWordCount example counts words in text just as the previous examples did, but introduces several advanced concepts.

New Concepts:

  * Unbounded and bounded datasets
  * Adding timestamps to data
  * Windowing
  * Reusing PTransforms over windowed PCollections

The following sections explain these key concepts in detail, and break down the pipeline code into smaller sections.

To run this example in Java:

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SamzaRunner --inputFile=pom.xml --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

To view the full code in Java, see WindowedWordCount.

To run this example in Python:

This pipeline writes its results to a BigQuery table specified by the --output_table parameter, using the format PROJECT:DATASET.TABLE or DATASET.TABLE.

python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
                                         --output_table PROJECT:DATASET.TABLE \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.

To view the full code in Python, see windowed_wordcount.py.

To run this example in Go:

$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
$ windowed_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
# As part of the initial setup, non-Linux users should install the unix package before running:
$ go get -u golang.org/x/sys/unix
$ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.

To view the full code in Go, see windowed_wordcount.go.

Unbounded and bounded datasets

Beam allows you to create a single pipeline that can handle both bounded and unbounded datasets. If your dataset has a fixed number of elements, it is a bounded dataset and all of the data can be processed together. For bounded datasets, the question to ask is “Do I have all of the data?” If data continuously arrives (such as an endless stream of game scores in the Mobile gaming example), it is an unbounded dataset. An unbounded dataset is never available for processing at any one time, so the data must be processed using a streaming pipeline that runs continuously. The dataset will only be complete up to a certain point, so the question to ask is “Up until what point do I have all of the data?” Beam uses windowing to divide a continuously updating dataset into logical windows of finite size. If your input is unbounded, you must use a runner that supports streaming.

If your pipeline’s input is bounded, then all downstream PCollections will also be bounded. Similarly, if the input is unbounded, then all downstream PCollections of the pipeline will be unbounded, though separate branches may be independently bounded.

Recall that the input for this example is a set of Shakespeare’s texts, which is a finite set of data. Therefore, this example reads bounded data from a text file:

public static void main(String[] args) throws IOException {
    Options options = ...
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline
      .apply(TextIO.read().from(options.getInputFile()))
def main(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input-file',
                      dest='input_file',
                      default='/Users/home/words-example.txt')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)
  lines = p | 'read' >> ReadFromText(known_args.input_file)
func main() {
   ...
   p := beam.NewPipeline()
   s := p.Root()

   lines := textio.Read(s, *input)
   ...
}

Adding timestamps to data

Each element in a PCollection has an associated timestamp. The timestamp for each element is initially assigned by the source that creates the PCollection. Some sources that create unbounded PCollections can assign each new element a timestamp that corresponds to when the element was read or added. You can manually assign or adjust timestamps with a DoFn; however, you can only move timestamps forward in time.

In this example the input is bounded. For the purpose of the example, the DoFn named AddTimestampFn (invoked by ParDo) will set a timestamp for each element in the PCollection.

.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
beam.ParDo(AddTimestampFn(min_timestamp, max_timestamp))
timestampedLines := beam.ParDo(s, &addTimestampFn{Min: mtime.Now()}, lines)

Below is the code for AddTimestampFn, a DoFn invoked by ParDo, that sets the timestamp of each data element given the element itself. For example, if the elements were log lines, this ParDo could parse the time out of the log string and set it as the element’s timestamp. There are no timestamps inherent in the works of Shakespeare, so in this case we’ve made up random timestamps just to illustrate the concept. Each line of the input text will get a random associated timestamp sometime in a 2-hour period.

static class AddTimestampFn extends DoFn<String, String> {
  private final Instant minTimestamp;
  private final Instant maxTimestamp;

  AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
    this.minTimestamp = minTimestamp;
    this.maxTimestamp = maxTimestamp;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Instant randomTimestamp =
      new Instant(
          ThreadLocalRandom.current()
          .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));

    /**
     * Concept #2: Set the data element with that timestamp.
     */
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}
class AddTimestampFn(beam.DoFn):

  def __init__(self, min_timestamp, max_timestamp):
     self.min_timestamp = min_timestamp
     self.max_timestamp = max_timestamp

  def process(self, element):
    yield window.TimestampedValue(
        element,
        random.randint(self.min_timestamp, self.max_timestamp))
type addTimestampFn struct {
	Min beam.EventTime `json:"min"`
}

func (f *addTimestampFn) ProcessElement(x beam.X) (beam.EventTime, beam.X) {
	timestamp := f.Min.Add(time.Duration(rand.Int63n(2 * time.Hour.Nanoseconds())))
	return timestamp, x
}

Note that the use of the beam.X “type variable” allows the transform to be used for any type.

Windowing

Beam uses a concept called Windowing to subdivide a PCollection into bounded sets of elements. PTransforms that aggregate multiple elements process each PCollection as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded).

The WindowedWordCount example applies fixed-time windowing, wherein each window represents a fixed time interval. The fixed window size for this example defaults to 1 minute (you can change this with a command-line option).

PCollection<String> windowedWords = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
windowed_words = input | beam.WindowInto(window.FixedWindows(60 * window_size_minutes))
windowedLines := beam.WindowInto(s, window.NewFixedWindows(time.Minute), timestampedLines)

Reusing PTransforms over windowed PCollections

You can reuse existing PTransforms that were created for manipulating simple PCollections over windowed PCollections as well.

PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
word_counts = windowed_words | CountWords()
counted := wordcount.CountWords(s, windowedLines)

Try the full example in Playground

StreamingWordCount example

The StreamingWordCount example is a streaming pipeline that reads Pub/Sub messages from a Pub/Sub subscription or topic, and performs a frequency count on the words in each message. Similar to WindowedWordCount, this example applies fixed-time windowing, wherein each window represents a fixed time interval. The fixed window size for this example is 15 seconds. The pipeline outputs the frequency count of the words seen in each 15 second window.

New Concepts:

  * Reading an unbounded dataset
  * Writing unbounded results

To run this example in Java:

Note: StreamingWordCount is not yet available for the Java SDK.

To run this example in Python:

python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.

To view the full code in Python, see streaming_wordcount.py.

To run this example in Go:

Note: StreamingWordCount is not yet available for the Go SDK. There is an open issue for this (Issue 18879).

Reading an unbounded dataset

This example uses an unbounded dataset as input. The code reads Pub/Sub messages from a Pub/Sub subscription or topic using beam.io.ReadFromPubSub.

  // This example is not currently available for the Beam SDK for Java.
  # Read from Pub/Sub into a PCollection.
  if known_args.input_subscription:
    data = p | beam.io.ReadFromPubSub(
        subscription=known_args.input_subscription)
  else:
    data = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  lines = data | 'DecodeString' >> beam.Map(lambda d: d.decode('utf-8'))
  // This example is not currently available for the Beam SDK for Go.
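
Between reading and writing, the streaming pipeline applies the 15-second fixed windows described above. In the Python SDK that step might be expressed as follows (a sketch; lines refers to the decoded messages from the previous snippet):

from apache_beam.transforms import window

# Subdivide the unbounded stream into fixed 15-second windows before counting words.
windowed_lines = lines | beam.WindowInto(window.FixedWindows(15))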

Writing unbounded results

When the input is unbounded, the same is true of the output PCollection. As such, you must make sure to choose an appropriate I/O for the results. Some I/Os support only bounded output, while others support both bounded and unbounded outputs.

This example uses an unbounded PCollection and streams the results to Google Pub/Sub. The code formats the results and writes them to a Pub/Sub topic using beam.io.WriteToPubSub.

  // This example is not currently available for the Beam SDK for Java.
  # Write to Pub/Sub
  _ = (output
    | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
    | beam.io.WriteToPubSub(known_args.output_topic))
  // This example is not currently available for the Beam SDK for Go.

Next Steps

Please don’t hesitate to reach out if you encounter any issues!