Source code for apache_beam.runners.interactive.utils

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Utilities to be used in Interactive Beam.
"""

import functools
import hashlib
import json
import logging

import pandas as pd

from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.frame_base import DeferredBase
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.runners.interactive.caching.expression_cache import ExpressionCache
from apache_beam.testing.test_stream import WindowedValueHolder
from apache_beam.typehints.schemas import named_fields_from_element_type

# Logger for this module. When this file is imported as
# apache_beam.runners.interactive.utils, this logger is a child of the
# 'apache_beam.runners.interactive' logger configured by
# register_ipython_log_handler.
_LOGGER = logging.getLogger(__name__)


def to_element_list(
    reader,  # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
    coder,  # type: Coder
    include_window_info,  # type: bool
    n=None,  # type: int
    include_time_events=False,  # type: bool
):
  # type: (...) -> Iterator[Any]

  """Returns an iterator that properly decodes the elements from the reader.

  Args:
    reader: an iterable of TestStreamPayload.Event, WindowedValueHolder or
      other (already decoded) items.
    coder: used to decode the encoded elements carried by element events.
    include_window_info: if True, yields WindowedValues; otherwise yields only
      their values.
    n: the maximum number of data elements to yield. Time events do not count
      towards this limit. A falsy value (None or 0) means no limit.
    include_time_events: if True, watermark and processing-time events are
      yielded as-is; otherwise they are dropped.

  Yields:
    Decoded elements (or WindowedValues), interleaved with time events when
    include_time_events is True.
  """

  # Defining a generator like this makes it easier to limit the count of
  # elements read. Otherwise, the count limit would need to be duplicated.
  def elements():
    for e in reader:
      if isinstance(e, TestStreamPayload.Event):
        if (e.HasField('watermark_event') or
            e.HasField('processing_time_event')):
          if include_time_events:
            yield e
        else:
          for tv in e.element_event.elements:
            decoded = coder.decode(tv.encoded_element)
            yield (
                decoded.windowed_value
                if include_window_info else decoded.windowed_value.value)
      elif isinstance(e, WindowedValueHolder):
        yield (
            e.windowed_value if include_window_info else e.windowed_value.value)
      else:
        yield e

  # Because we can yield multiple elements from a single TestStreamFileRecord,
  # we have to limit the count here to ensure that `n` is fulfilled.
  decoded_elements = elements()
  count = 0
  # The limit is checked *before* pulling the next item, so once `n` data
  # elements have been yielded the reader is not advanced any further (it may
  # be costly, or block, to read past what the caller asked for).
  while not (n and count >= n):
    try:
      e = next(decoded_elements)
    except StopIteration:
      return

    yield e

    if not isinstance(e, TestStreamPayload.Event):
      count += 1
def elements_to_df(elements, include_window_info=False, element_type=None):
  # type: (List[WindowedValue], bool, Any) -> DataFrame

  """Parses the given elements into a Dataframe.

  If the elements are a list of WindowedValues, then it will break out the
  elements into their own DataFrame and return it. If include_window_info is
  True, then it will concatenate the windowing information onto the elements
  DataFrame.

  Args:
    elements: an iterable of WindowedValues.
    include_window_info: if True, 'event_time' (timestamp in microseconds),
      'windows' and 'pane_info' columns are appended. Ignored when
      element_type is a pandas Series.
    element_type: the element type. When it is a pandas DataFrame or Series
      (a proxy), every element value is treated as a batch of rows and the
      batches are concatenated; otherwise every element value is one row and
      column names are derived from the type when possible.

  Returns:
    A pandas DataFrame, or a Series when element_type is a pandas Series.
  """
  using_dataframes = isinstance(element_type, pd.DataFrame)
  using_series = isinstance(element_type, pd.Series)

  rows = []
  windowed_info = []
  for e in elements:
    rows.append(e.value)
    if include_window_info:
      info = [e.timestamp.micros, e.windows, e.pane_info]
      if using_dataframes:
        # A DataFrame element is a batch of rows sharing the element's
        # windowing information; repeat it once per row so the window columns
        # line up with the data rows.
        windowed_info.extend([info] * len(e.value))
      else:
        windowed_info.append(info)

  if using_dataframes or using_series:
    if rows:
      rows_df = pd.concat(rows)
    else:
      # pd.concat raises on an empty list; return an empty frame/series that
      # keeps the proxy's schema instead.
      rows_df = element_type.iloc[:0]
  else:
    try:
      columns_names = [
          name for name, _ in named_fields_from_element_type(element_type)
      ]
    except TypeError:
      columns_names = None
    rows_df = pd.DataFrame(rows, columns=columns_names)

  if include_window_info and not using_series:
    # Share the data's index so the axis=1 concat pairs rows positionally,
    # even when concatenated batches repeat index labels.
    windowed_info_df = pd.DataFrame(
        windowed_info,
        columns=['event_time', 'windows', 'pane_info'],
        index=rows_df.index)
    final_df = pd.concat([rows_df, windowed_info_df], axis=1)
  else:
    final_df = rows_df

  return final_df
def register_ipython_log_handler():
  # type: () -> None

  """Adds the IPython handler to a dummy parent logger (named
  'apache_beam.runners.interactive') of all interactive modules' loggers so that
  if is_in_notebook, logging displays the logs as HTML in frontends.

  Calling this more than once is harmless: if an IPythonLogHandler is already
  attached to that logger, the function returns without changing anything.
  """
  # apache_beam.runners.interactive is not a module, thus this "root" logger is
  # a dummy one created to hold the IPython log handler. When children loggers
  # have propagate as True (by default) and logging level as NOTSET (by default,
  # so the "root" logger's logging level takes effect), the IPython log handler
  # will be triggered at the "root"'s own logging level. And if a child logger
  # sets its logging level, it can take control back.
  interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
  if any(isinstance(h, IPythonLogHandler)
         for h in interactive_root_logger.handlers):
    return
  interactive_root_logger.setLevel(logging.INFO)
  interactive_root_logger.addHandler(IPythonLogHandler())
  # Disable the propagation so that logs emitted from interactive modules should
  # only be handled by loggers and handlers defined within interactive packages.
  interactive_root_logger.propagate = False
class IPythonLogHandler(logging.Handler):
  """A logging handler to display logs as HTML in IPython backed frontends."""
  # TODO(BEAM-7923): Switch to Google hosted CDN once
  # https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.
  log_template = """
            <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
            <div class="alert alert-{level}">{msg}</div>"""

  # Maps standard logging levels to bootstrap alert classes.
  logging_to_alert_level_map = {
      logging.CRITICAL: 'danger',
      logging.ERROR: 'danger',
      logging.WARNING: 'warning',
      logging.INFO: 'info',
      logging.DEBUG: 'dark',
      logging.NOTSET: 'light'
  }

  def _alert_level(self, levelno):
    """Returns the alert class for a numeric logging level.

    Levels without an exact entry (e.g. custom levels such as 25) use the
    entry of the closest mapped level below them; levels below every mapped
    level use 'light'.
    """
    candidates = [
        level for level in self.logging_to_alert_level_map
        if level <= levelno
    ]
    if not candidates:
      return 'light'
    return self.logging_to_alert_level_map[max(candidates)]

  def _render(self, record):
    """Renders `record` as an HTML alert snippet."""
    from html import escape
    # getMessage() only %-interpolates when args were supplied and str()s
    # non-string messages, like the standard handlers do; interpolating
    # `record.msg % record.args` directly fails for e.g. '50%' with no args.
    return self.log_template.format(
        level=self._alert_level(record.levelno),
        msg=escape(record.getMessage()))

  def emit(self, record):
    """Displays `record` as HTML; a no-op when IPython is not available."""
    try:
      from IPython.core.display import HTML
      from IPython.core.display import display
    except ImportError:
      return  # NOOP when dependencies are not available.
    try:
      display(HTML(self._render(record)))
    except Exception:  # pylint: disable=broad-except
      # Per the logging.Handler contract, report the failure instead of
      # raising into the code that emitted the log.
      self.handleError(record)
def obfuscate(*inputs):
  # type: (*Any) -> str

  """Hashes the string forms of all inputs into one hexadecimal digest.

  Every input is converted with str(), the pieces are joined with '_', and the
  UTF-8 bytes of the joined string are MD5-hashed.
  """
  joined = '_'.join(map(str, inputs))
  digest = hashlib.md5(joined.encode('utf-8'))
  return digest.hexdigest()
class ProgressIndicator(object):
  """An indicator visualizing code execution in progress.

  Used as a context manager. In a notebook, entering displays a spinner and
  exiting removes it; in a non-notebook IPython environment the enter/exit
  texts are displayed instead. If IPython (or the interactive environment)
  cannot be imported, an error is logged and nothing is displayed.
  """
  # TODO(BEAM-7923): Switch to Google hosted CDN once
  # https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.
  spinner_template = """
            <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
            <div id="{id}" class="spinner-border text-info" role="status">
            </div>"""
  spinner_removal_template = """
            $("#{id}").remove();"""

  def __init__(self, enter_text, exit_text):
    # type: (str, str) -> None
    # A per-instance DOM id so the matching spinner can be removed on exit.
    self._id = 'progress_indicator_{}'.format(obfuscate(id(self)))
    self._enter_text = enter_text
    self._exit_text = exit_text

  def __enter__(self):
    try:
      from IPython.core.display import HTML
      from IPython.core.display import display
      from apache_beam.runners.interactive import interactive_environment as ie
      if ie.current_env().is_in_notebook:
        display(HTML(self.spinner_template.format(id=self._id)))
      else:
        display(self._enter_text)
    except ImportError as e:
      self._log_missing_ipython(e)

  def __exit__(self, exc_type, exc_value, traceback):
    try:
      from IPython.core.display import Javascript
      from IPython.core.display import display
      from IPython.core.display import display_javascript
      from apache_beam.runners.interactive import interactive_environment as ie
      if ie.current_env().is_in_notebook:
        script = self.spinner_removal_template.format(id=self._id)
        display_javascript(
            Javascript(
                ie._JQUERY_WITH_DATATABLE_TEMPLATE.format(
                    customized_script=script)))
      else:
        display(self._exit_text)
    except ImportError as e:
      self._log_missing_ipython(e)

  @staticmethod
  def _log_missing_ipython(error):
    """Logs that the indicator could not be displayed because of `error`."""
    _LOGGER.error(
        'Please use interactive Beam features in an IPython '
        'or notebook environment: %s',
        error)
def progress_indicated(func):
  # type: (Callable[..., Any]) -> Callable[..., Any]

  """Decorates `func` so that every call runs inside a new ProgressIndicator.

  The indicator is built with 'Processing...' and 'Done.' as its enter and
  exit texts; the wrapped function's return value is passed through unchanged.
  """
  enter_text, exit_text = 'Processing...', 'Done.'

  def run_within_progress_indicator(*args, **kwargs):
    with ProgressIndicator(enter_text, exit_text):
      return func(*args, **kwargs)

  return functools.wraps(func)(run_within_progress_indicator)
def as_json(func):
  # type: (Callable[..., Any]) -> Callable[..., str]

  """A decorator converting python objects returned by callables to json string.

  The decorated function should always return an object parsable by json.dumps.
  If the object is not parsable, the str() of original object is returned
  instead. Exceptions raised by the decorated function itself propagate
  unchanged.
  """
  @functools.wraps(func)
  def return_as_json(*args, **kwargs):
    # Call outside the try: a TypeError raised by `func` itself must propagate
    # instead of being mistaken for a serialization failure.
    return_value = func(*args, **kwargs)
    try:
      return json.dumps(return_value)
    except (TypeError, ValueError):
      # TypeError: unserializable type; ValueError: e.g. circular reference.
      return str(return_value)

  return return_as_json
def deferred_df_to_pcollection(df):
  """Converts a deferred DataFrame-like object into a PCollection and a proxy.

  Args:
    df: a DeferredBase.

  Returns:
    A tuple ``(pcoll, proxy)``: ``pcoll`` comes from ``to_pcollection`` with
    ``yield_elements='pandas'`` and is labelled with the string form of the
    expression; ``proxy`` is the expression's proxy, used to produce a
    DataFrame with the correct columns.

  Raises:
    AssertionError: if ``df`` is not a DeferredBase (only while assertions
      are enabled).
  """
  assert isinstance(df, DeferredBase), '{} is not a DeferredBase'.format(df)

  # The proxy is used to output a DataFrame with the correct columns.
  #
  # TODO(BEAM-11064): Once type hints are implemented for pandas, use those
  # instead of the proxy.
  expression = df._expr
  expression_cache = ExpressionCache()
  expression_cache.replace_with_cached(expression)
  proxy = expression.proxy()
  pcoll = to_pcollection(df, yield_elements='pandas', label=str(expression))
  return pcoll, proxy