Map

Applies a simple 1-to-1 mapping function over each element in the collection.

Examples

In the following examples, we create a pipeline with a PCollection of produce, each with an icon, name, and duration. Then, we apply Map in multiple ways to transform every element in the PCollection.

Map accepts a function that returns a single element for every input element in the PCollection.

Example 1: Map with a predefined function

We use the function str.strip, which takes a single str element and outputs a str. It strips leading and trailing whitespace from the input element, including newlines and tabs.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '   🍓Strawberry   \n',
          '   🥕Carrot   \n',
          '   🍆Eggplant   \n',
          '   🍅Tomato   \n',
          '   🥔Potato   \n',
      ])
      | 'Strip' >> beam.Map(str.strip)
      | beam.Map(print))

Output:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

Example 2: Map with a function

We define a function strip_header_and_newline, which strips any leading and trailing '#', ' ', and '\n' characters from each element.

import apache_beam as beam

def strip_header_and_newline(text):
  return text.strip('# \n')

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(strip_header_and_newline)
      | beam.Map(print))

Output:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

Example 3: Map with a lambda function

We can also use lambda functions to simplify Example 2.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(lambda text: text.strip('# \n'))
      | beam.Map(print))

Output:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

Example 4: Map with multiple arguments

You can pass functions with multiple arguments to Map. Give the extra arguments to Map after the function, as additional positional or keyword arguments, and Map forwards them to the function for every element.

In this example, strip takes text and chars as arguments.

import apache_beam as beam

def strip(text, chars=None):
  return text.strip(chars)

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(strip, chars='# \n')
      | beam.Map(print))

Output:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

Example 5: MapTuple for key-value pairs

If your PCollection consists of (key, value) pairs, you can use MapTuple to unpack them into different function arguments.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'Format' >>
      beam.MapTuple(lambda icon, plant: '{}{}'.format(icon, plant))
      | beam.Map(print))

Output:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

Example 6: Map with side inputs as singletons

If a PCollection contains a single value, such as the average from another computation, you can pass it as a singleton side input so that the function receives that value directly.

In this example, we pass a PCollection containing the value '# \n' as a singleton. We then use that value as the characters for the str.strip method.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  chars = pipeline | 'Create chars' >> beam.Create(['# \n'])

  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(
          lambda text, chars: text.strip(chars),
          chars=beam.pvalue.AsSingleton(chars),
      )
      | beam.Map(print))

Output:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

Example 7: Map with side inputs as iterators

If the PCollection has multiple values, pass the PCollection as an iterator. This accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won’t fit into memory.

import apache_beam as beam

with beam.Pipeline() as pipeline:
  chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n'])

  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(
          lambda text, chars: text.strip(''.join(chars)),
          chars=beam.pvalue.AsIter(chars),
      )
      | beam.Map(print))

Output:

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato

Note: You can pass the PCollection as a list with beam.pvalue.AsList(pcollection), but this requires that all the elements fit into memory.

Example 8: Map with side inputs as dictionaries

If a PCollection is small enough to fit into memory, it can be passed as a dictionary. Each element must be a (key, value) pair. If the PCollection won’t fit into memory, use beam.pvalue.AsIter(pcollection) instead.

import apache_beam as beam

def replace_duration(plant, durations):
  # Return a new dict instead of mutating the input element.
  return {**plant, 'duration': durations[plant['duration']]}

with beam.Pipeline() as pipeline:
  durations = pipeline | 'Durations' >> beam.Create([
      (0, 'annual'),
      (1, 'biennial'),
      (2, 'perennial'),
  ])

  plant_details = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 2
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 1
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 2
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 0
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 2
          },
      ])
      | 'Replace duration' >> beam.Map(
          replace_duration,
          durations=beam.pvalue.AsDict(durations),
      )
      | beam.Map(print))

Output:

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}