Source code for apache_beam.runners.interactive.pipeline_instrument

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Module to instrument interactivity to the given pipeline.

For internal use only; no backwards-compatibility guarantees.
This module accesses current interactive environment and analyzes given pipeline
to transform original pipeline into a one-shot pipeline with interactivity.
"""
from __future__ import absolute_import

import apache_beam as beam
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.runners.interactive import interactive_environment as ie

READ_CACHE = "_ReadCache_"
WRITE_CACHE = "_WriteCache_"


class PipelineInstrument(object):
  """A pipeline instrument for a pipeline to be executed by the interactive
  runner.

  This module should never depend on the underlying runner that the
  interactive runner delegates to. It instruments the original pipeline
  instance directly by appending or replacing transforms with the help of
  cache. It provides interfaces to recover the states of the original
  pipeline. It's the interactive runner's responsibility to coordinate
  supported underlying runners to run the instrumented pipeline and recover
  the original pipeline states if needed.
  """

  def __init__(self, pipeline, options=None):
    self._pipeline = pipeline
    # The cache manager should be initiated outside of this module and outside
    # of run_pipeline() from the interactive runner so that its lifespan can
    # cover multiple runs in the interactive environment. Owned by the
    # interactive_environment module. Not owned by this module.
    # TODO(BEAM-7760): change the scope of cache to be owned by runner or
    # pipeline result instances because a pipeline is not 1:1 correlated to a
    # running job. Only complete and read-only cache is valid across multiple
    # jobs. Other cache instances should have their own scopes. Some design
    # change should support only the runner.run(pipeline) pattern rather than
    # pipeline.run([runner]), and a runner can run at most one pipeline at a
    # time. Otherwise, the result returned by run() is the only 1:1 anchor.
    self._cache_manager = ie.current_env().cache_manager()

    # Invoke a round trip through the runner API. This makes sure the Pipeline
    # proto is stable. The snapshot of the pipeline will not be mutated within
    # this module and can be used to recover the original pipeline if needed.
    self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
        pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, options)
    # Snapshot of original pipeline information.
    (self._original_pipeline_proto,
     self._original_context) = self._pipeline_snap.to_runner_api(
         return_context=True, use_fake_coders=True)

    # All compute-once-against-original-pipeline fields.
    self._has_unbounded_source = has_unbounded_source(self._pipeline_snap)
    # TODO(BEAM-7760): once the cache scope is changed, this is not needed to
    # manage relationships across pipelines, runners, and jobs.
    self._pcolls_to_pcoll_id = pcolls_to_pcoll_id(self._pipeline_snap,
                                                  self._original_context)

    # A mapping from PCollection id to python id() value in the user-defined
    # pipeline instance.
    (self._pcoll_version_map,
     self._cacheables) = cacheables(self.pcolls_to_pcoll_id)

    # A dict from cache key to PCollection that is read from cache.
    # If it exists, the caller should reuse the PCollection read. If not, the
    # caller should create a new transform and track the PCollection read from
    # cache. (Dict[str, AppliedPTransform]).
    self._cached_pcoll_read = {}
  def instrumented_pipeline_proto(self):
    """Always returns a new instance of the portable instrumented proto."""
    return self._pipeline.to_runner_api(use_fake_coders=True)
  @property
  def has_unbounded_source(self):
    """Checks if the given pipeline has any source that is unbounded.

    The function directly checks the source transform definitions instead of
    pvalues in the pipeline. Thus manually setting the is_bounded field of a
    PCollection or switching streaming mode will not affect this function's
    result. The result is always deterministic once the source code of a
    pipeline is defined.
    """
    return self._has_unbounded_source

  @property
  def cacheables(self):
    """Finds cacheable PCollections from the pipeline.

    The function only treats the result as cacheables since there is no
    guarantee whether a cache-desired PCollection has been cached or not.
    A PCollection desires caching when it's bound to a user-defined variable
    in source code. Otherwise, the PCollection is neither reusable nor
    introspectable, which nullifies the need for cache.
    """
    return self._cacheables

  @property
  def pcolls_to_pcoll_id(self):
    """Returns a dict mapping str(PCollection)s to IDs."""
    return self._pcolls_to_pcoll_id

  @property
  def original_pipeline_proto(self):
    """Returns the portable proto representation of the pipeline before
    instrumentation."""
    return self._original_pipeline_proto

  @property
  def original_pipeline(self):
    """Returns a snapshot of the pipeline before instrumentation."""
    return self._pipeline_snap
  def instrument(self):
    """Instruments the original pipeline with cache.

    For a cacheable output PCollection, if cache for the key doesn't exist,
    do _write_cache(); for a cacheable input PCollection, if cache for the
    key exists, do _read_cache(). No instrumenting in any other situation.

    Modifies:
      self._pipeline
    """
    self._preprocess()
    cacheable_inputs = set()

    class InstrumentVisitor(PipelineVisitor):
      """Visitor that utilizes cache to instrument the pipeline."""

      def __init__(self, pin):
        self._pin = pin

      def enter_composite_transform(self, transform_node):
        self.visit_transform(transform_node)

      def visit_transform(self, transform_node):
        cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))

    v = InstrumentVisitor(self)
    self._pipeline.visit(v)
    # Create ReadCache transforms.
    for cacheable_input in cacheable_inputs:
      self._read_cache(cacheable_input)
    # Replace/wire inputs w/ cached PCollections from ReadCache transforms.
    self._replace_with_cached_inputs()
    # Write cache for all cacheables.
    for _, cacheable in self.cacheables.items():
      self._write_cache(cacheable['pcoll'])
  # TODO(BEAM-7760): prune sub-graphs that don't need to be executed.
  def _preprocess(self):
    """Pre-processes the pipeline.

    Since the pipeline instance held by this class might not be the same
    instance defined in the user code, the pre-process figures out the
    relationship of cacheable PCollections between these 2 instances by
    replacing the 'pcoll' fields in the cacheable dictionary with ones from
    the running instance.
    """

    class PreprocessVisitor(PipelineVisitor):

      def __init__(self, pin):
        self._pin = pin

      def enter_composite_transform(self, transform_node):
        self.visit_transform(transform_node)

      def visit_transform(self, transform_node):
        for in_pcoll in transform_node.inputs:
          self._process(in_pcoll)
        for out_pcoll in transform_node.outputs.values():
          self._process(out_pcoll)

      def _process(self, pcoll):
        pcoll_id = self._pin.pcolls_to_pcoll_id.get(str(pcoll), '')
        if pcoll_id in self._pin._pcoll_version_map:
          cacheable_key = self._pin._cacheable_key(pcoll)
          if (cacheable_key in self._pin.cacheables and
              self._pin.cacheables[cacheable_key]['pcoll'] != pcoll):
            self._pin.cacheables[cacheable_key]['pcoll'] = pcoll

    v = PreprocessVisitor(self)
    self._pipeline.visit(v)

  def _write_cache(self, pcoll):
    """Caches a cacheable PCollection.

    For the given PCollection, appends a sub-transform that materializes the
    PCollection through a sink into the cache implementation. The cache write
    is not immediate: it happens when the runner runs the transformed
    pipeline, and thus is not usable for this run as intended. This function
    always writes the cache for the given PCollection as long as the
    PCollection belongs to the pipeline being instrumented and the keyed
    cache is absent.

    Modifies:
      self._pipeline
    """
    # Makes sure the pcoll belongs to the pipeline being instrumented.
    if pcoll.pipeline is not self._pipeline:
      return
    # The keyed cache is always valid within this instrumentation.
    key = self.cache_key(pcoll)
    # Only need to write when the cache with the expected key doesn't exist.
    if not self._cache_manager.exists('full', key):
      _ = pcoll | '{}{}'.format(WRITE_CACHE, key) >> cache.WriteCache(
          self._cache_manager, key)

  def _read_cache(self, pcoll):
    """Reads a cached pvalue.

    A noop (when the cache is absent) causes the pipeline to execute the
    transform as it is and cache nothing from this transform for the next
    run.

    Modifies:
      self._pipeline
    """
    # Makes sure the pcoll belongs to the pipeline being instrumented.
    if pcoll.pipeline is not self._pipeline:
      return
    # The keyed cache is always valid within this instrumentation.
    key = self.cache_key(pcoll)
    # Can only read from cache when the cache with the expected key exists.
    if self._cache_manager.exists('full', key):
      if key not in self._cached_pcoll_read:
        # Mutates the pipeline with a cache read transform attached
        # to the root of the pipeline.
        pcoll_from_cache = (
            self._pipeline
            | '{}{}'.format(READ_CACHE, key) >> cache.ReadCache(
                self._cache_manager, key))
        self._cached_pcoll_read[key] = pcoll_from_cache
    # else: NOOP when cache doesn't exist; just compute the original graph.

  def _replace_with_cached_inputs(self):
    """Replaces PCollection inputs in the pipeline with cache if possible.

    For any input PCollection, find out whether there is valid cache. If so,
    replace the input of the AppliedPTransform with the output of the
    AppliedPTransform that sources the pvalue from the cache. If there is no
    valid cache, noop.
    """

    class ReadCacheWireVisitor(PipelineVisitor):
      """Visitor that wires cache reads as inputs to replace corresponding
      original input PCollections in the pipeline.
      """

      def __init__(self, pin):
        """Initializes with a PipelineInstrument."""
        self._pin = pin

      def enter_composite_transform(self, transform_node):
        self.visit_transform(transform_node)

      def visit_transform(self, transform_node):
        if transform_node.inputs:
          input_list = list(transform_node.inputs)
          for i in range(len(input_list)):
            key = self._pin.cache_key(input_list[i])
            if key in self._pin._cached_pcoll_read:
              input_list[i] = self._pin._cached_pcoll_read[key]
          transform_node.inputs = tuple(input_list)

    v = ReadCacheWireVisitor(self)
    self._pipeline.visit(v)

  def _cacheable_inputs(self, transform):
    inputs = set()
    for in_pcoll in transform.inputs:
      if self._cacheable_key(in_pcoll) in self.cacheables:
        inputs.add(in_pcoll)
    return inputs

  def _cacheable_key(self, pcoll):
    """Gets the key under which a cacheable PCollection is tracked within
    this instrument."""
    return cacheable_key(pcoll, self.pcolls_to_pcoll_id,
                         self._pcoll_version_map)
  def cache_key(self, pcoll):
    """Gets the identifier of a cacheable PCollection in cache.

    If the pcoll is not cacheable, returns ''.

    The key is what the pcoll would use as its identifier if it's
    materialized in cache. It doesn't mean that such cache necessarily
    exists already. Also, the pcoll can come from the original user-defined
    pipeline object or be an equivalent pcoll from a transformed copy of the
    original pipeline.
    """
    cacheable = self.cacheables.get(self._cacheable_key(pcoll), None)
    if cacheable:
      return '_'.join((cacheable['var'],
                       cacheable['version'],
                       cacheable['pcoll_id'],
                       cacheable['producer_version']))
    return ''
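
# A minimal sketch (not part of the original module) of the cache key format
# produced by PipelineInstrument.cache_key above. The helper name and the
# example values are hypothetical; the four parts and the underscore join
# mirror the implementation.
def _example_cache_key(var, version, pcoll_id, producer_version):
  # e.g. _example_cache_key('squares', '140031868491120',
  #                         'ref_PCollection_PCollection_2',
  #                         '140031868491208')
  # -> 'squares_140031868491120_ref_PCollection_PCollection_2_140031868491208'
  return '_'.join((var, version, pcoll_id, producer_version))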
def pin(pipeline, options=None):
  """Creates a PipelineInstrument for a pipeline and its options with cache."""
  pi = PipelineInstrument(pipeline, options)
  pi.instrument()  # Instruments the pipeline only once.
  return pi
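
# A minimal usage sketch, assuming an interactive session where
# `user_pipeline` is a beam.Pipeline whose PCollection variables are watched
# by the interactive environment. The function name is hypothetical.
def _example_pin_usage(user_pipeline):
  instrument = pin(user_pipeline)  # instruments the pipeline exactly once
  proto = instrument.instrumented_pipeline_proto()
  # A delegated runner could now execute `proto`, while the
  # pre-instrumentation shape stays available for recovery.
  original = instrument.original_pipeline
  return proto, original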
def cacheables(pcolls_to_pcoll_id):
  """Finds cache-desired PCollections from the instrumented pipeline.

  The function only treats the result as cacheables since whether a
  cache-desired PCollection has been cached depends on whether the pipeline
  has been executed in the current interactive environment. A PCollection
  desires caching when it's bound to a user-defined variable in source code.
  Otherwise, the PCollection is neither reusable nor introspectable, which
  nullifies the need for cache. There might be multiple pipelines defined
  and watched; this returns cacheables only for PCollections from the
  pipelines whose pcolls_to_pcoll_id is analyzed. The check is not strict
  because pcoll_id is not unique across multiple pipelines. An additional
  check needs to be done during instrument.
  """
  pcoll_version_map = {}
  cacheables = {}
  for watching in ie.current_env().watching():
    for key, val in watching:
      # TODO(BEAM-8288): clean up the attribute check when py2 is not
      # supported.
      if hasattr(val, '__class__') and isinstance(val,
                                                  beam.pvalue.PCollection):
        cacheable = {}
        cacheable['pcoll_id'] = pcolls_to_pcoll_id.get(str(val), None)
        # It's highly possible that the PCollection str is not unique across
        # multiple pipelines; a further check during instrument is needed.
        if not cacheable['pcoll_id']:
          continue
        cacheable['var'] = key
        cacheable['version'] = str(id(val))
        cacheable['pcoll'] = val
        cacheable['producer_version'] = str(id(val.producer))
        cacheables[cacheable_key(val, pcolls_to_pcoll_id)] = cacheable
        pcoll_version_map[cacheable['pcoll_id']] = cacheable['version']
  return pcoll_version_map, cacheables
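
# A hedged sketch of the structures returned by cacheables() above, with
# hypothetical id() values. pcoll_version_map maps a PCollection proto id to
# the python id() of the user-defined instance; each cacheables entry carries
# the metadata later joined into a cache key. The function name is
# hypothetical.
def _example_cacheables_result():
  pcoll_version_map = {'ref_PCollection_PCollection_2': '140031868491120'}
  cacheables = {
      '140031868491120_ref_PCollection_PCollection_2': {
          'var': 'squares',  # user variable the PCollection is bound to
          'version': '140031868491120',  # str(id(pcoll))
          'pcoll_id': 'ref_PCollection_PCollection_2',  # proto id
          'pcoll': None,  # the watched PCollection instance itself
          'producer_version': '140031868491208',  # str(id(pcoll.producer))
      },
  }
  return pcoll_version_map, cacheables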
def cacheable_key(pcoll, pcolls_to_pcoll_id, pcoll_version_map=None):
  """Gets the key a cacheable PCollection is tracked with:
  '<version>_<pcoll_id>'. When a pcoll_version_map is given, the version of
  the equivalent PCollection in the original user pipeline is preferred over
  the copy's own id()."""
  pcoll_version = str(id(pcoll))
  pcoll_id = pcolls_to_pcoll_id.get(str(pcoll), '')
  if pcoll_version_map:
    original_pipeline_pcoll_version = pcoll_version_map.get(pcoll_id, None)
    if original_pipeline_pcoll_version:
      pcoll_version = original_pipeline_pcoll_version
  return '_'.join((pcoll_version, pcoll_id))
def has_unbounded_source(pipeline):
  """Checks if a given pipeline has any source that is unbounded."""

  class CheckUnboundednessVisitor(PipelineVisitor):
    """Visitor that checks if there is any unbounded read source in the
    pipeline.

    The visitor visits all nodes and checks is_bounded() for all sources of
    read PTransforms. As long as at least one source introduces unbounded
    data, it returns True. We don't check the is_bounded field of proto-based
    PCollections since it may not be correctly set by to_runner_api.
    """

    def __init__(self):
      self.has_unbounded_source = False

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      if (not self.has_unbounded_source
          and isinstance(transform_node, beam.pipeline.AppliedPTransform)
          and isinstance(transform_node.transform, beam.io.iobase.Read)
          and not transform_node.transform.source.is_bounded()):
        self.has_unbounded_source = True

  v = CheckUnboundednessVisitor()
  pipeline.visit(v)
  return v.has_unbounded_source
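
# A minimal usage sketch: a pipeline built only from bounded sources reports
# no unbounded source. The function name is hypothetical.
def _example_has_unbounded_source_usage():
  p = beam.Pipeline()
  _ = p | beam.Create([1, 2, 3])  # Create is a bounded source
  return has_unbounded_source(p)  # False: no unbounded read in the graph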
def pcolls_to_pcoll_id(pipeline, original_context):
  """Returns a dict mapping PCollection strings to PCollection IDs.

  Using a PipelineVisitor to iterate over every node in the pipeline, records
  the mapping from PCollections to PCollection IDs. This mapping will be used
  to query cached PCollections.

  Returns:
    (dict from str to str) a dict mapping str(pcoll) to pcoll_id.
  """

  class PCollVisitor(PipelineVisitor):
    """A visitor that records, for every output PCollection it encounters,
    the PCollection id assigned by the given pipeline context.
    """

    def __init__(self):
      self.pcolls_to_pcoll_id = {}

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      for pcoll in transform_node.outputs.values():
        self.pcolls_to_pcoll_id[str(pcoll)] = (
            original_context.pcollections.get_id(pcoll))

  v = PCollVisitor()
  pipeline.visit(v)
  return v.pcolls_to_pcoll_id
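
# A minimal sketch of producing the inputs pcolls_to_pcoll_id expects,
# mirroring PipelineInstrument.__init__ above: a runner API round trip yields
# a stable proto plus the context used for the id lookups. The function name
# is hypothetical.
def _example_pcolls_to_pcoll_id(pipeline):
  _, context = pipeline.to_runner_api(
      return_context=True, use_fake_coders=True)
  return pcolls_to_pcoll_id(pipeline, context)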