apache_beam.transforms.async_dofn module

class apache_beam.transforms.async_dofn.AsyncWrapper(sync_fn, parallelism=1, callback_frequency=5, max_items_to_buffer=None, max_wait_time=120)[source]

Bases: DoFn

Class that wraps a dofn and converts it from one which processes elements synchronously to one which processes them asynchronously.

For synchronous dofns, the default settings mean that many elements (hundreds) will be processed in parallel and that processing an element will block all other work on that key. In addition, runners are optimized for latencies of less than a few seconds, and longer operations can result in high retry rates. Async should be considered when the default parallelism is not appropriate or when items are expected to take longer than a few seconds to process.

Wraps the sync_fn to create an asynchronous version.

Parameters:
  • sync_fn – The dofn to wrap. Must take (K, V) as input.

  • parallelism – The maximum number of elements to process in parallel per worker for this dofn. By default this is set to 1, as the most common case for async dofns is a CPU- or GPU-heavy dofn that requires a significant fraction of the CPU/GPU.

  • callback_frequency – The frequency with which the runner will check for elements to commit. A short callback frequency means items are committed shortly after processing but causes additional work for the worker. A long callback frequency results in slower commits but less busy work. The default of 5s results in a maximum added latency of 5s while requiring relatively few resources. If your messages take significantly longer than 5s to process, it is recommended to raise this.

  • max_items_to_buffer – The maximum number of items to buffer. Ideally the buffer holds enough items to keep the worker busy at all times, but not so many that the worker runs out of memory. Defaults to 2x the parallelism, which should be good for most pipelines.

  • max_wait_time – The maximum amount of time an item should wait to be added to the buffer. Used for testing to ensure timeouts are met.
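
The defaults described above can be summarised in a short sketch. It does not import Beam: AsyncSettings and buffer_limit are hypothetical names used only for illustration, and the only behaviour taken from the parameter descriptions is that an unset max_items_to_buffer falls back to twice the parallelism.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AsyncSettings:
    """Hypothetical container mirroring the documented constructor defaults."""
    parallelism: int = 1
    callback_frequency: float = 5
    max_items_to_buffer: Optional[int] = None
    max_wait_time: float = 120

    def buffer_limit(self) -> int:
        # Documented default: 2x the parallelism when no limit is given.
        if self.max_items_to_buffer is None:
            return 2 * self.parallelism
        return self.max_items_to_buffer


defaults = AsyncSettings()
# Assumed scenario: slow items, so a longer callback frequency is chosen.
slow_items = AsyncSettings(parallelism=4, callback_frequency=30)
explicit = AsyncSettings(parallelism=4, max_items_to_buffer=5)
```

Here defaults.buffer_limit() is 2 and slow_items.buffer_limit() is 8, while an explicit max_items_to_buffer is used unchanged.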

TIMER = TimerSpec(ts-timer)
TIMER_SET = ReadModifyWriteStateSpec(timer_set)
TO_PROCESS = BagStateSpec(to_process)
static reset_state()[source]
setup()[source]

Forwards to the wrapped dofn’s setup method.

teardown()[source]

Forwards to the wrapped dofn’s teardown method.

sync_fn_process(element, *args, **kwargs)[source]

Makes the calls to the wrapped dofn’s start_bundle and process methods. It will then combine the results into a single generator.

Parameters:
  • element – The element to process.

  • *args – Any additional arguments to pass to the wrapped dofn’s process method. Will be the same args that the async wrapper is called with.

  • **kwargs – Any additional keyword arguments to pass to the wrapped dofn’s process method. Will be the same kwargs that the async wrapper is called with.

Returns:

A generator of elements produced by the input element.
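
As a rough illustration of combining the results of several method calls into a single generator, the sketch below uses a toy class in place of a real dofn. ToyFn and combined_results are hypothetical names, and treating a None return as "no output" is an assumption rather than documented behaviour.

```python
class ToyFn:
    """Hypothetical stand-in for a wrapped dofn; not a Beam class."""

    def start_bundle(self):
        return None  # nothing to emit at bundle start

    def process(self, element):
        key, value = element
        yield key, value * 2


def combined_results(fn, element, *args, **kwargs):
    """Call start_bundle and process, yielding every result from one generator."""
    for part in (fn.start_bundle(), fn.process(element, *args, **kwargs)):
        # Assumption: a method may return None instead of an iterable.
        if part is not None:
            yield from part


results = list(combined_results(ToyFn(), ("k", 21)))
```

The extra positional and keyword arguments are passed straight through to process, matching the description of *args and **kwargs above.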

decrement_items_in_buffer(future)[source]
schedule_if_room(element, ignore_buffer=False, *args, **kwargs)[source]

Schedules an item to be processed asynchronously if there is room.

Parameters:
  • element – The element to process.

  • ignore_buffer – If true, ignores the buffer limit and schedules the item regardless of the buffer size. Used when an item needs to skip to the front, such as for retries.

  • *args – arguments that the wrapped dofn requires.

  • **kwargs – keyword arguments that the wrapped dofn requires.

Returns:

True if the item was scheduled, False otherwise.
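
The idea of scheduling only when the buffer has room can be sketched with the standard library. This is not Beam's implementation: BoundedScheduler and its attributes are hypothetical, and the counter decremented from a future's done-callback is an assumption suggested by the decrement_items_in_buffer(future) signature.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedScheduler:
    """Hypothetical sketch of a bounded async buffer; not Beam's AsyncWrapper."""

    def __init__(self, fn, parallelism=1, max_items_to_buffer=None):
        self._fn = fn
        self._pool = ThreadPoolExecutor(max_workers=parallelism)
        self._limit = (2 * parallelism
                       if max_items_to_buffer is None else max_items_to_buffer)
        self._lock = threading.Lock()
        self._items_in_buffer = 0
        self.futures = []

    def _decrement_items_in_buffer(self, future):
        # Runs when a future completes, freeing one buffer slot.
        with self._lock:
            self._items_in_buffer -= 1

    def schedule_if_room(self, element, ignore_buffer=False):
        with self._lock:
            if self._items_in_buffer >= self._limit and not ignore_buffer:
                return False  # buffer full, caller must retry later
            self._items_in_buffer += 1
        future = self._pool.submit(self._fn, element)
        future.add_done_callback(self._decrement_items_in_buffer)
        self.futures.append(future)
        return True


gate = threading.Event()


def slow_double(x):
    gate.wait(timeout=5)  # hold the worker until the gate opens
    return x * 2


scheduler = BoundedScheduler(slow_double, parallelism=1)  # buffer limit is 2
accepted = [scheduler.schedule_if_room(i) for i in range(3)]
forced = scheduler.schedule_if_room(99, ignore_buffer=True)
gate.set()
outputs = [f.result(timeout=5) for f in scheduler.futures]
```

With a buffer limit of 2, the third call is rejected, while the call with ignore_buffer=True is accepted even though the buffer is full.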

schedule_item(element, ignore_buffer=False, *args, **kwargs)[source]

Schedules an item to be processed asynchronously.

If the queue is full, blocks until room opens up.

After calling, AsyncWrapper will hold a future pointing to the result of this processing.

Parameters:
  • element – The element to process.

  • ignore_buffer – If true, ignores the buffer limit and schedules the item regardless of the buffer size. Used when an item needs to skip to the front, such as for retries.

  • *args – arguments that the wrapped dofn requires.

  • **kwargs – keyword arguments that the wrapped dofn requires.
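
Blocking until room opens up can be sketched with a condition variable. The class and attribute names below are hypothetical, and the actual method may well wait in a different way (for example by polling); the sketch only shows the blocking behaviour described above, with the scheduler keeping the future for each item.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BlockingScheduler:
    """Hypothetical sketch of a blocking scheduler; not Beam's AsyncWrapper."""

    def __init__(self, fn, parallelism=1, buffer_limit=2):
        self._fn = fn
        self._pool = ThreadPoolExecutor(max_workers=parallelism)
        self._limit = buffer_limit
        self._room = threading.Condition()
        self._in_buffer = 0
        self.futures = []  # the scheduler holds a future per scheduled item

    def _on_done(self, future):
        with self._room:
            self._in_buffer -= 1
            self._room.notify()  # wake one caller waiting for room

    def schedule_item(self, element, ignore_buffer=False):
        with self._room:
            while self._in_buffer >= self._limit and not ignore_buffer:
                self._room.wait()  # block until a slot frees up
            self._in_buffer += 1
        future = self._pool.submit(self._fn, element)
        future.add_done_callback(self._on_done)
        self.futures.append(future)


scheduler = BlockingScheduler(lambda x: x + 1, parallelism=2, buffer_limit=2)
for i in range(10):
    scheduler.schedule_item(i)  # blocks whenever two items are in flight
results = [f.result(timeout=5) for f in scheduler.futures]
```

All ten items are eventually processed even though at most two are buffered at any time.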

next_time_to_fire()[source]
accepting_items()[source]
is_empty()[source]
process(element, timer=TimerParam(ts-timer), to_process=StateParam(to_process), *args, **kwargs)[source]

Add the elements to the list of items to be processed asynchronously.

Performs additional bookkeeping to maintain exactly-once processing and sets timers to commit each item after it has finished processing.

Parameters:
  • element – The element to process.

  • timer – Callback timer that will commit elements.

  • to_process – State that keeps track of queued items for exactly-once processing.

  • *args – arguments that the wrapped dofn requires.

  • **kwargs – keyword arguments that the wrapped dofn requires.

Returns:

An empty list. The elements will be output asynchronously.

commit_finished_items(to_process=StateParam(to_process), timer=TimerParam(ts-timer))[source]

Commits finished items and synchronizes local state with runner state.

Note that timer firings are per key, while local state contains messages for all keys. Only messages for the given key will be output and cleaned up.

Parameters:
  • to_process – State that keeps track of queued messages for this key.

  • timer – Timer that initiated this commit and can be reset if not all items have finished.

Returns:

A list of elements that have finished processing for this key.
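
The per-key commit can be illustrated with plain futures. This is a simplified sketch under stated assumptions, not the method's actual logic: runner state is modelled as a list of (key, value) items for one key, local state as a dict of futures covering all keys, and the function name and return shape are hypothetical.

```python
from concurrent.futures import Future


def commit_finished(to_process, local_futures):
    """Return (outputs, reset_timer) for the key whose items are in to_process.

    to_process: list of (key, value) items queued for a single key.
    local_futures: dict mapping (key, value) -> Future, covering all keys.
    """
    finished, still_pending = [], []
    for item in to_process:
        future = local_futures.get(item)
        if future is not None and future.done():
            finished.extend(future.result())
            del local_futures[item]  # clean up local state for this key only
        else:
            still_pending.append(item)
    to_process[:] = still_pending  # keep unfinished items queued
    # Ask for the timer to be reset while items for this key remain.
    return finished, bool(still_pending)


def finished_future(outputs):
    future = Future()
    future.set_result(outputs)
    return future


local = {
    ("a", 1): finished_future([("a", 10)]),
    ("a", 2): Future(),                      # still running
    ("b", 3): finished_future([("b", 30)]),  # different key, left untouched
}
state_for_a = [("a", 1), ("a", 2)]
outputs, reset_timer = commit_finished(state_for_a, local)
```

Only the finished item for key "a" is output; the unfinished item stays queued, and the finished item for key "b" remains in local state until a timer fires for that key.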

timer_callback(to_process=StateParam(to_process), timer=TimerParam(ts-timer))[source]

Helper method to commit finished items in response to timer firing.

Parameters:
  • to_process – State that keeps track of queued items for exactly-once processing.

  • timer – Timer that initiated this commit and can be reset if not all items have finished.

Returns:

A generator of elements that have finished processing for this key.