Source code for apache_beam.utils.interactive_utils

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Common interactive utility module.

For experimental usage only; no backwards-compatibility guarantees.
"""
# pytype: skip-file

from __future__ import absolute_import

import logging

_LOGGER = logging.getLogger(__name__)


def is_in_ipython():
    """Reports whether the running code lives inside an IPython session.

    Returns:
      True when ``IPython.get_ipython()`` yields a truthy shell object. False
      when it yields a falsy value, when an ImportError is raised, or when any
      other unexpected error occurs while probing (the latter is logged at
      INFO level with the traceback).

    Raises:
      KeyboardInterrupt, SystemExit: always propagated untouched.
    """
    try:
        from IPython import get_ipython  # pylint: disable=import-error
        return bool(get_ipython())
    except (KeyboardInterrupt, SystemExit):
        # Interrupts and interpreter shutdown must never be swallowed here.
        raise
    except ImportError:
        # If dependencies are not available, then not interactive for sure.
        return False
    except:  # pylint: disable=bare-except
        # Anything else is treated as "not interactive" rather than a failure.
        _LOGGER.info(
            'Unexpected error occurred, treated as not in IPython.',
            exc_info=True)
        return False
def is_in_notebook():
    """Reports whether the running code is executed from an IPython notebook.

    The check is whether the current IPython shell's config contains an
    'IPKernelApp' entry. Whenever this returns True, is_in_ipython() is
    necessarily True as well.

    Returns:
      True if in IPython and 'IPKernelApp' is present in the shell config,
      False otherwise.
    """
    if not is_in_ipython():
        return False
    # The import and usage must be valid under the execution path, since the
    # IPython check above has already succeeded.
    from IPython import get_ipython
    return 'IPKernelApp' in get_ipython().config
def alter_label_if_ipython(transform, pvalueish):
    """Prefixes a transform's label with the current IPython prompt number.

    The label is altered only if the given pvalueish belongs to a user-defined
    pipeline and current code execution is within an IPython session.
    Otherwise, noop.

    A label is either a user-defined or auto-generated str name of a PTransform
    that is unique within a pipeline. If current environment is_in_ipython(),
    Beam can implicitly create interactive labels to replace labels of
    top-level PTransforms to be applied. The label is formatted as:
    `[{prompt}]: {original_label}`, where `prompt` is the IPython shell's
    `execution_count`.

    Args:
      transform: the transform whose `label` attribute is rewritten in place.
      pvalueish: the input the transform is applied to; its owning pipeline is
        resolved through _extract_pipeline_of_pvalueish().

    Returns:
      None. The transform is mutated in place when all conditions hold.
    """
    if not is_in_ipython():
        return

    from apache_beam.runners.interactive import interactive_environment as ie
    env = ie.current_env()
    # Tracks user defined pipeline instances in watched scopes so that we only
    # alter labels for any transform to pvalueish belonging to those pipeline
    # instances, excluding any transform to be applied in other pipeline
    # instances the Beam SDK creates implicitly.
    env.track_user_pipelines()

    pipeline = _extract_pipeline_of_pvalueish(pvalueish)
    # We only alter for transforms to be applied to user-defined pipelines
    # at pipeline construction time.
    if not pipeline or pipeline not in env.tracked_user_pipelines:
        return

    # The prompt number is only needed once we know the label will change.
    from IPython import get_ipython
    prompt = get_ipython().execution_count
    transform.label = '[{}]: {}'.format(prompt, transform.label)
def _extract_pipeline_of_pvalueish(pvalueish):
    """Finds the pipeline that the given pvalueish belongs to.

    For a non-empty tuple the first element is inspected; for a non-empty dict
    the first value in iteration order is inspected; anything else (including
    empty tuples and dicts) is inspected directly.

    Args:
      pvalueish: a single value, or a tuple/dict of values.

    Returns:
      The `pipeline` attribute of the inspected object, or None when that
      object has no such attribute.
    """
    representative = pvalueish
    if isinstance(pvalueish, (tuple, dict)) and len(pvalueish) > 0:
        if isinstance(pvalueish, tuple):
            representative = pvalueish[0]
        else:
            representative = next(iter(pvalueish.values()))
    return (
        representative.pipeline
        if hasattr(representative, 'pipeline') else None)