Getting started from Apache Spark
If you already know Apache Spark, using Beam should be easy. The basic concepts are the same, and the APIs are similar as well.
Spark stores data in Spark DataFrames for structured data, and in Resilient Distributed Datasets (RDDs) for unstructured data. We are using RDDs for this guide.
A Spark RDD represents a collection of elements, while in Beam it’s called a Parallel Collection (PCollection). A PCollection in Beam does not have any ordering guarantees.
Likewise, a transform in Beam is called a Parallel Transform (PTransform).
Here are some examples of common operations and their equivalents in PySpark and Beam.
Here’s a simple example of a PySpark pipeline that takes the numbers from one to four, multiplies them by two, adds all the values together, and prints the result.
In Beam you pipe your data through the pipeline using the pipe operator, as in `data | beam.Map(...)`, instead of chaining methods like `data.map(...)`, but they’re doing the same thing.
Here’s what an equivalent pipeline looks like in Beam.
ℹ️ Note that we called `print` inside a `Map` transform. That’s because we can only access the elements of a PCollection from within a PTransform. To inspect the data locally, you can use the InteractiveRunner.
Another thing to note is that Beam pipelines are constructed lazily. This means that when you pipe data with `|`, you’re only declaring the transformations and the order in which you want them to happen, but the actual computation doesn’t happen yet. The pipeline runs after the `with beam.Pipeline() as pipeline` context has closed.
ℹ️ When the `with beam.Pipeline() as pipeline` context closes, it implicitly calls `pipeline.run()`, which triggers the computation to happen.
The pipeline is then sent to your runner of choice and it processes the data.
ℹ️ The pipeline can run locally with the DirectRunner, or in a distributed runner such as Flink, Spark, or Dataflow. The Spark runner is not related to PySpark.
A label can optionally be added to a transform using the right shift operator `>>`, as in `data | 'My description' >> beam.Map(...)`. Labels serve both as documentation and make your pipeline easier to debug.
This is how the pipeline looks after adding labels.
Here’s a comparison of how to get started in both PySpark and Beam.
Here are the equivalents of some common transforms in both PySpark and Beam.
| Transform | PySpark | Beam |
|---|---|---|
| Group by key | `pairs.groupByKey()` | `pairs \| beam.GroupByKey()` |
| Reduce by key | `pairs.reduceByKey(lambda x, y: x + y)` | `pairs \| beam.CombinePerKey(sum)` |
| Count by key | `pairs.countByKey()` | `pairs \| beam.combiners.Count.PerKey()` |
ℹ️ To learn more about the transforms available in Beam, check the Python transform gallery.
Using calculated values
Since we are working in potentially distributed environments, we can’t guarantee that the results we’ve calculated are available on any given machine.

In PySpark, we can get a result from a collection of elements (RDD) by using `data.collect()`, or other aggregations such as `count()`, and more.
Here’s an example to scale numbers into a range between zero and one.
```python
import pyspark

sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)

# We can simply use `min_value` and `max_value` since they're already
# Python `int` values from `reduce`.
scaled_values = values.map(
    lambda x: (x - min_value) / (max_value - min_value))

# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())
```
In Beam, the result of every transform is a PCollection. We use side inputs to feed a PCollection into a transform and access its values.

Any transform that accepts a function, like `Map`, can take side inputs. If we only need a single value, we can use `beam.pvalue.AsSingleton` and access it as a Python value. If we need multiple values, we can use `beam.pvalue.AsIter` and access them as an iterable.
```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    values = pipeline | beam.Create([1, 2, 3, 4])
    min_value = values | beam.CombineGlobally(min)
    max_value = values | beam.CombineGlobally(max)

    # To access `min_value` and `max_value`, we need to pass them as a side input.
    scaled_values = values | beam.Map(
        lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
        minimum=beam.pvalue.AsSingleton(min_value),
        maximum=beam.pvalue.AsSingleton(max_value),
    )

    scaled_values | beam.Map(print)
```
ℹ️ In Beam we need to pass a side input explicitly, but we get the benefit that a reduction or aggregation does not have to fit into memory. Lazily computing side inputs also allows us to compute `values` only once, rather than once for each distinct reduction (or requiring explicit caching of the RDD).
- Take a look at all the available transforms in the Python transform gallery.
- Learn how to read from and write to files in the Pipeline I/O section of the Programming guide.
- Walk through additional WordCount examples in the WordCount Example Walkthrough.
- Take a self-paced tour through our Learning Resources.
- Dive in to some of our favorite Videos and Podcasts.
- Join the Beam users@ mailing list.
- If you’re interested in contributing to the Apache Beam codebase, see the Contribution Guide.
Please don’t hesitate to reach out if you encounter any issues!
Last updated on 2023/09/29