FlatMap

Applies a 1-to-many mapping function to each element in the collection, then flattens the returned elements into the resulting collection.

Examples

In the following examples, we create a pipeline with a PCollection of produce with their icon, name, and duration. Then, we apply FlatMap in multiple ways to yield zero or more elements for each input element into the resulting PCollection.

FlatMap accepts a function that returns an iterable, where each of the output iterable’s elements is an element of the resulting PCollection.
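
To make the flattening step concrete, here is a minimal plain-Python sketch of the semantics described above. It is an illustration only, not how Beam implements FlatMap: the helper name flat_map is hypothetical, and an ordinary list stands in for a PCollection.

```python
from itertools import chain

def flat_map(fn, elements):
  """Plain-Python model of FlatMap: apply fn, then flatten one level."""
  # fn returns an iterable for each input element; chain.from_iterable
  # concatenates those iterables into a single stream of outputs.
  return list(chain.from_iterable(fn(element) for element in elements))

# Made-up input: each string maps to zero or more words.
lines = ['a b', '', 'c']
words = flat_map(str.split, lines)
print(words)  # ['a', 'b', 'c']
```

The empty string contributes no output elements, which is the "zero or more" part of the mapping.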

Example 1: FlatMap with a predefined function

We use the function str.split, which takes a single str element and outputs a list of strs. This pipeline splits each input element on whitespace, creating a list of zero or more elements.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry 🥕Carrot 🍆Eggplant',
          '🍅Tomato 🥔Potato',
      ])
      | 'Split words' >> beam.FlatMap(str.split)
      | beam.Map(print)
  )

Output PCollection after FlatMap:

plants = [
    '🍓Strawberry',
    '🥕Carrot',
    '🍆Eggplant',
    '🍅Tomato',
    '🥔Potato',
]
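
The "zero or more" case can be checked in plain Python, without a pipeline. The snippet below is a small illustration with made-up input strings. With no arguments, str.split returns an empty list for an empty or all-whitespace string, so such an input element would contribute nothing to the output.

```python
# Made-up inputs, chosen to show the edge cases of str.split.
samples = ['🍓Strawberry 🥕Carrot', '   ', '']

# With no arguments, str.split splits on runs of whitespace and
# drops empty strings, so blank inputs produce an empty list.
split_results = [text.split() for text in samples]
print(split_results)

# With an explicit delimiter, an empty string still yields a list
# holding one (empty) string.
explicit = ''.split(',')
print(explicit)
```

This difference matters when switching from whitespace splitting to an explicit delimiter such as ','.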

Example 2: FlatMap with a function

We define a function split_words which splits an input str element using the delimiter ',' and outputs a list of strs.

import apache_beam as beam

def split_words(text):
  return text.split(',')

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.FlatMap(split_words)
      | beam.Map(print)
  )

Output PCollection after FlatMap:

plants = [
    '🍓Strawberry',
    '🥕Carrot',
    '🍆Eggplant',
    '🍅Tomato',
    '🥔Potato',
]

Example 3: FlatMap with a lambda function

For this example, we want to flatten a PCollection of lists of strs into a PCollection of strs. Each input element is already an iterable, where each element is what we want in the resulting PCollection. We use a lambda function that returns the same input element it received.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
          ['🍅Tomato', '🥔Potato'],
      ])
      | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
      | beam.Map(print)
  )

Output PCollection after FlatMap:

plants = [
    '🍓Strawberry',
    '🥕Carrot',
    '🍆Eggplant',
    '🍅Tomato',
    '🥔Potato',
]

Example 4: FlatMap with a generator

For this example, we want to flatten a PCollection of lists of strs into a PCollection of strs. We use a generator to iterate over the input list and yield each of the elements. Each yielded result in the generator is an element in the resulting PCollection.

import apache_beam as beam

def generate_elements(elements):
  for element in elements:
    yield element

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
          ['🍅Tomato', '🥔Potato'],
      ])
      | 'Flatten lists' >> beam.FlatMap(generate_elements)
      | beam.Map(print)
  )

Output PCollection after FlatMap:

plants = [
    '🍓Strawberry',
    '🥕Carrot',
    '🍆Eggplant',
    '🍅Tomato',
    '🥔Potato',
]
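
A generator that only re-yields its input can also be written with yield from. The following sketch shows that variant, run directly on a list rather than inside a pipeline. The name generate_elements_compact is hypothetical.

```python
def generate_elements_compact(elements):
  # `yield from` delegates to the iterable and yields each of its
  # items in turn, equivalent to the explicit for-loop version.
  yield from elements

# Calling the generator function returns a lazy generator;
# list() drives it to completion.
flattened = list(generate_elements_compact(['🍅Tomato', '🥔Potato']))
print(flattened)

# An empty input list yields nothing at all.
empty = list(generate_elements_compact([]))
print(empty)
```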

Example 5: FlatMapTuple for key-value pairs

If your PCollection consists of (key, value) pairs, you can use FlatMapTuple to unpack them into different function arguments.

import apache_beam as beam

def format_plant(icon, plant):
  if icon:
    yield '{}{}'.format(icon, plant)

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
          (None, 'Invalid'),
      ])
      | 'Format' >> beam.FlatMapTuple(format_plant)
      | beam.Map(print)
  )

Output PCollection after FlatMapTuple:

plants = [
    '🍓Strawberry',
    '🥕Carrot',
    '🍆Eggplant',
    '🍅Tomato',
    '🥔Potato',
]
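
The unpacking can be pictured in plain Python as calling the function with *element. This sketch repeats format_plant from the example and applies it to ordinary tuples. It models the behavior described above and is not Beam's implementation; the helper name flat_map_tuple is hypothetical.

```python
from itertools import chain

def format_plant(icon, plant):
  # Yield a formatted string only when an icon is present.
  if icon:
    yield '{}{}'.format(icon, plant)

def flat_map_tuple(fn, elements):
  # Unpack each tuple into positional arguments, then flatten
  # the iterables that fn returns into one list.
  return list(chain.from_iterable(fn(*element) for element in elements))

pairs = [('🍓', 'Strawberry'), (None, 'Invalid')]
formatted = flat_map_tuple(format_plant, pairs)
print(formatted)
```

The (None, 'Invalid') pair produces no output because the generator yields nothing for it.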

Example 6: FlatMap with multiple arguments

You can pass functions with multiple arguments to FlatMap. Any extra arguments you give to FlatMap, positional or keyword, are passed on to the function after the element.

In this example, split_words takes text and delimiter as arguments.

import apache_beam as beam

def split_words(text, delimiter=None):
  return text.split(delimiter)

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.FlatMap(split_words, delimiter=',')
      | beam.Map(print)
  )

Output PCollection after FlatMap:

plants = [
    '🍓Strawberry',
    '🥕Carrot',
    '🍆Eggplant',
    '🍅Tomato',
    '🥔Potato',
]
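
As a rough plain-Python model of this argument forwarding, the sketch below calls the function with each element first, followed by the extra arguments. The helper flat_map is hypothetical and only illustrates the calling convention; it is not Beam code.

```python
from itertools import chain

def split_words(text, delimiter=None):
  return text.split(delimiter)

def flat_map(fn, elements, *args, **kwargs):
  # Each element is passed as the first argument, followed by the
  # extra positional and keyword arguments.
  results = (fn(element, *args, **kwargs) for element in elements)
  return list(chain.from_iterable(results))

lines = ['🍅Tomato,🥔Potato']
by_keyword = flat_map(split_words, lines, delimiter=',')
by_position = flat_map(split_words, lines, ',')
print(by_keyword == by_position)  # True
```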

Example 7: FlatMap with side inputs as singletons

If the PCollection has a single value, such as the average from another computation, pass the PCollection as a singleton to access that value directly.

In this example, we pass a PCollection containing the value ',' as a singleton. We then use that value as the delimiter for the str.split method.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  delimiter = pipeline | 'Create delimiter' >> beam.Create([','])

  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.FlatMap(
          lambda text, delimiter: text.split(delimiter),
          delimiter=beam.pvalue.AsSingleton(delimiter),
      )
      | beam.Map(print)
  )

Output PCollection after FlatMap:

plants = [
    '🍓Strawberry',
    '🥕Carrot',
    '🍆Eggplant',
    '🍅Tomato',
    '🥔Potato',
]

Example 8: FlatMap with side inputs as iterators

If the PCollection has multiple values, pass the PCollection as an iterator. This accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won’t fit into memory.

import apache_beam as beam

def normalize_and_validate_durations(plant, valid_durations):
  # Work on an updated copy instead of mutating the input element.
  plant = dict(plant, duration=plant['duration'].lower())
  if plant['duration'] in valid_durations:
    yield plant

with beam.Pipeline() as pipeline:
  valid_durations = pipeline | 'Valid durations' >> beam.Create([
      'annual',
      'biennial',
      'perennial',
  ])

  valid_plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'Perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'BIENNIAL'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'unknown'},
      ])
      | 'Normalize and validate durations' >> beam.FlatMap(
          normalize_and_validate_durations,
          valid_durations=beam.pvalue.AsIter(valid_durations),
      )
      | beam.Map(print)
  )

Output PCollection after FlatMap:

valid_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
]

Note: You can pass the PCollection as a list with beam.pvalue.AsList(pcollection), but this requires that all the elements fit into memory.

Example 9: FlatMap with side inputs as dictionaries

If a PCollection is small enough to fit into memory, it can be passed as a dictionary. Each element must be a (key, value) pair. If the PCollection won’t fit into memory, use beam.pvalue.AsIter(pcollection) instead.

import apache_beam as beam

def replace_duration_if_valid(plant, durations):
  if plant['duration'] in durations:
    # Yield an updated copy instead of mutating the input element.
    yield dict(plant, duration=durations[plant['duration']])

with beam.Pipeline() as pipeline:
  durations = pipeline | 'Durations dict' >> beam.Create([
      (0, 'annual'),
      (1, 'biennial'),
      (2, 'perennial'),
  ])

  valid_plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 2},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 1},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 2},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 0},
          {'icon': '🥔', 'name': 'Potato', 'duration': -1},
      ])
      | 'Replace duration if valid' >> beam.FlatMap(
          replace_duration_if_valid,
          durations=beam.pvalue.AsDict(durations),
      )
      | beam.Map(print)
  )

Output PCollection after FlatMap:

valid_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
]
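
Because the mapping function is an ordinary generator, its logic can be checked without a pipeline by passing a plain dict where the side input would go. The sketch below repeats the lookup as a standalone function that yields an updated copy of the plant. The tomato and potato records are taken from the example, and the plain dict is an assumed stand-in for the side input.

```python
def replace_duration_if_valid(plant, durations):
  # Keep the plant only if its duration code is a known key.
  if plant['duration'] in durations:
    # Yield an updated copy so the caller's dict is left unchanged.
    yield dict(plant, duration=durations[plant['duration']])

# A plain dict stands in for the dictionary side input.
durations = dict([(0, 'annual'), (1, 'biennial'), (2, 'perennial')])

tomato = {'icon': '🍅', 'name': 'Tomato', 'duration': 0}
potato = {'icon': '🥔', 'name': 'Potato', 'duration': -1}

kept = list(replace_duration_if_valid(tomato, durations))
dropped = list(replace_duration_if_valid(potato, durations))
print(kept)
print(dropped)
```

The potato record has no matching key, so the generator yields nothing for it, which is how it disappears from the output above.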