apache_beam.runners.interactive.pipeline_instrument module

Module that instruments a given pipeline with interactivity.

For internal use only; no backwards-compatibility guarantees. This module accesses the current interactive environment and analyzes the given pipeline to transform the original pipeline into a one-shot pipeline with interactivity.

class apache_beam.runners.interactive.pipeline_instrument.PipelineInstrument(pipeline, options=None)[source]

Bases: object

A pipeline instrument for a pipeline to be executed by the interactive runner.

This module should never depend on the underlying runner that the interactive runner delegates to. It instruments the original pipeline instance directly by appending or replacing transforms with the help of the cache. It provides interfaces to recover the states of the original pipeline. It’s the interactive runner’s responsibility to coordinate supported underlying runners to run the instrumented pipeline and to recover the original pipeline states if needed.

instrumented_pipeline_proto()[source]

Always returns a new instance of the portable instrumented proto.

prune_subgraph_for(pipeline, required_transform_ids)[source]

background_caching_pipeline_proto()[source]

Returns the background caching pipeline.

This method creates a background caching pipeline in two steps: it adds writes to cache from each unbounded source (done in the instrument method), and it cuts out all components (transforms, PCollections, coders, windowing strategies) that are not the unbounded sources or writes to cache (or subtransforms thereof).
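The pruning idea described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not Beam's implementation: the pipeline is reduced to a plain dict from transform id to subtransform ids, and the names `prune_to_required` and `toy_graph` are hypothetical.

```python
from typing import Dict, List, Set


def prune_to_required(
    transforms: Dict[str, List[str]],
    required_ids: Set[str],
) -> Dict[str, List[str]]:
    """Keep only the required transforms and their subtransforms.

    `transforms` maps a transform id to the ids of its direct subtransforms.
    This is a toy stand-in for a pipeline proto, not the real data structure.
    """
    keep: Set[str] = set()
    stack = list(required_ids)
    while stack:
        transform_id = stack.pop()
        if transform_id in keep or transform_id not in transforms:
            continue
        keep.add(transform_id)
        # Subtransforms of a kept transform are kept as well.
        stack.extend(transforms[transform_id])
    return {
        transform_id: [sub for sub in subs if sub in keep]
        for transform_id, subs in transforms.items()
        if transform_id in keep
    }


# Hypothetical graph: one unbounded source, one cache write, two user transforms.
toy_graph = {
    "ReadUnbounded": [],
    "WriteCache": ["WriteCache/Encode", "WriteCache/Sink"],
    "WriteCache/Encode": [],
    "WriteCache/Sink": [],
    "UserMap": [],
    "UserGroupBy": ["UserGroupBy/Combine"],
    "UserGroupBy/Combine": [],
}
pruned = prune_to_required(toy_graph, {"ReadUnbounded", "WriteCache"})
```

In this toy model only the source, the cache write, and the cache write's subtransforms survive; the user transforms are cut out.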

cacheables

Returns the Cacheables by PCollection ids.

If you’re already working with user defined pipelines and PCollections, do not build a PipelineInstrument just to get the cacheables. Instead, use apache_beam.runners.interactive.utils.cacheables.

has_unbounded_sources

Returns whether the pipeline has any recordable sources.

original_pipeline_proto

Returns a snapshot of the pipeline proto before instrumentation.

user_pipeline

Returns a reference to the pipeline instance defined by the user. If a pipeline has no cacheable PCollection and the user pipeline cannot be found, returns None, indicating that there is nothing to be cached in the user pipeline.

The pipeline given for instrumenting and mutated in this class is not necessarily the pipeline instance defined by the user. From the watched scopes, this class figures out what the user pipeline instance is. This metadata can be used for tracking pipeline results.

runner_pcoll_to_user_pcoll

Returns cacheable PCollections correlated from instances in the runner pipeline to instances in the user pipeline.

find_cacheables() → Dict[str, apache_beam.runners.interactive.caching.cacheable.Cacheable][source]

Finds PCollections that need to be cached for the analyzed pipeline.

There might be multiple pipelines defined and watched; this only finds cacheables belonging to the analyzed pipeline.

instrument()[source]

Instruments the original pipeline with cache.

For a cacheable output PCollection, if no cache exists for its key, do _write_cache(); for a cacheable input PCollection, if cache exists for its key, do _read_cache(). No instrumentation is applied in any other situation.
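The decision rule above can be written down as a small truth table. The following is a minimal sketch, assuming each cacheable PCollection is described only by its role and its cache key; the function `plan_instrumentation` and its parameters are hypothetical and are not part of this module.

```python
from typing import Set


def plan_instrumentation(role: str, key: str, existing_cache_keys: Set[str]) -> str:
    """Return 'write_cache', 'read_cache', or 'none' for one cacheable PCollection.

    `role` is 'output' or 'input', describing how the PCollection is used.
    `existing_cache_keys` is a toy stand-in for the cache's contents.
    """
    cached = key in existing_cache_keys
    if role == "output" and not cached:
        return "write_cache"  # nothing cached yet, so materialize it
    if role == "input" and cached:
        return "read_cache"  # reuse what was materialized earlier
    return "none"  # every other combination is left untouched


existing = {"key_a"}
decisions = {
    ("output", "key_a"): plan_instrumentation("output", "key_a", existing),
    ("output", "key_b"): plan_instrumentation("output", "key_b", existing),
    ("input", "key_a"): plan_instrumentation("input", "key_a", existing),
    ("input", "key_b"): plan_instrumentation("input", "key_b", existing),
}
```

Only two of the four combinations lead to a change in the pipeline: an uncached output gets a write, and a cached input gets a read.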

Modifies:
self._pipeline

preprocess()[source]

Pre-processes the pipeline.

Since the pipeline instance in the class might not be the same instance defined in the user code, preprocessing figures out the relationship of cacheable PCollections between these 2 instances by replacing the ‘pcoll’ fields in the cacheable dictionary with ones from the running instance.

pcoll_id(pcoll)[source]

Gets the PCollection id of the given pcoll.

Returns an empty string (‘’) if not found.

cache_key(pcoll)[source]

Gets the identifier of a cacheable PCollection in cache.

If the pcoll is not a cacheable, returns ‘’. This is only needed in the pipeline instrument when the origin of the given pcoll is unknown (whether it’s from the user pipeline or a runner pipeline). If a pcoll is from the user pipeline, always use CacheKey.from_pcoll to build the key. The key is what the pcoll would use as its identifier if it were materialized in cache; it does not imply that such a cache already exists. Also, the pcoll can come from the original user-defined pipeline object or be an equivalent pcoll from a transformed copy of the original pipeline.

apache_beam.runners.interactive.pipeline_instrument.build_pipeline_instrument(pipeline, options=None)[source]

Creates PipelineInstrument for a pipeline and its options with cache.

Throughout the process, the returned PipelineInstrument snapshots the given pipeline and then mutates the pipeline. It’s invoked by interactive components such as the InteractiveRunner, and the given pipeline should be an implicitly created runner pipeline rather than a pipeline instance defined by the user.

This is shorthand for 3 steps: 1) compute, once, the metadata of the given runner pipeline and of everything watched from user pipelines; 2) associate information between the runner pipeline and its corresponding user pipeline, eliminating data from other user pipelines if there are any; 3) mutate the runner pipeline to apply interactivity.
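The ordering of the three steps can be sketched with a stand-in class. This is an illustration only: `RecordingInstrument` and `build_instrument_sketch` are hypothetical, and mapping the steps to the constructor, preprocess(), and instrument() is an assumption drawn from the method descriptions on this page rather than from the source code.

```python
class RecordingInstrument:
    """Hypothetical stand-in that records the order of the calls it receives."""

    def __init__(self, pipeline, options=None):
        self.pipeline = pipeline
        self.options = options
        self.calls = ["init"]  # step 1 (assumed): compute metadata once

    def preprocess(self):
        # Step 2 (assumed): correlate runner and user pipeline information.
        self.calls.append("preprocess")

    def instrument(self):
        # Step 3 (assumed): mutate the runner pipeline.
        self.calls.append("instrument")


def build_instrument_sketch(pipeline, options=None, instrument_cls=RecordingInstrument):
    """Run the three steps in order and return the instrument object."""
    instrument_obj = instrument_cls(pipeline, options)
    instrument_obj.preprocess()
    instrument_obj.instrument()
    return instrument_obj


# The string below is a placeholder, not a real pipeline object.
built = build_instrument_sketch(pipeline="runner-pipeline-placeholder")
```

The point of the sketch is the order: metadata is computed before correlation, and the pipeline is mutated last.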

apache_beam.runners.interactive.pipeline_instrument.pcoll_to_pcoll_id(pipeline, original_context)[source]

Returns a dict mapping PCollection strings to PCollection IDs.

Uses a PipelineVisitor to iterate over every node in the pipeline and records the mapping from PCollections to PCollection IDs. This mapping is used to query cached PCollections.

Returns: (dict from str to str) a dict mapping str(pcoll) to pcoll_id.
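The shape of the returned mapping can be shown with toy objects. This is a sketch under stated assumptions, not the actual visitor: `ToyPCollection`, `ToyContext`, `map_str_to_id`, and the id format are all hypothetical stand-ins.

```python
from typing import Dict, Iterable, List


class ToyPCollection:
    """Hypothetical stand-in for a PCollection; only str() matters here."""

    def __init__(self, label: str):
        self.label = label

    def __str__(self) -> str:
        return f"PCollection[{self.label}]"


class ToyContext:
    """Hypothetical stand-in for a context that hands out stable ids."""

    def __init__(self):
        self._ids: Dict[int, str] = {}

    def get_id(self, pcoll: ToyPCollection) -> str:
        # The same object always receives the same id.
        return self._ids.setdefault(id(pcoll), f"pcoll_{len(self._ids) + 1}")


def map_str_to_id(
    transform_outputs: Iterable[List[ToyPCollection]],
    context: ToyContext,
) -> Dict[str, str]:
    """Map str(pcoll) to its id for every output of every visited node."""
    mapping: Dict[str, str] = {}
    for outputs in transform_outputs:  # one entry per visited transform node
        for pcoll in outputs:
            mapping[str(pcoll)] = context.get_id(pcoll)
    return mapping


words = ToyPCollection("words")
counts = ToyPCollection("counts")
mapping = map_str_to_id([[words], [counts]], ToyContext())
```

Keying by the string form means a lookup needs only str(pcoll), not the PCollection object itself.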