Latest

Pydoc




Gets the element with the latest timestamp.

Examples

In the following examples, we create a pipeline with a PCollection of produce items, each timestamped with its harvest date.

We use Latest to get the element with the latest timestamp from the PCollection.

Example 1: Latest element globally

We use Latest.Globally() to get the element with the latest timestamp in the entire PCollection.

import apache_beam as beam
import time

def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
  """Parse ``time_str`` and return it as seconds since the Unix epoch.

  The string is parsed with ``time.strptime`` using ``format`` and converted
  with ``time.mktime``, so it is interpreted as local time. The result is a
  float.
  """
  struct_time = time.strptime(time_str, format)
  seconds = time.mktime(struct_time)
  return seconds

# Build a PCollection of crops, stamp each element with its harvest time,
# then keep only the element whose timestamp is the latest overall.
with beam.Pipeline() as pipeline:
  latest_element = (
      pipeline
      | 'Create crops' >> beam.Create([
          {
              'item': '🥬', 'harvest': '2020-02-24 00:00:00'
          },
          {
              'item': '🍓', 'harvest': '2020-06-16 00:00:00'
          },
          {
              'item': '🥕', 'harvest': '2020-07-17 00:00:00'
          },
          {
              'item': '🍆', 'harvest': '2020-10-26 00:00:00'
          },
          {
              'item': '🍅', 'harvest': '2020-10-01 00:00:00'
          },
      ])
      # Attach the harvest date as the element's timestamp; Latest picks the
      # element by this timestamp, not by the element's value.
      | 'With timestamps' >> beam.Map(
          lambda crop: beam.window.TimestampedValue(
              crop['item'], to_unix_time(crop['harvest'])))
      | 'Get latest element' >> beam.combiners.Latest.Globally()
      | beam.Map(print))

Output:

πŸ†

Example 2: Latest elements for each key

We use Latest.PerKey() to get the element with the latest timestamp for each key in a PCollection of key-value pairs.

import apache_beam as beam
import time

def to_unix_time(time_str, format='%Y-%m-%d %H:%M:%S'):
  """Parse ``time_str`` and return it as seconds since the Unix epoch.

  The string is parsed with ``time.strptime`` using ``format`` and converted
  with ``time.mktime``, so it is interpreted as local time. The result is a
  float.
  """
  struct_time = time.strptime(time_str, format)
  seconds = time.mktime(struct_time)
  return seconds

# Build a keyed PCollection of (season, crop) pairs, stamp each pair with the
# crop's harvest time, then keep the latest-timestamped crop for each season.
with beam.Pipeline() as pipeline:
  latest_elements_per_key = (
      pipeline
      | 'Create crops' >> beam.Create([
          ('spring', {
              'item': '🥕', 'harvest': '2020-06-28 00:00:00'
          }),
          ('spring', {
              'item': '🍓', 'harvest': '2020-06-16 00:00:00'
          }),
          ('summer', {
              'item': '🥕', 'harvest': '2020-07-17 00:00:00'
          }),
          ('summer', {
              'item': '🍓', 'harvest': '2020-08-26 00:00:00'
          }),
          ('summer', {
              'item': '🍆', 'harvest': '2020-09-04 00:00:00'
          }),
          ('summer', {
              'item': '🥬', 'harvest': '2020-09-18 00:00:00'
          }),
          ('summer', {
              'item': '🍅', 'harvest': '2020-09-22 00:00:00'
          }),
          ('autumn', {
              'item': '🍅', 'harvest': '2020-10-01 00:00:00'
          }),
          ('autumn', {
              'item': '🥬', 'harvest': '2020-10-20 00:00:00'
          }),
          ('autumn', {
              'item': '🍆', 'harvest': '2020-10-26 00:00:00'
          }),
          ('winter', {
              'item': '🥬', 'harvest': '2020-02-24 00:00:00'
          }),
      ])
      # Reduce each value to just the crop item, keeping the season as the
      # key, and use the harvest date as the element's timestamp.
      | 'With timestamps' >> beam.Map(
          lambda pair: beam.window.TimestampedValue(
              (pair[0], pair[1]['item']), to_unix_time(pair[1]['harvest'])))
      | 'Get latest elements per key' >> beam.combiners.Latest.PerKey()
      | beam.Map(print))

Output:

('spring', '🥕')
('summer', '🍅')
('autumn', '🍆')
('winter', '🥬')
Pydoc