Source code for apache_beam.runners.interactive.display_manager

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Manages displaying pipeline graph and execution status on the frontend.

This module is experimental. No backwards-compatibility guarantees.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import threading
import time

from apache_beam.runners.interactive import interactive_pipeline_graph

# Frontend setup: pick how progress is displayed depending on whether this
# module is loaded inside a running IPython session.
try:
  import IPython  # pylint: disable=import-error
  # _display_progress defines how outputs are printed on the frontend.
  _display_progress = IPython.display.display

  def _formatter(string, pp, cycle):  # pylint: disable=unused-argument
    """Pretty-printer hook that emits `string` verbatim through `pp.text`.

    `cycle` is accepted to match the formatter callback signature but is not
    used.
    """
    pp.text(string)
  # get_ipython is not imported anywhere in this module; the lookup below raises
  # NameError when the name is not defined, which is handled by the except
  # clause further down.
  plain = get_ipython().display_formatter.formatters['text/plain']  # pylint: disable=undefined-variable
  # Register _formatter for `str` on the text/plain formatter (presumably so
  # that plain strings are shown without repr-style quoting -- confirm).
  plain.for_type(str, _formatter)

# NameError is added here because get_ipython() throws "not defined" NameError
# if not started with IPython.
except (ImportError, NameError):
  # Fallback: IPython is set to None so later `if IPython:` checks are false,
  # and progress is written with the built-in print function instead.
  IPython = None
  _display_progress = print


class DisplayManager(object):
  """Manages displaying pipeline graph and execution status on the frontend."""

  # Seconds the background updater sleeps between two display refreshes.
  _UPDATE_INTERVAL_SEC = .02

  def __init__(self, pipeline_info, pipeline_proto, caches_used, cache_manager,
               referenced_pcollections, required_transforms):
    """Constructor of DisplayManager.

    Args:
      pipeline_info: (interactive_runner.PipelineInfo)
      pipeline_proto: (Pipeline proto)
      caches_used: (set of str) A set of PCollection IDs of those whose cached
        results are used in the execution.
      cache_manager: (interactive_runner.CacheManager) DisplayManager fetches
        the latest status of pipeline execution by querying cache_manager.
      referenced_pcollections: (dict from str to PCollection proto) PCollection
        ID mapped to PCollection referenced during pipeline execution.
      required_transforms: (dict from str to PTransform proto) Mapping from
        transform ID to transforms that leads to visible results.
    """
    # Every parameter except cache_manager is expected to remain constant.
    self._pipeline_info = pipeline_info
    self._pipeline_proto = pipeline_proto
    self._caches_used = caches_used
    self._cache_manager = cache_manager
    self._referenced_pcollections = referenced_pcollections
    self._required_transforms = required_transforms

    # PCollection ID -> {'sample': [...]} for every PCollection already shown.
    self._pcollection_stats = {}

    # PCollection ID -> unique name of the transform that outputs it.
    self._producers = {}
    for transform in pipeline_proto.components.transforms.values():
      for pcoll_id in transform.outputs.values():
        self._producers[pcoll_id] = transform.unique_name

    # To be printed.
    leaf_transform_count = sum(
        1 for t in pipeline_proto.components.transforms.values()
        if not t.subtransforms)
    self._status = (
        'Using %s cached PCollections\nExecuting %s of %s '
        'transforms.') % (
            len(caches_used),
            len(required_transforms) - len(caches_used) - 1,
            leaf_transform_count)
    self._text_samples = []

    # For periodic update.
    self._lock = threading.Lock()
    self._periodic_update = False
    # Handle of the background updater thread, or None when none was started.
    self._update_thread = None

  def update_display(self, force=False):
    """Updates display on the frontend.

    Retrieves the latest execution status by querying CacheManager and updates
    display on the frontend. The assumption is that there is only one pipeline
    in a cell, because it clears up everything in the cell output every update
    cycle.

    Args:
      force: (bool) whether to force updating when no stats change happens.
    """
    with self._lock:
      new_stats = {}
      for pcoll_id in self._pipeline_info.all_pcollections():
        cache_label = self._pipeline_info.derivation(pcoll_id).cache_label()
        # Only PCollections that have not been shown yet and whose sample has
        # been written to the cache are picked up.
        if (pcoll_id not in self._pcollection_stats and
            self._cache_manager.exists('sample', cache_label)):
          contents = list(self._cache_manager.read('sample', cache_label))
          new_stats[pcoll_id] = {'sample': contents}
          if pcoll_id in self._referenced_pcollections:
            self._text_samples.append(str(
                '%s produced %s' % (
                    self._producers[pcoll_id],
                    interactive_pipeline_graph.format_sample(contents, 5))))

      if force or new_stats:
        if IPython:
          # Use the public IPython.display namespace, the same one the module
          # level setup takes `display` from.
          IPython.display.clear_output(True)

        self._pcollection_stats.update(new_stats)
        # TODO(qinyeli): Enable updating pipeline graph instead of constructing
        # it every time, if it is worth it.
        pipeline_graph = interactive_pipeline_graph.InteractivePipelineGraph(
            self._pipeline_proto,
            required_transforms=self._required_transforms,
            referenced_pcollections=self._referenced_pcollections,
            cached_pcollections=self._caches_used,
            pcollection_stats=self._pcollection_stats)
        pipeline_graph.display_graph()

        _display_progress('Running...')
        _display_progress(self._status)
        for text_sample in self._text_samples:
          _display_progress(text_sample)

  def start_periodic_update(self):
    """Start a thread that periodically updates the display.

    The display is force-refreshed once right away. Calling this while an
    updater thread is still alive reuses that thread instead of starting
    another one.
    """
    self.update_display(True)
    self._periodic_update = True

    running = self._update_thread
    if running is not None and running.is_alive():
      # An updater is already polling; a second one would only duplicate work.
      return

    def _updater():
      while self._periodic_update:
        self.update_display()
        time.sleep(self._UPDATE_INTERVAL_SEC)

    t = threading.Thread(target=_updater)
    t.daemon = True
    t.start()
    # Stored only after start() so that stop_periodic_update never joins a
    # thread that has not been started.
    self._update_thread = t

  def stop_periodic_update(self):
    """Stop periodically updating the display.

    Clears the polling flag, waits for the updater thread (if any) to exit and
    then refreshes the display one last time, so no background redraw can
    happen after this method returns.
    """
    self._periodic_update = False

    updater, self._update_thread = self._update_thread, None
    # Joining the current thread would raise, so skip it in that case.
    if updater is not None and updater is not threading.current_thread():
      updater.join()

    # Final refresh picks up anything cached since the updater's last cycle.
    self.update_display()