Latest
Gets the element with the latest timestamp.
Examples
In the following examples, we create a pipeline with a PCollection of produce, where each element is timestamped with its harvest date.
We use Latest to get the element with the latest timestamp from the PCollection.
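Conceptually, Latest is a combining operation. It keeps only the (value, timestamp) pair with the greatest timestamp, and two partial results can be merged by keeping the later of the two, which is what lets the work be split across workers. The sketch below models that rule in plain Python without Beam. `LatestModel` and `combine` are hypothetical names; the method names only mirror the four methods of Beam's `CombineFn`, and this is an illustration of the semantics, not Beam's actual implementation. How ties between equal timestamps are resolved is an assumption of this sketch (it keeps the first pair seen).

```python
from typing import Any, Iterable, Optional, Tuple

# Hypothetical accumulator: (value, timestamp in seconds since the epoch).
Accumulator = Tuple[Optional[Any], float]


class LatestModel:
    """Hypothetical plain-Python model of a 'latest' combiner (no Beam)."""

    def create_accumulator(self) -> Accumulator:
        # Empty state: no value yet, and a timestamp earlier than any real one.
        return (None, float("-inf"))

    def add_input(self, acc: Accumulator, element: Accumulator) -> Accumulator:
        # Keep whichever pair has the later timestamp; on a tie, keep acc.
        return element if element[1] > acc[1] else acc

    def merge_accumulators(self, accs: Iterable[Accumulator]) -> Accumulator:
        # Merging partial results uses the same "keep the later one" rule.
        result = self.create_accumulator()
        for acc in accs:
            result = self.add_input(result, acc)
        return result

    def extract_output(self, acc: Accumulator) -> Optional[Any]:
        # Drop the timestamp and return only the value.
        return acc[0]


def combine(fn: LatestModel, elements: Iterable[Accumulator]) -> Accumulator:
    # Fold a batch of (value, timestamp) pairs into one accumulator.
    acc = fn.create_accumulator()
    for element in elements:
        acc = fn.add_input(acc, element)
    return acc


fn = LatestModel()
part_a = combine(fn, [("a", 10.0), ("b", 30.0)])
part_b = combine(fn, [("c", 20.0)])

# Combining two partitions separately and merging them gives the same
# answer as combining every element in a single pass.
merged = fn.extract_output(fn.merge_accumulators([part_a, part_b]))
single_pass = fn.extract_output(
    combine(fn, [("a", 10.0), ("b", 30.0), ("c", 20.0)]))
print(merged, single_pass)  # b b
```

Because "keep the later pair" is associative and commutative, the order in which partial results are merged does not change the answer (apart from how ties are broken).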
Example 1: Latest element globally
We use Latest.Globally() to get the element with the latest timestamp in the entire PCollection.
import apache_beam as beam
import time

# Parse a 'YYYY-MM-DD HH:MM:SS' string (interpreted as local time)
# into seconds since the Unix epoch.
def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
  return time.mktime(time.strptime(time_str, format))

with beam.Pipeline() as pipeline:
  latest_element = (
      pipeline
      | 'Create crops' >> beam.Create([
          {
              'item': '🥬', 'harvest': '2020-02-24 00:00:00'
          },
          {
              'item': '🍓', 'harvest': '2020-06-16 00:00:00'
          },
          {
              'item': '🥕', 'harvest': '2020-07-17 00:00:00'
          },
          {
              'item': '🍆', 'harvest': '2020-10-26 00:00:00'
          },
          {
              'item': '🍅', 'harvest': '2020-10-01 00:00:00'
          },
      ])
      # Keep only the item as the element and use its harvest date
      # as the element's event timestamp.
      | 'With timestamps' >> beam.Map(
          lambda crop: beam.window.TimestampedValue(
              crop['item'], to_unix_time(crop['harvest'])))
      # Emit the single element with the latest timestamp.
      | 'Get latest element' >> beam.combiners.Latest.Globally()
      | beam.Map(print))
Output:
🍆
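The to_unix_time helper in the example above uses time.mktime, which interprets the parsed string as local time. For ordering crops by harvest date this is harmless, because every string is converted the same way, but the absolute timestamps depend on the time zone of the machine running the pipeline. If timezone-independent timestamps are wanted, one option (an assumption, not what the example does) is to treat the strings as UTC with calendar.timegm. `to_unix_time_utc` is a hypothetical name for such a variant, which could be used in place of to_unix_time.

```python
import calendar
import time


def to_unix_time_utc(time_str, fmt='%Y-%m-%d %H:%M:%S'):
    """Hypothetical variant of to_unix_time that treats time_str as UTC.

    calendar.timegm is the UTC inverse of time.gmtime, so the result does
    not depend on the local time zone.
    """
    return calendar.timegm(time.strptime(time_str, fmt))


print(to_unix_time_utc('1970-01-02 00:00:00'))  # 86400
print(to_unix_time_utc('2020-10-26 00:00:00'))  # 1603670400
```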
Example 2: Latest elements for each key
We use Latest.PerKey() to get the element with the latest timestamp for each key in a PCollection of key-value pairs.
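Per key, the result matches grouping the key-value pairs by key and keeping, within each group, the value with the greatest timestamp. The plain-Python sketch below (no Beam) models that rule. `latest_per_key` is a hypothetical helper, and the records are made-up illustrations loosely modeled on the example that follows, with item names spelled out in words.

```python
from datetime import datetime


def latest_per_key(records):
    """Hypothetical plain-Python model of per-key 'latest'.

    records: iterable of (key, value, timestamp) triples whose timestamps
    are mutually comparable (here, datetime objects).
    Returns a dict mapping each key to the value with the latest timestamp.
    """
    best = {}  # key -> (timestamp, value) of the latest element seen so far
    for key, value, ts in records:
        if key not in best or ts > best[key][0]:
            best[key] = (ts, value)
    return {key: value for key, (ts, value) in best.items()}


def parse(s):
    # Naive datetimes are fine here: only their relative order matters.
    return datetime.strptime(s, '%Y-%m-%d %H:%M:%S')


crops = [
    ('spring', 'carrot', parse('2020-06-28 00:00:00')),
    ('spring', 'strawberry', parse('2020-06-16 00:00:00')),
    ('summer', 'carrot', parse('2020-07-17 00:00:00')),
    ('summer', 'tomato', parse('2020-09-22 00:00:00')),
    ('winter', 'lettuce', parse('2020-02-24 00:00:00')),
]
print(sorted(latest_per_key(crops).items()))
# [('spring', 'carrot'), ('summer', 'tomato'), ('winter', 'lettuce')]
```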
import apache_beam as beam
import time

# Parse a 'YYYY-MM-DD HH:MM:SS' string (interpreted as local time)
# into seconds since the Unix epoch.
def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
  return time.mktime(time.strptime(time_str, format))

with beam.Pipeline() as pipeline:
  latest_elements_per_key = (
      pipeline
      | 'Create crops' >> beam.Create([
          ('spring', {
              'item': '🥕', 'harvest': '2020-06-28 00:00:00'
          }),
          ('spring', {
              'item': '🍓', 'harvest': '2020-06-16 00:00:00'
          }),
          ('summer', {
              'item': '🥕', 'harvest': '2020-07-17 00:00:00'
          }),
          ('summer', {
              'item': '🍓', 'harvest': '2020-08-26 00:00:00'
          }),
          ('summer', {
              'item': '🍆', 'harvest': '2020-09-04 00:00:00'
          }),
          ('summer', {
              'item': '🥬', 'harvest': '2020-09-18 00:00:00'
          }),
          ('summer', {
              'item': '🍅', 'harvest': '2020-09-22 00:00:00'
          }),
          ('autumn', {
              'item': '🍅', 'harvest': '2020-10-01 00:00:00'
          }),
          ('autumn', {
              'item': '🥬', 'harvest': '2020-10-20 00:00:00'
          }),
          ('autumn', {
              'item': '🍆', 'harvest': '2020-10-26 00:00:00'
          }),
          ('winter', {
              'item': '🥬', 'harvest': '2020-02-24 00:00:00'
          }),
      ])
      # Keep (season, item) as the element and use the harvest date
      # as the element's event timestamp.
      | 'With timestamps' >> beam.Map(
          lambda pair: beam.window.TimestampedValue(
              (pair[0], pair[1]['item']), to_unix_time(pair[1]['harvest'])))
      # For each season, emit the item with the latest timestamp.
      | 'Get latest elements per key' >> beam.combiners.Latest.PerKey()
      | beam.Map(print))
Output (element order may vary):
('spring', '🥕')
('summer', '🍅')
('autumn', '🍆')
('winter', '🥬')
Related transforms
- Sample randomly selects a specified number of elements from a collection.
Last updated on 2023/06/03