apache_beam.transforms.enrichment module

apache_beam.transforms.enrichment.cross_join(left: Dict[str, Any], right: Dict[str, Any]) → apache_beam.pvalue.Row[source]

Performs a cross join on two dict objects.

Joins the columns of the right row onto the left row.

Parameters:
  • left (Dict[str, Any]) – input request dictionary.
  • right (Dict[str, Any]) – response dictionary from the API.
Returns:

beam.Row containing the merged columns.
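
To illustrate the merge described above, here is a minimal pure-Python sketch of joining the right dictionary's columns onto the left. It is not Beam's implementation: `merge_columns` is a hypothetical helper, it returns a plain dict rather than a beam.Row, and keeping the left value when both sides share a key is an assumption made for this sketch.

```python
from typing import Any, Dict


def merge_columns(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for a join function; returns a dict, not a beam.Row."""
    merged = dict(left)  # copy so the caller's request dict is not mutated
    for key, value in right.items():
        # Assumption for this sketch: on a key clash the left (request) value wins.
        merged.setdefault(key, value)
    return merged


request = {"product_id": 1, "quantity": 3}
response = {"product_id": 1, "name": "pixel 5", "price": 999}
enriched = merge_columns(request, response)
print(enriched)
```

In a pipeline, a function of this shape would wrap its result as `beam.Row(**merged)` to match the return type in the signature above.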

class apache_beam.transforms.enrichment.EnrichmentSourceHandler[source]

Bases: apache_beam.io.requestresponse.Caller

Wrapper class for apache_beam.io.requestresponse.Caller.

Ensure that your implementation of the __call__ method returns a tuple of beam.Row objects.

get_cache_key(request: InputT) → str[source]

Returns the cache key for the request. The same key is used to look up the response in the cache.

Implement this method to provide the key for the cache. By default, the entire request is stored as the cache key.

For example, in BigTableEnrichmentHandler, the row key for the element is returned here.
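
The idea of deriving a cache key from a request can be sketched without Beam. In the example below, `Request`, `Response`, `FakeLookupHandler`, and `cached_call` are hypothetical names. A NamedTuple stands in for beam.Row, a dict stands in for the cache, and the handler does not subclass EnrichmentSourceHandler, so this shows the lookup pattern only, not Beam's caching code.

```python
from typing import Dict, NamedTuple, Tuple


class Request(NamedTuple):
    """Stand-in for a beam.Row request (hypothetical fields)."""
    row_key: str
    trace_id: str  # varies per element, so it should not be part of the key


class Response(NamedTuple):
    """Stand-in for a beam.Row response."""
    name: str


class FakeLookupHandler:
    """Hypothetical handler; a real one would subclass EnrichmentSourceHandler."""

    def __init__(self) -> None:
        self.calls = 0
        self._table = {"u1": "Ada", "u2": "Grace"}

    def __call__(self, request: Request) -> Tuple[Request, Response]:
        self.calls += 1  # count trips to the pretend external source
        return request, Response(name=self._table[request.row_key])

    def get_cache_key(self, request: Request) -> str:
        # Key on the row key only, so requests that differ in other
        # fields still share one cache entry.
        return request.row_key


def cached_call(
    handler: FakeLookupHandler, cache: Dict[str, Response], request: Request
) -> Tuple[Request, Response]:
    key = handler.get_cache_key(request)
    if key not in cache:
        _, cache[key] = handler(request)  # miss: fetch and store
    return request, cache[key]


handler = FakeLookupHandler()
cache: Dict[str, Response] = {}
results = [
    cached_call(handler, cache, Request("u1", "t-1")),
    cached_call(handler, cache, Request("u1", "t-2")),  # served from the cache
    cached_call(handler, cache, Request("u2", "t-3")),
]
print(handler.calls)
```

Keying on the whole request instead would treat the first two requests as distinct and trigger a second lookup for the same row.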

class apache_beam.transforms.enrichment.Enrichment(source_handler: apache_beam.transforms.enrichment.EnrichmentSourceHandler, join_fn: Callable[[Dict[str, Any], Dict[str, Any]], apache_beam.pvalue.Row] = <function cross_join>, timeout: Optional[float] = 30, repeater: apache_beam.io.requestresponse.Repeater = <apache_beam.io.requestresponse.ExponentialBackOffRepeater object>, throttler: apache_beam.io.requestresponse.PreCallThrottler = <apache_beam.io.requestresponse.DefaultThrottler object>)[source]

Bases: apache_beam.transforms.ptransform.PTransform

An apache_beam.transforms.enrichment.Enrichment transform to enrich elements in a PCollection.

Uses the apache_beam.transforms.enrichment.EnrichmentSourceHandler to enrich elements by joining the metadata from an external source.

Processes an input PCollection of beam.Row by applying an apache_beam.transforms.enrichment.EnrichmentSourceHandler to each element and returning the enriched PCollection.
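
The per-element flow described above (look up each element, then join the request with the response) can be approximated in plain Python. This is only a conceptual sketch under stated assumptions: `Sale`, `Product`, `CATALOG`, `lookup`, `left_wins_join`, and `enrich` are hypothetical names, NamedTuples stand in for beam.Row, a list stands in for the PCollection, and the joined result is a dict rather than a beam.Row.

```python
from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Tuple


class Sale(NamedTuple):
    """Stand-in for an input beam.Row."""
    product_id: int
    quantity: int


class Product(NamedTuple):
    """Stand-in for the beam.Row returned by the external source."""
    name: str
    price: float


CATALOG = {1: Product("keyboard", 45.0), 2: Product("mouse", 20.0)}


def lookup(request: Sale) -> Tuple[Sale, Product]:
    # Plays the role of the source handler: returns (request, response).
    return request, CATALOG[request.product_id]


def left_wins_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
    # Plays the role of join_fn; values from the request take precedence.
    return {**right, **left}


def enrich(
    elements: Iterable[Sale],
    handler: Callable[[Sale], Tuple[Sale, Product]],
    join_fn: Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]],
) -> List[Dict[str, Any]]:
    enriched = []
    for element in elements:
        request, response = handler(element)
        enriched.append(join_fn(request._asdict(), response._asdict()))
    return enriched


rows = enrich([Sale(1, 2), Sale(2, 5)], lookup, left_wins_join)
print(rows)
```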

Parameters:

expand(input_row: apache_beam.pvalue.PCollection[~InputT][InputT]) → apache_beam.pvalue.PCollection[~OutputT][OutputT][source]

with_redis_cache(host: str, port: int, time_to_live: Union[int, datetime.timedelta] = 86400, *, request_coder: Optional[apache_beam.coders.coders.Coder] = None, response_coder: Optional[apache_beam.coders.coders.Coder] = None, **kwargs)[source]

Configure the Redis cache to use with enrichment transform.

Parameters:
  • host (str) – The hostname or IP address of the Redis server.
  • port (int) – The port number of the Redis server.
  • time_to_live (Union[int, timedelta]) – The time-to-live (TTL) for records stored in Redis. Provide an integer (in seconds) or a datetime.timedelta object.
  • request_coder (Optional[coders.Coder]) – Coder for requests stored in Redis.
  • response_coder (Optional[coders.Coder]) – Coder for decoding responses received from Redis.
  • kwargs – Optional additional keyword arguments required to connect to your Redis server. These are the same arguments accepted by redis.Redis().
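
Because time_to_live accepts either form, the default of 86400 seconds corresponds to one day. The following sketch shows how the two forms relate; `ttl_in_seconds` is a hypothetical helper written for illustration and is not part of Beam.

```python
from datetime import timedelta
from typing import Union


def ttl_in_seconds(time_to_live: Union[int, timedelta]) -> int:
    """Hypothetical helper: express either accepted TTL form in whole seconds."""
    if isinstance(time_to_live, timedelta):
        return int(time_to_live.total_seconds())
    return time_to_live


print(ttl_in_seconds(86400))                  # integer form, already in seconds
print(ttl_in_seconds(timedelta(days=1)))      # same duration as a timedelta
print(ttl_in_seconds(timedelta(minutes=30)))  # a shorter TTL
```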