Cache data using a shared object
A cache is a software component that stores data so that future requests for that data can be served faster. To access a cache, you can use side inputs, a stateful DoFn, or calls to an external service. The Python SDK provides another option in the shared module. This option can be more memory-efficient than side inputs, simpler than a stateful DoFn, and more performant than calling an external service, because it does not have to access an external service for every element or bundle of elements. For more details about strategies for caching data using the Beam SDK, see the session Strategies for caching data in Dataflow using Beam SDK from the 2022 Beam Summit.
The examples on this page demonstrate how to use the Shared class of the shared module to enrich elements in both bounded and unbounded PCollection objects. The samples use two data sets: order and customer. Each order record includes a customer ID, which is used to look up the matching customer record and add its attributes to the order.
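To make the enrichment step concrete, here is a minimal sketch in plain Python, outside any pipeline. The field names (order_id, customer_id, name, tier) and the sample values are assumptions made for illustration; the page does not specify the record schema.

```python
# Hypothetical sample data; the field names are illustrative assumptions.
customers = {
    "c1": {"name": "Ada", "tier": "gold"},
    "c2": {"name": "Lin", "tier": "silver"},
}
orders = [
    {"order_id": 1, "customer_id": "c1"},
    {"order_id": 2, "customer_id": "c9"},  # no matching customer record
]


def enrich(order, customer_lookup):
    """Return a copy of the order with the matching customer attributes merged in."""
    attrs = customer_lookup.get(order["customer_id"], {})
    return {**order, **attrs}


enriched = [enrich(order, customers) for order in orders]
```

An order whose customer ID is missing from the lookup passes through unchanged, which mirrors the default of an empty dictionary used in the process method of the examples on this page.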
Create a cache on a batch pipeline
In this example, the customer cache is loaded as a dictionary in the setup method of the EnrichOrderFn. The cache is used to add customer attributes to the order records. A Shared object encapsulates a weak reference to a singleton instance of the shared resource, and the Python dictionary does not support weak references, so the example wraps the dictionary in a subclass that does.
import apache_beam as beam
from apache_beam.utils import shared

# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass

class EnrichOrderFn(beam.DoFn):
    def __init__(self):
        self._customers = {}
        self._shared_handle = shared.Shared()

    def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        self._customer_lookup = self._shared_handle.acquire(self.load_customers)

    def load_customers(self):
        # expensive_remote_call_to_load_customers is not defined here; it stands for
        # the call that loads the customer data as a dictionary keyed by customer ID.
        self._customers = expensive_remote_call_to_load_customers()
        return WeakRefDict(self._customers)

    def process(self, element):
        # Merge the customer attributes into the order; unknown IDs add nothing.
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}
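The need for the wrapper class above can be checked with the standard library alone. The following is a minimal sketch that uses only the weakref module and does not involve Beam; the helper name supports_weakref and the sample data are assumptions made for illustration.

```python
import weakref


class WeakRefDict(dict):
    """A dict subclass; unlike dict itself, its instances can be weakly referenced."""


def supports_weakref(obj):
    """Return True if a weak reference to obj can be created."""
    try:
        weakref.ref(obj)
        return True
    except TypeError:
        return False


plain = {"c1": {"name": "Ada"}}  # hypothetical customer data
wrapped = WeakRefDict(plain)

plain_ok = supports_weakref(plain)      # False in CPython: dict has no weakref support
wrapped_ok = supports_weakref(wrapped)  # True: the subclass instance supports weak references
```

The wrapped object still behaves like an ordinary dictionary, so lookups such as get work unchanged.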
Create a cache and update it regularly on a streaming pipeline
Because the customer cache is assumed to change over time, you need to refresh it periodically. To reload the shared object, change the tag argument of the acquire method. In this example, the refresh is implemented in the start_bundle method, which compares the current tag value to the value that is associated with the existing shared object. The set_tag method rounds the current timestamp down to a multiple of the maximum staleness in seconds, so it returns the same tag value throughout each period. Therefore, when the current tag value is greater than the existing tag value, the customer cache is refreshed.
from datetime import datetime

import apache_beam as beam
from apache_beam.utils import shared

# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass

class EnrichOrderFn(beam.DoFn):
    def __init__(self):
        self._max_stale_sec = 60
        self._customers = {}
        self._shared_handle = shared.Shared()

    def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        self._customer_lookup = self._shared_handle.acquire(
            self.load_customers, self.set_tag()
        )

    def set_tag(self):
        # A single tag value is returned within a period, which is upper-limited by the max stale second.
        current_ts = datetime.now().timestamp()
        return current_ts - (current_ts % self._max_stale_sec)

    def load_customers(self):
        # Assign the tag value of the current period for comparison.
        # The loaded dictionary is assumed to hold that value under the "tag" key.
        self._customers = expensive_remote_call_to_load_customers(tag=self.set_tag())
        return WeakRefDict(self._customers)

    def start_bundle(self):
        # Update the shared object when the current tag value exceeds the existing value.
        # The existing tag is read from the shared object, not from self._customers,
        # because load_customers might have run on another DoFn instance in this process.
        current_tag = self.set_tag()
        if current_tag > self._customer_lookup["tag"]:
            self._customer_lookup = self._shared_handle.acquire(
                self.load_customers, current_tag
            )

    def process(self, element):
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}
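The tag arithmetic in set_tag can be examined in isolation. The sketch below reproduces the same rounding in plain Python with fixed timestamps instead of datetime.now(); the function name bucket_tag and the timestamps are assumptions chosen for illustration.

```python
MAX_STALE_SEC = 60  # same value as _max_stale_sec in the example above


def bucket_tag(ts, max_stale_sec=MAX_STALE_SEC):
    """Round a POSIX timestamp down to the start of its staleness period."""
    return ts - (ts % max_stale_sec)


# Hypothetical timestamps in seconds since the epoch.
timestamps = (1_700_000_000.0, 1_700_000_030.0, 1_700_000_040.0)
tags = [bucket_tag(ts) for ts in timestamps]
# tags == [1699999980.0, 1699999980.0, 1700000040.0]

# The first two timestamps fall in the same 60-second period and share a tag;
# the third falls in the next period, so its tag is greater and a refresh is due.
refresh_due = tags[2] > tags[1]
```

Because the tag only changes at period boundaries, calls to acquire within one period pass the same tag and reuse the existing shared object.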
Last updated on 2025/01/19