#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Common interactive utility module.
For experimental usage only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
import logging
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
def is_in_ipython():
    """Determines if current code is executed within an ipython session.

    Returns:
        bool: True when IPython can be imported and ``get_ipython()`` returns
        a truthy value. False when IPython is not importable, when
        ``get_ipython()`` returns a falsy value, or when the check itself
        raises an unexpected exception (which is logged at INFO level).
    """
    try:
        from IPython import get_ipython  # pylint: disable=import-error
        return bool(get_ipython())
    except ImportError:
        # If dependencies are not available, then not interactive for sure.
        return False
    except Exception:  # pylint: disable=broad-except
        # Best-effort detection: any other failure is treated as "not in
        # IPython". KeyboardInterrupt and SystemExit do not derive from
        # Exception, so they still propagate to the caller.
        _LOGGER.info(
            'Unexpected error occurred, treated as not in IPython.',
            exc_info=True)
        return False
def is_in_notebook():
    """Determines if current code is executed from an ipython notebook.

    If is_in_notebook() is True, then is_in_ipython() must also be True.

    Returns:
        bool: True when is_in_ipython() is True and the IPython config
        contains the 'IPKernelApp' key; False otherwise.
    """
    if not is_in_ipython():
        return False
    # The import and usage must be valid under the execution path.
    from IPython import get_ipython
    return 'IPKernelApp' in get_ipython().config
def alter_label_if_ipython(transform, pvalueish):
    """Alters the label to an interactive label with ipython prompt metadata
    prefixed for the given transform if the given pvalueish belongs to a
    user-defined pipeline and current code execution is within an ipython
    kernel. Otherwise, noop.

    A label is either a user-defined or auto-generated str name of a
    PTransform that is unique within a pipeline. If current environment
    is_in_ipython(), Beam can implicitly create interactive labels to replace
    labels of top-level PTransforms to be applied. The label is formatted as:
    `[{prompt}]: {original_label}`.

    Args:
        transform: The transform whose ``label`` attribute may be rewritten
            in place.
        pvalueish: The value(s) the transform is applied to; used to look up
            the owning pipeline.

    Returns:
        None. The only effect is the possible mutation of ``transform.label``.
    """
    if not is_in_ipython():
        return
    from apache_beam.runners.interactive import interactive_environment as ie
    # Tracks user defined pipeline instances in watched scopes so that we only
    # alter labels for any transform to pvalueish belonging to those pipeline
    # instances, excluding any transform to be applied in other pipeline
    # instances the Beam SDK creates implicitly.
    ie.current_env().track_user_pipelines()
    from IPython import get_ipython
    prompt = get_ipython().execution_count
    pipeline = _extract_pipeline_of_pvalueish(pvalueish)
    # We only alter for transforms to be applied to user-defined pipelines
    # at pipeline construction time.
    if pipeline and pipeline in ie.current_env().tracked_user_pipelines:
        transform.label = '[{}]: {}'.format(prompt, transform.label)
def _extract_pipeline_of_pvalueish(pvalueish):
    """Extracts the pipeline that the given pvalueish belongs to.

    Args:
        pvalueish: A single value, a tuple of values, or a dict whose values
            are inspected. For a non-empty tuple the first element is used;
            for a non-empty dict the first value in iteration order is used;
            anything else (including empty containers) is inspected directly.

    Returns:
        The ``pipeline`` attribute of the inspected value, or None when that
        value has no such attribute.
    """
    if isinstance(pvalueish, tuple) and pvalueish:
        pvalue = pvalueish[0]
    elif isinstance(pvalueish, dict) and pvalueish:
        pvalue = next(iter(pvalueish.values()))
    else:
        pvalue = pvalueish
    return getattr(pvalue, 'pipeline', None)