#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities to be used in Interactive Beam.
"""
from __future__ import absolute_import
import hashlib
import logging
import pandas as pd
from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
[docs]def to_element_list(
reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
coder, # type: Coder
include_window_info # type: bool
):
# type: (...) -> List[WindowedValue]
"""Returns an iterator that properly decodes the elements from the reader.
"""
for e in reader:
if isinstance(e, TestStreamPayload.Event):
if (e.HasField('watermark_event') or e.HasField('processing_time_event')):
continue
else:
for tv in e.element_event.elements:
decoded = coder.decode(tv.encoded_element)
yield (
decoded.windowed_value
if include_window_info else decoded.windowed_value.value)
else:
yield e.windowed_value if include_window_info else e.windowed_value.value
[docs]def elements_to_df(elements, include_window_info=False):
# type: (List[WindowedValue], bool) -> DataFrame
"""Parses the given elements into a Dataframe.
If the elements are a list of WindowedValues, then it will break out the
elements into their own DataFrame and return it. If include_window_info is
True, then it will concatenate the windowing information onto the elements
DataFrame.
"""
rows = []
windowed_info = []
for e in elements:
rows.append(e.value)
if include_window_info:
windowed_info.append([e.timestamp.micros, e.windows, e.pane_info])
rows_df = pd.DataFrame(rows)
if include_window_info:
windowed_info_df = pd.DataFrame(
windowed_info, columns=['event_time', 'windows', 'pane_info'])
final_df = pd.concat([rows_df, windowed_info_df], axis=1)
else:
final_df = rows_df
return final_df
[docs]def register_ipython_log_handler():
# type: () -> None
"""Adds the IPython handler to a dummy parent logger (named
'apache_beam.runners.interactive') of all interactive modules' loggers so that
if is_in_notebook, logging displays the logs as HTML in frontends.
"""
# apache_beam.runners.interactive is not a module, thus this "root" logger is
# a dummy one created to hold the IPython log handler. When children loggers
# have propagate as True (by default) and logging level as NOTSET (by default,
# so the "root" logger's logging level takes effect), the IPython log handler
# will be triggered at the "root"'s own logging level. And if a child logger
# sets its logging level, it can take control back.
interactive_root_logger = logging.getLogger('apache_beam.runners.interactive')
if any([isinstance(h, IPythonLogHandler)
for h in interactive_root_logger.handlers]):
return
interactive_root_logger.setLevel(logging.INFO)
interactive_root_logger.addHandler(IPythonLogHandler())
# Disable the propagation so that logs emitted from interactive modules should
# only be handled by loggers and handlers defined within interactive packages.
interactive_root_logger.propagate = False
[docs]class IPythonLogHandler(logging.Handler):
"""A logging handler to display logs as HTML in IPython backed frontends."""
# TODO(BEAM-7923): Switch to Google hosted CDN once
# https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.
log_template = """
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
<div class="alert alert-{level}">{msg}</div>"""
logging_to_alert_level_map = {
logging.CRITICAL: 'danger',
logging.ERROR: 'danger',
logging.WARNING: 'warning',
logging.INFO: 'info',
logging.DEBUG: 'dark',
logging.NOTSET: 'light'
}
[docs] def emit(self, record):
try:
from html import escape
from IPython.core.display import HTML
from IPython.core.display import display
display(
HTML(
self.log_template.format(
level=self.logging_to_alert_level_map[record.levelno],
msg=escape(record.msg % record.args))))
except ImportError:
pass # NOOP when dependencies are not available.
[docs]def obfuscate(*inputs):
# type: (*Any) -> str
"""Obfuscates any inputs into a hexadecimal string."""
str_inputs = [str(input) for input in inputs]
merged_inputs = '_'.join(str_inputs)
return hashlib.md5(merged_inputs.encode('utf-8')).hexdigest()
[docs]class ProgressIndicator(object):
"""An indicator visualizing code execution in progress."""
# TODO(BEAM-7923): Switch to Google hosted CDN once
# https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.
spinner_template = """
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
<div id="{id}" class="spinner-border text-info" role="status">
</div>"""
spinner_removal_template = """
$("#{id}").remove();"""
def __init__(self, enter_text, exit_text):
# type: (str, str) -> None
self._id = 'progress_indicator_{}'.format(obfuscate(id(self)))
self._enter_text = enter_text
self._exit_text = exit_text
def __enter__(self):
try:
from IPython.core.display import HTML
from IPython.core.display import display
from apache_beam.runners.interactive import interactive_environment as ie
if ie.current_env().is_in_notebook:
display(HTML(self.spinner_template.format(id=self._id)))
else:
display(self._enter_text)
except ImportError:
pass # NOOP when dependencies are not available.
def __exit__(self, exc_type, exc_value, traceback):
try:
from IPython.core.display import Javascript
from IPython.core.display import display
from IPython.core.display import display_javascript
from apache_beam.runners.interactive import interactive_environment as ie
if ie.current_env().is_in_notebook:
script = self.spinner_removal_template.format(id=self._id)
display_javascript(
Javascript(
ie._JQUERY_WITH_DATATABLE_TEMPLATE.format(
customized_script=script)))
else:
display(self._exit_text)
except ImportError:
pass # NOOP when dependencies are not avaialble.
[docs]def progress_indicated(func):
# type: (Callable[..., Any]) -> Callable[..., Any]
"""A decorator using a unique progress indicator as a context manager to
execute the given function within."""
def run_within_progress_indicator(*args, **kwargs):
with ProgressIndicator('Processing...', 'Done.'):
return func(*args, **kwargs)
return run_within_progress_indicator