apache_beam.utils.shared module¶
Shared class.
Shared class for managing a single instance of an object shared by multiple threads within the same process. Shared is a serializable object that can be shared by all threads of each worker process, which will be initialized as necessary by calls to acquire.
Example usage:
To share a very large list across all threads of each worker in a DoFn:
# Shared is a helper class for managing a single instance of an object
# shared by multiple threads within the same process. Instances of Shared
# are serializable objects that can be shared by all threads of each worker
# process. A Shared object encapsulates a weak reference to a singleton
# instance of the shared resource. The singleton is lazily initialized by
# calls to Shared.acquire().
#
# Several built-in types such as list and dict do not directly support weak
# references but can add support through subclassing:
# https://docs.python.org/3/library/weakref.html
class WeakRefList(list):
pass
class GetNthStringFn(beam.DoFn):
def __init__(self, shared_handle):
self._shared_handle = shared_handle
def setup(self):
# setup is a good place to initialize transient in-memory resources.
def initialize_list():
# Build the giant initial list.
return WeakRefList([str(i) for i in range(1000000)])
self._giant_list = self._shared_handle.acquire(initialize_list)
def process(self, element):
yield self._giant_list[element]
p = beam.Pipeline()
shared_handle = shared.Shared()
(p | beam.Create([2, 4, 6, 8])
| beam.ParDo(GetNthStringFn(shared_handle)))
Real-world uses will typically involve using a side-input to a DoFn to initialize the shared resource in a way that can’t be done with just its constructor:
class RainbowTableLookupFn(beam.DoFn):
def __init__(self, shared_handle):
self._shared_handle = shared_handle
def process(self, element, table_elements):
def construct_table():
# Construct the rainbow table from the table elements.
# The table contains lines in the form "string::hash"
result = {}
for key, value in table_elements:
result[value] = key
return result
rainbow_table = self._shared_handle.acquire(construct_table)
unhashed_str = rainbow_table.get(element)
if unhashed_str is not None:
yield unhashed_str
p = beam.Pipeline()
shared_handle = shared.Shared()
reverse_hash_table = p | "ReverseHashTable" >> beam.Create([
('a', '0cc175b9c0f1b6a831c399e269772661'),
('b', '92eb5ffee6ae2fec3ad71c777531578f'),
('c', '4a8a08f09d37b73795649038408b5f33'),
('d', '8277e0910d750195b448797616e091ad')])
unhashed = (p
| 'Hashes' >> beam.Create([
'0cc175b9c0f1b6a831c399e269772661',
'8277e0910d750195b448797616e091ad'])
| 'Unhash' >> beam.ParDo(
RainbowTableLookupFn(shared_handle), reverse_hash_table))
-
class
apache_beam.utils.shared.
Shared
[source]¶ Bases:
object
Handle for managing shared per-process objects.
Each instance of a Shared object represents a distinct handle to a distinct object. Example usage is described in the file comment of shared.py.
This object has the following limitations: * A shared object won’t be GC’ed if there isn’t another acquire called for a different shared object. * Each stage can only use exactly one Shared token, otherwise only one Shared token, NOT NECESSARILY THE LATEST, will be “kept-alive”. * If there are two different stages using separate Shared tokens, but which get fused together, only one Shared token will be “kept-alive”.
(See documentation of _SharedMap for details.)
-
acquire
(constructor_fn, tag=None)[source]¶ Acquire a reference to the object associated with this Shared handle.
Parameters: - constructor_fn – function that initialises / constructs the object if not present in the cache. This function should take no arguments. It should return an initialised object, or None if the object could not be initialised / constructed.
- tag – an optional indentifier to store with the cached object. If subsequent calls to acquire use different tags, the object will be reloaded rather than returned from cache.
Returns: A reference to an initialised object, either from the cache, or newly-constructed.
-