Getting started from Apache Spark
If you already know Apache Spark, using Beam should be easy. The basic concepts are the same, and the APIs are similar as well.
Spark stores data in Spark DataFrames for structured data, and in Resilient Distributed Datasets (RDDs) for unstructured data. This guide uses RDDs.
A Spark RDD represents a collection of elements, while in Beam it’s called a Parallel Collection (PCollection). A PCollection in Beam does not have any ordering guarantees.
Likewise, a transform in Beam is called a Parallel Transform (PTransform).
Here are some examples of common operations and their equivalents in PySpark and Beam.
Overview
Here’s a simple example of a PySpark pipeline that takes the numbers from one to four, multiplies them by two, adds all the values together, and prints the result.
In Beam you pipe your data through the pipeline using the pipe operator `|`, like `data | beam.Map(...)`, instead of chaining methods like `data.map(...)`, but they’re doing the same thing.
Here’s what an equivalent pipeline looks like in Beam.
ℹ️ Note that we called `print` inside a `Map` transform. That’s because we can only access the elements of a PCollection from within a PTransform. To inspect the data locally, you can use the InteractiveRunner.
Another thing to note is that Beam pipelines are constructed lazily. This means that when you pipe (`|`) data, you’re only declaring the transformations and the order you want them to happen in; the actual computation doesn’t happen yet. The pipeline is run after the `with beam.Pipeline() as pipeline` context has closed.
ℹ️ When the `with beam.Pipeline() as pipeline` context closes, it implicitly calls `pipeline.run()`, which triggers the computation.
The pipeline is then sent to your runner of choice and it processes the data.
ℹ️ The pipeline can run locally with the DirectRunner, or in a distributed runner such as Flink, Spark, or Dataflow. The Spark runner is not related to PySpark.
A label can optionally be added to a transform using the right shift operator `>>`, like `data | 'My description' >> beam.Map(...)`. Labels serve as comments and make your pipeline easier to debug.
This is how the pipeline looks after adding labels.
Setup
Here’s a comparison of how to get started in both PySpark and Beam.
| | PySpark | Beam |
|---|---|---|
| Install | `$ pip install pyspark` | `$ pip install apache-beam` |
| Imports | `import pyspark` | `import apache_beam as beam` |
| Creating a local pipeline | `with pyspark.SparkContext() as sc: # Your pipeline code here.` | `with beam.Pipeline() as pipeline: # Your pipeline code here.` |
| Creating values | `values = sc.parallelize([1, 2, 3, 4])` | `values = pipeline \| beam.Create([1, 2, 3, 4])` |
| Creating key-value pairs | `pairs = sc.parallelize([('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3')])` | `pairs = pipeline \| beam.Create([('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3')])` |
| Running a local pipeline | `$ spark-submit spark_pipeline.py` | `$ python beam_pipeline.py` |
Transforms
Here are the equivalents of some common transforms in both PySpark and Beam.
| | PySpark | Beam |
|---|---|---|
| Map | `values.map(lambda x: x * 2)` | `values \| beam.Map(lambda x: x * 2)` |
| Filter | `values.filter(lambda x: x % 2 == 0)` | `values \| beam.Filter(lambda x: x % 2 == 0)` |
| FlatMap | `values.flatMap(lambda x: range(x))` | `values \| beam.FlatMap(lambda x: range(x))` |
| Group by key | `pairs.groupByKey()` | `pairs \| beam.GroupByKey()` |
| Reduce | `values.reduce(lambda x, y: x+y)` | `values \| beam.CombineGlobally(sum)` |
| Reduce by key | `pairs.reduceByKey(lambda x, y: x+y)` | `pairs \| beam.CombinePerKey(sum)` |
| Distinct | `values.distinct()` | `values \| beam.Distinct()` |
| Count | `values.count()` | `values \| beam.combiners.Count.Globally()` |
| Count by key | `pairs.countByKey()` | `pairs \| beam.combiners.Count.PerKey()` |
| Take smallest | `values.takeOrdered(3)` | `values \| beam.combiners.Top.Smallest(3)` |
| Take largest | `values.takeOrdered(3, lambda x: -x)` | `values \| beam.combiners.Top.Largest(3)` |
| Random sample | `values.takeSample(False, 3)` | `values \| beam.combiners.Sample.FixedSizeGlobally(3)` |
| Union | `values.union(otherValues)` | `(values, otherValues) \| beam.Flatten()` |
| Co-group | `pairs.cogroup(otherPairs)` | `{'Xs': pairs, 'Ys': otherPairs} \| beam.CoGroupByKey()` |
ℹ️ To learn more about the transforms available in Beam, check the Python transform gallery.
Using calculated values
Since we are working in potentially distributed environments, we can’t guarantee that the results we’ve calculated are available on any given machine.
In PySpark, we can get a result from a collection of elements (RDD) by using `data.collect()`, or other aggregations such as `reduce()`, `count()`, and more.
Here’s an example to scale numbers into a range between zero and one.
import pyspark
sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)
# We can simply use `min_value` and `max_value` since it's already a Python `int` value from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))
# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())
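As a quick check of the arithmetic, the same min-max scaling can be worked through in plain Python without Spark. This sketch only illustrates the formula `(x - min) / (max - min)` on the sample values; it is not a distributed computation.

```python
values = [1, 2, 3, 4]

min_value = min(values)  # 1
max_value = max(values)  # 4

# Each value is shifted by the minimum and divided by the range (4 - 1 = 3).
scaled_values = [(x - min_value) / (max_value - min_value) for x in values]
print(scaled_values)
```

The smallest value maps to 0.0, the largest to 1.0, and 2 and 3 map to roughly 0.33 and 0.67.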
In Beam, every transform produces a PCollection. We use side inputs to feed a PCollection into a transform and access its values.
Any transform that accepts a function, like `Map`, can take side inputs. If we only need a single value, we can use `beam.pvalue.AsSingleton` and access it as a Python value. If we need multiple values, we can use `beam.pvalue.AsIter` and access them as an iterable.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    values = pipeline | beam.Create([1, 2, 3, 4])
    min_value = values | beam.CombineGlobally(min)
    max_value = values | beam.CombineGlobally(max)

    # To access `min_value` and `max_value`, we need to pass them as a side input.
    scaled_values = values | beam.Map(
        lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
        minimum=beam.pvalue.AsSingleton(min_value),
        maximum=beam.pvalue.AsSingleton(max_value),
    )

    scaled_values | beam.Map(print)
ℹ️ In Beam we need to pass a side input explicitly, but we get the benefit that a reduction or aggregation does not have to fit into memory. Lazily computing side inputs also allows us to compute `values` only once, rather than for each distinct reduction (or requiring explicit caching of the RDD).
Next Steps
- Take a look at all the available transforms in the Python transform gallery.
- Learn how to read from and write to files in the Pipeline I/O section of the Programming guide.
- Walk through additional WordCount examples in the WordCount Example Walkthrough.
- Take a self-paced tour through our Learning Resources.
- Dive into some of our favorite Videos and Podcasts.
- Join the Beam users@ mailing list.
- If you’re interested in contributing to the Apache Beam codebase, see the Contribution Guide.
Please don’t hesitate to reach out if you encounter any issues!
Last updated on 2024/10/15