Top
Transforms for finding the largest (or smallest) set of elements in a collection, or the largest (or smallest) set of values associated with each key in a collection of key-value pairs.
Examples
In the following example, we create a pipeline with a PCollection. Then, we get the largest or smallest elements in different ways.
Example 1: Largest elements from a PCollection
We use Top.Largest() to get the largest elements from the entire PCollection.
Output:
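As a rough plain-Python sketch of the kind of selection Top.Largest(n) performs, the standard library's heapq.nlargest picks the n largest values and returns them largest first. This is not Beam's implementation and it runs on an ordinary list rather than a PCollection; the helper name `largest` and the sample numbers are assumptions made for illustration.

```python
import heapq


def largest(elements, n):
    """Hypothetical helper: return the n largest elements, largest first."""
    return heapq.nlargest(n, elements)


# Sample data chosen for illustration only.
numbers = [3, 4, 1, 2]
result = largest(numbers, 2)
print(result)  # [4, 3]
```

In a pipeline, the corresponding step would be beam.combiners.Top.Largest(2), which emits the selected values together as a single list.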
Example 2: Largest elements for each key
We use Top.LargestPerKey() to get the largest elements for each unique key in a PCollection of key-values.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  largest_elements_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Largest N values per key' >> beam.combiners.Top.LargestPerKey(2)
      | beam.Map(print))
Output:
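To make the per-key behavior concrete, here is a minimal plain-Python sketch that groups values by key and keeps the two largest for each. It only approximates what Top.LargestPerKey(2) computes and is not how Beam implements it; the helper name `largest_per_key` and the plain-text keys are assumptions made for illustration.

```python
import heapq
from collections import defaultdict


def largest_per_key(pairs, n):
    """Hypothetical helper: map each key to its n largest values, largest first."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: heapq.nlargest(n, values) for key, values in grouped.items()}


# Same shape of data as the pipeline above, with plain-text keys (an assumption).
produce = [
    ('carrot', 3),
    ('carrot', 2),
    ('eggplant', 1),
    ('tomato', 4),
    ('tomato', 5),
    ('tomato', 3),
]
per_key = largest_per_key(produce, 2)
for key, values in per_key.items():
    print((key, values))
```

A key with fewer than n values, such as 'eggplant' here, simply keeps all of its values.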
Example 3: Smallest elements from a PCollection
We use Top.Smallest() to get the smallest elements from the entire PCollection.
Output:
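The mirror-image selection can be sketched in plain Python with heapq.nsmallest, which returns the n smallest values in ascending order. This is an illustration of the idea behind Top.Smallest(n), not Beam code, and the sample numbers are assumed.

```python
import heapq

# Sample data chosen for illustration only.
numbers = [3, 4, 1, 2]

# Keep the two smallest values, smallest first.
smallest_two = heapq.nsmallest(2, numbers)
print(smallest_two)  # [1, 2]
```

In a pipeline, the corresponding step would be beam.combiners.Top.Smallest(2).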
Example 4: Smallest elements for each key
We use Top.SmallestPerKey() to get the smallest elements for each unique key in a PCollection of key-values.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  smallest_elements_per_key = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),
      ])
      | 'Smallest N values per key' >> beam.combiners.Top.SmallestPerKey(2)
      | beam.Map(print))
Output:
Example 5: Custom elements from a PCollection
We use Top.Of() to get elements with customized rules from the entire PCollection.
You can change how the elements are compared with key.
By default you get the largest elements, but you can get the smallest by setting reverse=True.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  shortest_elements = (
      pipeline
      | 'Create produce names' >> beam.Create([
          '🍓 Strawberry',
          '🥕 Carrot',
          '🍏 Green apple',
          '🍆 Eggplant',
          '🌽 Corn',
      ])
      | 'Shortest names' >> beam.combiners.Top.Of(
          2,             # number of elements
          key=len,       # optional, defaults to the element itself
          reverse=True,  # optional, defaults to False (largest/descending)
      )
      | beam.Map(print)
  )
Output:
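The interaction between key and reverse can be sketched in plain Python. The helper `top_of` below is hypothetical and only mimics the documented behavior (largest by default, smallest with reverse=True, comparison through key); it is not Beam's implementation, and the names are written without emoji for simplicity.

```python
import heapq


def top_of(elements, n, key=None, reverse=False):
    """Hypothetical helper mimicking the key/reverse options described above."""
    if reverse:
        # reverse=True: keep the n smallest according to key.
        return heapq.nsmallest(n, elements, key=key)
    # Default: keep the n largest according to key.
    return heapq.nlargest(n, elements, key=key)


names = ['Strawberry', 'Carrot', 'Green apple', 'Eggplant', 'Corn']

shortest = top_of(names, 2, key=len, reverse=True)
longest = top_of(names, 2, key=len)
print(shortest)  # ['Corn', 'Carrot']
print(longest)   # ['Green apple', 'Strawberry']
```

Passing key=len compares names by length rather than alphabetically, which is why 'Corn' ranks first among the shortest.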
Example 6: Custom elements for each key
We use Top.PerKey() to get elements with customized rules for each unique key in a PCollection of key-values.
You can change how the elements are compared with key.
By default you get the largest elements, but you can get the smallest by setting reverse=True.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  shortest_elements_per_key = (
      pipeline
      | 'Create produce names' >> beam.Create([
          ('spring', '🥕 Carrot'),
          ('spring', '🍓 Strawberry'),
          ('summer', '🥕 Carrot'),
          ('summer', '🌽 Corn'),
          ('summer', '🍏 Green apple'),
          ('fall', '🥕 Carrot'),
          ('fall', '🍏 Green apple'),
          ('winter', '🍆 Eggplant'),
      ])
      | 'Shortest names per key' >> beam.combiners.Top.PerKey(
          2,             # number of elements
          key=len,       # optional, defaults to the value itself
          reverse=True,  # optional, defaults to False (largest/descending)
      )
      | beam.Map(print)
  )
Output:
Related transforms
- Sample takes samples of the elements in a collection.
Last updated on 2023/03/21