Shared is a helper class for managing a single instance of an object shared by multiple threads within the same process. A Shared instance is a serializable handle that can be used by all threads of each worker process; the underlying object is initialized lazily, as needed, by calls to acquire.
To share a very large list across all threads of each worker in a DoFn:
    import apache_beam as beam
    from apache_beam.utils import shared

    # Shared is a helper class for managing a single instance of an object
    # shared by multiple threads within the same process. Instances of Shared
    # are serializable objects that can be shared by all threads of each worker
    # process. A Shared object encapsulates a weak reference to a singleton
    # instance of the shared resource. The singleton is lazily initialized by
    # calls to Shared.acquire().
    #
    # Several built-in types such as list and dict do not directly support weak
    # references but can add support through subclassing:
    # https://docs.python.org/3/library/weakref.html
    class WeakRefList(list):
      pass

    class GetNthStringFn(beam.DoFn):
      def __init__(self, shared_handle):
        self._shared_handle = shared_handle

      def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        def initialize_list():
          # Build the giant initial list.
          return WeakRefList([str(i) for i in range(1000000)])

        self._giant_list = self._shared_handle.acquire(initialize_list)

      def process(self, element):
        yield self._giant_list[element]

    p = beam.Pipeline()
    shared_handle = shared.Shared()
    (p | beam.Create([2, 4, 6, 8])
       | beam.ParDo(GetNthStringFn(shared_handle)))
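The comment in the example above notes that built-in types such as list and dict cannot be weakly referenced directly. The following is a minimal, standard-library-only sketch of that behaviour; `supports_weakref` is a hypothetical helper introduced here for illustration and is not part of Beam.

```python
import weakref


class WeakRefList(list):
    """A list subclass; instances of user-defined subclasses can be weakly referenced."""


def supports_weakref(obj):
    """Return True if a weak reference to obj can be created (hypothetical helper)."""
    try:
        weakref.ref(obj)
    except TypeError:
        # Built-in list and dict instances raise TypeError here.
        return False
    return True


plain_supported = supports_weakref([1, 2, 3])
subclass_supported = supports_weakref(WeakRefList([1, 2, 3]))
print(plain_supported, subclass_supported)
```

This is why the object returned by the constructor function is wrapped in a trivial subclass rather than returned as a plain list.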
Real-world uses will typically involve using a side-input to a DoFn to initialize the shared resource in a way that can’t be done with just its constructor:
    import apache_beam as beam
    from apache_beam.utils import shared

    # Built-in dict does not support weak references, so use a subclass.
    class WeakRefDict(dict):
      pass

    class RainbowTableLookupFn(beam.DoFn):
      def __init__(self, shared_handle):
        self._shared_handle = shared_handle

      def process(self, element, table_elements):
        def construct_table():
          # Construct the rainbow table from the table elements.
          # Each table element is a (string, hash) pair.
          result = WeakRefDict()
          for key, value in table_elements:
            result[value] = key
          return result

        rainbow_table = self._shared_handle.acquire(construct_table)
        unhashed_str = rainbow_table.get(element)
        if unhashed_str is not None:
          yield unhashed_str

    p = beam.Pipeline()
    shared_handle = shared.Shared()
    reverse_hash_table = p | "ReverseHashTable" >> beam.Create([
        ('a', '0cc175b9c0f1b6a831c399e269772661'),
        ('b', '92eb5ffee6ae2fec3ad71c777531578f'),
        ('c', '4a8a08f09d37b73795649038408b5f33'),
        ('d', '8277e0910d750195b448797616e091ad')])
    unhashed = (
        p
        | 'Hashes' >> beam.Create([
            '0cc175b9c0f1b6a831c399e269772661',
            '8277e0910d750195b448797616e091ad'])
        | 'Unhash' >> beam.ParDo(
            RainbowTableLookupFn(shared_handle),
            # Side inputs must be wrapped to say how the PCollection is consumed.
            beam.pvalue.AsIter(reverse_hash_table)))
Handle for managing shared per-process objects.
Each instance of a Shared object represents a distinct handle to a distinct object. Example usage is described in the file comment of shared.py.
This object has the following limitations:
* A shared object won’t be GC’ed if there isn’t another acquire called for a different shared object.
* Each stage can only use exactly one Shared token, otherwise only one Shared token, NOT NECESSARILY THE LATEST, will be “kept-alive”.
* If there are two different stages using separate Shared tokens, but which get fused together, only one Shared token will be “kept-alive”.
(See documentation of _SharedMap for details.)
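The limitations above are easier to reason about with a toy model. The sketch below assumes a design in which each key is tracked through a weak reference and a single strong "keep-alive" slot holds whichever object was acquired most recently. `ToyKeepAliveMap` and `Resource` are hypothetical names for illustration only; this is not Beam's `_SharedMap`, and the exact collection timing shown relies on CPython's reference counting.

```python
import gc
import weakref


class Resource:
    """Stand-in for an expensive shared object (hypothetical)."""

    def __init__(self, name):
        self.name = name


class ToyKeepAliveMap:
    """Toy model: one weak reference per key plus ONE strong keep-alive slot."""

    def __init__(self):
        self._refs = {}         # key -> weak reference to the object
        self._keepalive = None  # strong reference to the latest acquired object

    def acquire(self, key, constructor_fn):
        ref = self._refs.get(key)
        obj = ref() if ref is not None else None
        if obj is None:
            # Not cached, or already garbage collected: build it again.
            obj = constructor_fn()
            self._refs[key] = weakref.ref(obj)
        # Only the most recently acquired object is held strongly.
        self._keepalive = obj
        return obj

    def is_alive(self, key):
        ref = self._refs.get(key)
        return ref is not None and ref() is not None


toy = ToyKeepAliveMap()

toy.acquire("a", lambda: Resource("a"))  # caller discards the result
gc.collect()
a_alive_before = toy.is_alive("a")       # held only by the keep-alive slot

toy.acquire("b", lambda: Resource("b"))  # replaces the keep-alive slot
gc.collect()
a_alive_after = toy.is_alive("a")
b_alive_after = toy.is_alive("b")
print(a_alive_before, a_alive_after, b_alive_after)
```

In this model an object with no other strong references survives only until a different key is acquired, which mirrors why only one Shared token per stage is "kept-alive".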
Acquire a reference to the object associated with this Shared handle.
- constructor_fn – function that initialises / constructs the object if not present in the cache. This function should take no arguments. It should return an initialised object, or None if the object could not be initialised / constructed.
- tag – an optional identifier to store with the cached object. If subsequent calls to acquire use different tags, the object will be reloaded rather than returned from cache.
Returns a reference to an initialised object, either from the cache or newly constructed.
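The interaction between constructor_fn and tag described above can be sketched with a small stand-in class. `ToyShared`, `WeakRefDict`, and `build_table` are hypothetical names used only for illustration; the sketch follows the documented contract (zero-argument constructor, None on failure, rebuild when the tag changes) and is not Beam's actual implementation.

```python
import threading
import weakref


class WeakRefDict(dict):
    """dict subclass so that instances can be weakly referenced."""


class ToyShared:
    """Toy stand-in for a shared handle; not Beam's implementation."""

    def __init__(self):
        self._lock = threading.Lock()
        self._ref = None  # weak reference to the cached object
        self._tag = None  # tag stored alongside the cached object

    def acquire(self, constructor_fn, tag=None):
        with self._lock:
            cached = self._ref() if self._ref is not None else None
            if cached is None or self._tag != tag:
                # Nothing cached, or the tag changed: construct a fresh object.
                cached = constructor_fn()
                if cached is None:
                    return None  # construction failed; cache left unchanged
                self._ref = weakref.ref(cached)
                self._tag = tag
            return cached


builds = []  # records each construction so rebuilds are visible


def build_table():
    builds.append(len(builds) + 1)
    return WeakRefDict(version=builds[-1])


handle = ToyShared()
first = handle.acquire(build_table, tag="v1")
second = handle.acquire(build_table, tag="v1")  # same tag: served from cache
third = handle.acquire(build_table, tag="v2")   # different tag: rebuilt
print(first is second, third is first, len(builds))
```

A tag is therefore a convenient way to force a reload, for example when the data backing the shared object has a new version.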