#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Manages displaying pipeline graph and execution status on the frontend.
This module is experimental. No backwards-compatibility guarantees.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import threading
import time
from apache_beam.runners.interactive import interactive_pipeline_graph
try:
import IPython # pylint: disable=import-error
# _display_progress defines how outputs are printed on the frontend.
_display_progress = IPython.display.display
def _formatter(string, pp, cycle): # pylint: disable=unused-argument
pp.text(string)
plain = get_ipython().display_formatter.formatters['text/plain'] # pylint: disable=undefined-variable
plain.for_type(str, _formatter)
# NameError is added here because get_ipython() throws "not defined" NameError
# if not started with IPython.
except (ImportError, NameError):
IPython = None
_display_progress = print
[docs]class DisplayManager(object):
"""Manages displaying pipeline graph and execution status on the frontend."""
def __init__(self, pipeline_info, pipeline_proto, caches_used, cache_manager,
referenced_pcollections, required_transforms):
"""Constructor of DisplayManager.
Args:
pipeline_info: (interactive_runner.PipelineInfo)
pipeline_proto: (Pipeline proto)
caches_used: (set of str) A set of PCollection IDs of those whose cached
results are used in the execution.
cache_manager: (interactive_runner.CacheManager) DisplayManager fetches
the latest status of pipeline execution by querying cache_manager.
referenced_pcollections: (dict from str to PCollection proto) PCollection
ID mapped to PCollection referenced during pipeline execution.
required_transforms: (dict from str to PTransform proto) Mapping from
transform ID to transforms that leads to visible results.
"""
# Every parameter except cache_manager is expected to remain constant.
self._cache_manager = cache_manager
self._referenced_pcollections = referenced_pcollections
self._pipeline_graph = interactive_pipeline_graph.InteractivePipelineGraph(
pipeline_proto,
required_transforms=required_transforms,
referenced_pcollections=referenced_pcollections,
cached_pcollections=caches_used)
# _text_to_print keeps track of information to be displayed.
self._text_to_print = collections.OrderedDict()
self._text_to_print['summary'] = (
'Using %s cached PCollections\nExecuting %s of %s '
'transforms.') % (
# TODO(qinyeli): required_transforms includes ReadCache and
# WriteCache fix it.
len(caches_used), len(required_transforms),
len(pipeline_proto.components.transforms[
pipeline_proto.root_transform_ids[0]].subtransforms))
self._text_to_print.update(
{pcoll_id: "" for pcoll_id in referenced_pcollections})
# _pcollection_stats maps pcoll_id to
# { 'cache_label': cache_label, version': version, 'sample': pcoll_in_list }
self._pcollection_stats = {}
for pcoll_id in pipeline_info.all_pcollections():
self._pcollection_stats[pcoll_id] = {
'cache_label': pipeline_info.cache_label(pcoll_id),
'version': -1,
'sample': []
}
self._producers = {}
for _, transform in pipeline_proto.components.transforms.items():
for pcoll_id in transform.outputs.values():
if pcoll_id not in self._producers or '/' not in transform.unique_name:
self._producers[pcoll_id] = transform.unique_name
# For periodic update.
self._lock = threading.Lock()
self._periodic_update = False
[docs] def update_display(self, force=False):
"""Updates display on the frontend.
Retrieves the latest execution status by querying CacheManager and updates
display on the fronend. The assumption is that there is only one pipeline in
a cell, because it clears up everything in the cell output every update
cycle.
Args:
force: (bool) whether to force updating when no stats change happens.
"""
with self._lock:
stats_updated = False
for pcoll_id, stats in self._pcollection_stats.items():
cache_label = stats['cache_label']
version = stats['version']
if force or not self._cache_manager.is_latest_version(
version, 'sample', cache_label):
pcoll_list, version = self._cache_manager.read('sample', cache_label)
stats['sample'] = pcoll_list
stats['version'] = version
stats_updated = True
if pcoll_id in self._referenced_pcollections:
self._text_to_print[pcoll_id] = (str(
'%s produced %s' % (
self._producers[pcoll_id],
interactive_pipeline_graph.format_sample(pcoll_list, 5))))
if force or stats_updated:
if IPython:
IPython.core.display.clear_output(True)
self._pipeline_graph.update_pcollection_stats(self._pcollection_stats)
self._pipeline_graph.display_graph()
_display_progress('Running...')
for text in self._text_to_print.values():
if text != "":
_display_progress(text)
[docs] def start_periodic_update(self):
"""Start a thread that periodically updates the display."""
self.update_display(True)
self._periodic_update = True
def _updater():
while self._periodic_update:
self.update_display()
time.sleep(.02)
t = threading.Thread(target=_updater)
t.daemon = True
t.start()
[docs] def stop_periodic_update(self):
"""Stop periodically updating the display."""
self.update_display(True)
self._periodic_update = False