#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities to be used in Interactive Beam.
"""
import functools
import hashlib
import importlib
import json
import logging
from typing import Any
from typing import Dict
from typing import Tuple
import pandas as pd
import apache_beam as beam
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.frame_base import DeferredBase
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.interactive.caching.cacheable import Cacheable
from apache_beam.runners.interactive.caching.cacheable import CacheKey
from apache_beam.runners.interactive.caching.expression_cache import ExpressionCache
from apache_beam.testing.test_stream import WindowedValueHolder
from apache_beam.typehints.schemas import named_fields_from_element_type
_LOGGER = logging.getLogger(__name__)
# Add line breaks to the IPythonLogHandler's HTML output.
_INTERACTIVE_LOG_STYLE = """
<style>
div.alert {
white-space: pre-line;
}
</style>
"""
[docs]def to_element_list(
reader, # type: Generator[Union[beam_runner_api_pb2.TestStreamPayload.Event, WindowedValueHolder]] # noqa: F821
coder, # type: Coder # noqa: F821
include_window_info, # type: bool
n=None, # type: int
include_time_events=False, # type: bool
):
# type: (...) -> List[WindowedValue] # noqa: F821
"""Returns an iterator that properly decodes the elements from the reader.
"""
# Defining a generator like this makes it easier to limit the count of
# elements read. Otherwise, the count limit would need to be duplicated.
def elements():
for e in reader:
if isinstance(e, beam_runner_api_pb2.TestStreamPayload.Event):
if (e.HasField('watermark_event') or
e.HasField('processing_time_event')):
if include_time_events:
yield e
else:
for tv in e.element_event.elements:
decoded = coder.decode(tv.encoded_element)
yield (
decoded.windowed_value
if include_window_info else decoded.windowed_value.value)
elif isinstance(e, WindowedValueHolder):
yield (
e.windowed_value if include_window_info else e.windowed_value.value)
else:
yield e
# Because we can yield multiple elements from a single TestStreamFileRecord,
# we have to limit the count here to ensure that `n` is fulfilled.
count = 0
for e in elements():
if n and count >= n:
break
yield e
if not isinstance(e, beam_runner_api_pb2.TestStreamPayload.Event):
count += 1
[docs]def elements_to_df(elements, include_window_info=False, element_type=None):
# type: (List[WindowedValue], bool, Any) -> DataFrame # noqa: F821
"""Parses the given elements into a Dataframe.
If the elements are a list of WindowedValues, then it will break out the
elements into their own DataFrame and return it. If include_window_info is
True, then it will concatenate the windowing information onto the elements
DataFrame.
"""
try:
columns_names = [
name for name, _ in named_fields_from_element_type(element_type)
]
except TypeError:
columns_names = None
rows = []
windowed_info = []
for e in elements:
rows.append(e.value)
if include_window_info:
windowed_info.append([e.timestamp.micros, e.windows, e.pane_info])
using_dataframes = isinstance(element_type, pd.DataFrame)
using_series = isinstance(element_type, pd.Series)
if using_dataframes or using_series:
rows_df = pd.concat(rows)
else:
rows_df = pd.DataFrame(rows, columns=columns_names)
if include_window_info and not using_series:
windowed_info_df = pd.DataFrame(
windowed_info, columns=['event_time', 'windows', 'pane_info'])
final_df = pd.concat([rows_df, windowed_info_df], axis=1)
else:
final_df = rows_df
return final_df
[docs]def register_ipython_log_handler():
# type: () -> None
"""Adds the IPython handler to a dummy parent logger (named
'apache_beam.runners.interactive') of all interactive modules' loggers so that
if is_in_notebook, logging displays the logs as HTML in frontends.
"""
# apache_beam.runners.interactive is not a module, thus this "root" logger is
# a dummy one created to hold the IPython log handler. When children loggers
# have propagate as True (by default) and logging level as NOTSET (by default,
# so the "root" logger's logging level takes effect), the IPython log handler
# will be triggered at the "root"'s own logging level. And if a child logger
# sets its logging level, it can take control back.
interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
if any(isinstance(h, IPythonLogHandler)
for h in interactive_root_logger.handlers):
return
interactive_root_logger.setLevel(logging.INFO)
interactive_root_logger.addHandler(IPythonLogHandler())
# Disable the propagation so that logs emitted from interactive modules should
# only be handled by loggers and handlers defined within interactive packages.
interactive_root_logger.propagate = False
[docs]class IPythonLogHandler(logging.Handler):
"""A logging handler to display logs as HTML in IPython backed frontends."""
# TODO(BEAM-7923): Switch to Google hosted CDN once
# https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.
log_template = """
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
<div class="alert alert-{level}">{msg}</div>"""
logging_to_alert_level_map = {
logging.CRITICAL: 'danger',
logging.ERROR: 'danger',
logging.WARNING: 'warning',
logging.INFO: 'info',
logging.DEBUG: 'dark',
logging.NOTSET: 'light'
}
[docs] def emit(self, record):
try:
from html import escape
from IPython.display import HTML
from IPython.display import display
display(HTML(_INTERACTIVE_LOG_STYLE))
display(
HTML(
self.log_template.format(
level=self.logging_to_alert_level_map[record.levelno],
msg=escape(record.msg % record.args))))
except ImportError:
pass # NOOP when dependencies are not available.
[docs]def obfuscate(*inputs):
# type: (*Any) -> str
"""Obfuscates any inputs into a hexadecimal string."""
str_inputs = [str(input) for input in inputs]
merged_inputs = '_'.join(str_inputs)
return hashlib.md5(merged_inputs.encode('utf-8')).hexdigest()
[docs]class ProgressIndicator(object):
"""An indicator visualizing code execution in progress."""
# TODO(BEAM-7923): Switch to Google hosted CDN once
# https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.
spinner_template = """
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
<div id="{id}">
<div class="spinner-border text-info" role="status"></div>
<span class="text-info">{text}</span>
</div>
"""
spinner_removal_template = """
$("#{id}").remove();"""
def __init__(self, enter_text, exit_text):
# type: (str, str) -> None
self._id = 'progress_indicator_{}'.format(obfuscate(id(self)))
self._enter_text = enter_text
self._exit_text = exit_text
def __enter__(self):
try:
from IPython.display import HTML
from IPython.display import display
from apache_beam.runners.interactive import interactive_environment as ie
if ie.current_env().is_in_notebook:
display(
HTML(
self.spinner_template.format(
id=self._id, text=self._enter_text)))
else:
display(self._enter_text)
except ImportError as e:
_LOGGER.error(
'Please use interactive Beam features in an IPython'
'or notebook environment: %s' % e)
def __exit__(self, exc_type, exc_value, traceback):
try:
from IPython.display import Javascript
from IPython.display import display
from IPython.display import display_javascript
from apache_beam.runners.interactive import interactive_environment as ie
if ie.current_env().is_in_notebook:
script = self.spinner_removal_template.format(id=self._id)
display_javascript(
Javascript(
ie._JQUERY_WITH_DATATABLE_TEMPLATE.format(
customized_script=script)))
else:
display(self._exit_text)
except ImportError as e:
_LOGGER.error(
'Please use interactive Beam features in an IPython'
'or notebook environment: %s' % e)
[docs]def progress_indicated(func):
# type: (Callable[..., Any]) -> Callable[..., Any] # noqa: F821
"""A decorator using a unique progress indicator as a context manager to
execute the given function within."""
@functools.wraps(func)
def run_within_progress_indicator(*args, **kwargs):
with ProgressIndicator(f'Processing... {func.__name__}', 'Done.'):
return func(*args, **kwargs)
return run_within_progress_indicator
[docs]def as_json(func):
# type: (Callable[..., Any]) -> Callable[..., str] # noqa: F821
"""A decorator convert python objects returned by callables to json
string.
The decorated function should always return an object parsable by json.dumps.
If the object is not parsable, the str() of original object is returned
instead.
"""
def return_as_json(*args, **kwargs):
try:
return_value = func(*args, **kwargs)
return json.dumps(return_value)
except TypeError:
return str(return_value)
return return_as_json
[docs]def deferred_df_to_pcollection(df):
assert isinstance(df, DeferredBase), '{} is not a DeferredBase'.format(df)
# The proxy is used to output a DataFrame with the correct columns.
#
# TODO(https://github.com/apache/beam/issues/20577): Once type hints are
# implemented for pandas, use those instead of the proxy.
cache = ExpressionCache()
cache.replace_with_cached(df._expr)
proxy = df._expr.proxy()
return to_pcollection(df, yield_elements='pandas', label=str(df._expr)), proxy
[docs]def pcoll_by_name() -> Dict[str, beam.PCollection]:
"""Finds all PCollections by their variable names defined in the notebook."""
from apache_beam.runners.interactive import interactive_environment as ie
inspectables = ie.current_env().inspector_with_synthetic.inspectables
pcolls = {}
for _, inspectable in inspectables.items():
metadata = inspectable['metadata']
if metadata['type'] == 'pcollection':
pcolls[metadata['name']] = inspectable['value']
return pcolls
[docs]def find_pcoll_name(pcoll: beam.PCollection) -> str:
"""Finds the variable name of a PCollection defined by the user.
Returns None if not assigned to any variable.
"""
from apache_beam.runners.interactive import interactive_environment as ie
inspectables = ie.current_env().inspector.inspectables
for _, inspectable in inspectables.items():
if inspectable['value'] is pcoll:
return inspectable['metadata']['name']
return None
[docs]def cacheables() -> Dict[CacheKey, Cacheable]:
"""Finds all Cacheables with their CacheKeys."""
from apache_beam.runners.interactive import interactive_environment as ie
inspectables = ie.current_env().inspector_with_synthetic.inspectables
cacheables = {}
for _, inspectable in inspectables.items():
metadata = inspectable['metadata']
if metadata['type'] == 'pcollection':
cacheable = Cacheable.from_pcoll(metadata['name'], inspectable['value'])
cacheables[cacheable.to_key()] = cacheable
return cacheables
[docs]def watch_sources(pipeline):
"""Watches the unbounded sources in the pipeline.
Sources can output to a PCollection without a user variable reference. In
this case the source is not cached. We still want to cache the data so we
synthetically create a variable to the intermediate PCollection.
"""
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.interactive import interactive_environment as ie
retrieved_user_pipeline = ie.current_env().user_pipeline(pipeline)
pcoll_to_name = {v: k for k, v in pcoll_by_name().items()}
class CacheableUnboundedPCollectionVisitor(PipelineVisitor):
def __init__(self):
self.unbounded_pcolls = set()
def enter_composite_transform(self, transform_node):
self.visit_transform(transform_node)
def visit_transform(self, transform_node):
if isinstance(transform_node.transform,
tuple(ie.current_env().options.recordable_sources)):
for pcoll in transform_node.outputs.values():
# Only generate a synthetic var when it's not already watched. For
# example, the user could have assigned the unbounded source output
# to a variable, watching it again with a different variable name
# creates ambiguity.
if pcoll not in pcoll_to_name:
ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
retrieved_user_pipeline.visit(CacheableUnboundedPCollectionVisitor())
[docs]def has_unbounded_sources(pipeline):
"""Checks if a given pipeline has recordable sources."""
return len(unbounded_sources(pipeline)) > 0
[docs]def unbounded_sources(pipeline):
"""Returns a pipeline's recordable sources."""
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.interactive import interactive_environment as ie
class CheckUnboundednessVisitor(PipelineVisitor):
"""Visitor checks if there are any unbounded read sources in the Pipeline.
Visitor visits all nodes and checks if it is an instance of recordable
sources.
"""
def __init__(self):
self.unbounded_sources = []
def enter_composite_transform(self, transform_node):
self.visit_transform(transform_node)
def visit_transform(self, transform_node):
if isinstance(transform_node.transform,
tuple(ie.current_env().options.recordable_sources)):
self.unbounded_sources.append(transform_node)
v = CheckUnboundednessVisitor()
pipeline.visit(v)
return v.unbounded_sources
[docs]def create_var_in_main(name: str,
value: Any,
watch: bool = True) -> Tuple[str, Any]:
"""Declares a variable in the main module.
Args:
name: the variable name in the main module.
value: the value of the variable.
watch: whether to watch it in the interactive environment.
Returns:
A 2-entry tuple of the variable name and value.
"""
setattr(importlib.import_module('__main__'), name, value)
if watch:
from apache_beam.runners.interactive import interactive_environment as ie
ie.current_env().watch({name: value})
return name, value
[docs]def assert_bucket_exists(bucket_name):
# type: (str) -> None
"""Asserts whether the specified GCS bucket with the name
bucket_name exists.
Logs an error and raises a ValueError if the bucket does not exist.
Logs a warning if the bucket cannot be verified to exist.
"""
try:
from google.cloud.exceptions import ClientError
from google.cloud.exceptions import NotFound
from apache_beam.io.gcp.gcsio import create_storage_client
storage_client = create_storage_client(PipelineOptions())
storage_client.get_bucket(bucket_name)
except ClientError as e:
if isinstance(e, NotFound):
_LOGGER.error('%s bucket does not exist!', bucket_name)
raise ValueError('Invalid GCS bucket provided!')
else:
_LOGGER.warning(
'ClientError - unable to verify whether bucket %s exists',
bucket_name)
except ImportError:
_LOGGER.warning(
'ImportError - unable to verify whether bucket %s exists', bucket_name)
[docs]def detect_pipeline_runner(pipeline):
if isinstance(pipeline, Pipeline):
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
if isinstance(pipeline.runner, InteractiveRunner):
pipeline_runner = pipeline.runner._underlying_runner
else:
pipeline_runner = pipeline.runner
else:
pipeline_runner = None
return pipeline_runner