#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Module to build pipeline fragment that produces given PCollections.
For internal use only; no backwards-compatibility guarantees.
"""
import apache_beam as beam
from apache_beam.pipeline import AppliedPTransform
from apache_beam.pipeline import PipelineVisitor
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import pipeline_instrument as instr
from apache_beam.testing.test_stream import TestStream
[docs]
class PipelineFragment(object):
"""A fragment of a pipeline definition.
A pipeline fragment is built from the original pipeline definition to include
only PTransforms that are necessary to produce the given PCollections.
"""
def __init__(self, pcolls, options=None, runner=None):
"""Constructor of PipelineFragment.
Args:
pcolls: (List[PCollection]) a list of PCollections to build pipeline
fragment for.
options: (PipelineOptions) the pipeline options for the implicit
pipeline run.
runner: (Runner) the pipeline runner for the implicit
pipeline run.
"""
assert len(pcolls) > 0, (
'Need at least 1 PCollection as the target data to build a pipeline '
'fragment that produces it.')
for pcoll in pcolls:
assert isinstance(pcoll, beam.pvalue.PCollection), (
'{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
# No modification to self._user_pipeline is allowed.
self._user_pipeline = pcolls[0].pipeline
# These are user PCollections. Do not use them to deduce anything that
# will be executed by any runner. Instead, use
# `self._runner_pcolls_to_user_pcolls.keys()` to get copied PCollections.
self._pcolls = set(pcolls)
for pcoll in self._pcolls:
assert pcoll.pipeline is self._user_pipeline, (
'{} belongs to a different user pipeline than other PCollections '
'given and cannot be used to build a pipeline fragment that produces '
'the given PCollections.'.format(pcoll))
self._options = options
self._runner = runner
# A copied pipeline instance for modification without changing the user
# pipeline instance held by the end user. This instance can be processed
# into a pipeline fragment that later run by the underlying runner.
self._runner_pipeline = self._build_runner_pipeline()
_, self._context = self._runner_pipeline.to_runner_api(return_context=True)
self._runner_pcoll_to_id = instr.pcoll_to_pcoll_id(
self._runner_pipeline, self._context)
# Correlate components in the runner pipeline to components in the user
# pipeline. The target pcolls are the pcolls given and defined in the user
# pipeline.
self._id_to_target_pcoll = self._calculate_target_pcoll_ids()
self._label_to_user_transform = self._calculate_user_transform_labels()
# Below will give us the 1:1 correlation between
# PCollections/AppliedPTransforms from the copied runner pipeline and
# PCollections/AppliedPTransforms from the user pipeline.
# (Dict[PCollection, PCollection])
(
self._runner_pcolls_to_user_pcolls,
# (Dict[AppliedPTransform, AppliedPTransform])
self._runner_transforms_to_user_transforms
) = self._build_correlation_between_pipelines(
self._runner_pcoll_to_id,
self._id_to_target_pcoll,
self._label_to_user_transform)
# Below are operated on the runner pipeline.
(self._necessary_transforms,
self._necessary_pcollections) = self._mark_necessary_transforms_and_pcolls(
self._runner_pcolls_to_user_pcolls)
self._runner_pipeline = self._prune_runner_pipeline_to_fragment(
self._runner_pipeline, self._necessary_transforms)
[docs]
def deduce_fragment(self):
"""Deduce the pipeline fragment as an apache_beam.Pipeline instance."""
fragment = beam.pipeline.Pipeline.from_runner_api(
self._runner_pipeline.to_runner_api(),
self._runner or self._runner_pipeline.runner,
self._options)
ie.current_env().add_derived_pipeline(self._runner_pipeline, fragment)
return fragment
[docs]
def run(self, display_pipeline_graph=False, use_cache=True, blocking=False):
"""Shorthand to run the pipeline fragment."""
fragment = self.deduce_fragment()
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
try:
if isinstance(self._runner_pipeline.runner, InteractiveRunner):
preserved_skip_display = self._runner_pipeline.runner._skip_display
preserved_force_compute = self._runner_pipeline.runner._force_compute
preserved_blocking = self._runner_pipeline.runner._blocking
self._runner_pipeline.runner._skip_display = not display_pipeline_graph
self._runner_pipeline.runner._force_compute = not use_cache
self._runner_pipeline.runner._blocking = blocking
return fragment.run()
else:
pipeline_instrument = instr.build_pipeline_instrument(
fragment, self._runner_pipeline._options)
pipeline_instrument_proto = (
pipeline_instrument.instrumented_pipeline_proto())
if any(pcoll.is_bounded == beam_runner_api_pb2.IsBounded.UNBOUNDED
for pcoll in
pipeline_instrument_proto.components.pcollections.values()):
raise RuntimeError(
'Please specify InteractiveRunner when creating '
'the Beam pipeline to use this function '
'on unbouded PCollections.')
result = beam.pipeline.Pipeline.from_runner_api(
pipeline_instrument_proto, fragment.runner,
fragment._options).run()
result.wait_until_finish()
ie.current_env().mark_pcollection_computed(
pipeline_instrument.cached_pcolls)
return result
finally:
if isinstance(self._runner_pipeline.runner, InteractiveRunner):
self._runner_pipeline.runner._skip_display = preserved_skip_display
self._runner_pipeline.runner._force_compute = preserved_force_compute
self._runner_pipeline.runner._blocking = preserved_blocking
def _build_runner_pipeline(self):
runner_pipeline = beam.pipeline.Pipeline.from_runner_api(
self._user_pipeline.to_runner_api(),
self._user_pipeline.runner,
self._options)
ie.current_env().add_derived_pipeline(self._user_pipeline, runner_pipeline)
return runner_pipeline
def _calculate_target_pcoll_ids(self):
pcoll_id_to_target_pcoll = {}
for pcoll in self._pcolls:
pcoll_id_to_target_pcoll[self._runner_pcoll_to_id.get(str(pcoll),
'')] = pcoll
return pcoll_id_to_target_pcoll
def _calculate_user_transform_labels(self):
label_to_user_transform = {}
class UserTransformVisitor(PipelineVisitor):
def enter_composite_transform(self, transform_node):
self.visit_transform(transform_node)
def visit_transform(self, transform_node):
if transform_node is not None:
label_to_user_transform[transform_node.full_label] = transform_node
v = UserTransformVisitor()
self._runner_pipeline.visit(v)
return label_to_user_transform
def _build_correlation_between_pipelines(
self, runner_pcoll_to_id, id_to_target_pcoll, label_to_user_transform):
runner_pcolls_to_user_pcolls = {}
runner_transforms_to_user_transforms = {}
class CorrelationVisitor(PipelineVisitor):
def enter_composite_transform(self, transform_node):
self.visit_transform(transform_node)
def visit_transform(self, transform_node):
self._process_transform(transform_node)
for in_pcoll in transform_node.inputs:
self._process_pcoll(in_pcoll)
for out_pcoll in transform_node.outputs.values():
self._process_pcoll(out_pcoll)
def _process_pcoll(self, pcoll):
pcoll_id = runner_pcoll_to_id.get(str(pcoll), '')
if pcoll_id in id_to_target_pcoll:
runner_pcolls_to_user_pcolls[pcoll] = (id_to_target_pcoll[pcoll_id])
def _process_transform(self, transform_node):
if transform_node.full_label in label_to_user_transform:
runner_transforms_to_user_transforms[transform_node] = (
label_to_user_transform[transform_node.full_label])
v = CorrelationVisitor()
self._runner_pipeline.visit(v)
return runner_pcolls_to_user_pcolls, runner_transforms_to_user_transforms
def _mark_necessary_transforms_and_pcolls(self, runner_pcolls_to_user_pcolls):
necessary_transforms = set()
all_inputs = set()
updated_all_inputs = set(runner_pcolls_to_user_pcolls.keys())
# Do this until no more new PCollection is recorded.
while len(updated_all_inputs) != len(all_inputs):
all_inputs = set(updated_all_inputs)
for pcoll in all_inputs:
producer = pcoll.producer
while producer:
if producer in necessary_transforms:
break
# Mark the AppliedPTransform as necessary.
necessary_transforms.add(producer)
# Also mark composites that are not the root transform. If the root
# transform is added, then all transforms are incorrectly marked as
# necessary. If composites are not handled, then there will be
# orphaned PCollections.
if producer.parent is not None:
necessary_transforms.update(producer.parts)
# This will recursively add all the PCollections in this composite.
for part in producer.parts:
updated_all_inputs.update(part.outputs.values())
# Record all necessary input and side input PCollections.
updated_all_inputs.update(producer.inputs)
# pylint: disable=bad-option-value
side_input_pvalues = set(
map(lambda side_input: side_input.pvalue, producer.side_inputs))
updated_all_inputs.update(side_input_pvalues)
# Go to its parent AppliedPTransform.
producer = producer.parent
return necessary_transforms, all_inputs
def _prune_runner_pipeline_to_fragment(
self, runner_pipeline, necessary_transforms):
class PruneVisitor(PipelineVisitor):
def enter_composite_transform(self, transform_node):
if should_skip_pruning(transform_node):
return
pruned_parts = list(transform_node.parts)
for part in transform_node.parts:
if part not in necessary_transforms:
pruned_parts.remove(part)
transform_node.parts = tuple(pruned_parts)
self.visit_transform(transform_node)
def visit_transform(self, transform_node):
if transform_node not in necessary_transforms:
transform_node.parent = None
v = PruneVisitor()
runner_pipeline.visit(v)
return runner_pipeline
[docs]
def should_skip_pruning(transform: AppliedPTransform):
return (
isinstance(transform.transform, TestStream) or
'_DataFrame_' in transform.full_label)