#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Module of the current Interactive Beam environment.
For internal use only; no backwards-compatibility guarantees.
Provides interfaces to interact with the existing Interactive Beam environment.
External Interactive Beam users should use the interactive_beam module in
application code or notebooks instead.
"""
from __future__ import absolute_import
import importlib
# Module-level holder for the current Interactive Beam environment. It stays
# None until current_env() lazily creates an InteractiveEnvironment, and
# new_env() resets it to None before building a replacement.
_interactive_beam_env = None
def current_env(cache_manager=None):
  """Gets current Interactive Beam environment.

  The environment is created lazily on the first call and then reused.

  Args:
    cache_manager: cache manager handed to the InteractiveEnvironment
      constructor. It is only used when no environment exists yet; if one
      already exists this argument is ignored and the existing environment is
      returned unchanged.

  Returns:
    The module-wide InteractiveEnvironment instance.
  """
  global _interactive_beam_env
  # Explicit None check: the holder is either None or an environment instance.
  if _interactive_beam_env is None:
    _interactive_beam_env = InteractiveEnvironment(cache_manager)
  return _interactive_beam_env
def new_env(cache_manager=None):
  """Creates a new Interactive Beam environment to replace current one.

  Args:
    cache_manager: cache manager passed on to current_env() and therefore to
      the newly created environment.

  Returns:
    The freshly created environment, which is now the current one.
  """
  global _interactive_beam_env
  # Drop the existing environment so that current_env() has to build a new one.
  _interactive_beam_env = None
  return current_env(cache_manager)
class InteractiveEnvironment(object):
  """An interactive environment with cache and pipeline variable metadata.

  Interactive Beam will use the watched variable information to determine if a
  PCollection is assigned to a variable in user pipeline definition. When
  executing the pipeline, interactivity is applied with implicit cache
  mechanism for those PCollections if the pipeline is interactive. Users can
  also visualize and introspect those PCollections in user code since they have
  handles to the variables.
  """

  def __init__(self, cache_manager=None):
    """Initializes the environment and starts watching the __main__ module.

    Args:
      cache_manager: optional cache manager object; stored as given and
        returned by cache_manager().
    """
    self._cache_manager = cache_manager
    # Holds class instances, module objects and str names of modules.
    self._watching_set = set()
    # Holds the items() of every watched dict (Dict[str, object]).
    self._watching_dict_list = []
    # Always watch __main__ module.
    self.watch('__main__')

  def watch(self, watchable):
    """Watches a watchable.

    A watchable can be a dictionary of variable metadata such as locals(), a
    str name of a module, a module object or an instance of a class. The
    variable can come from any scope even local. Duplicated variable naming
    doesn't matter since they are different instances. Duplicated variables
    are also allowed when watching.

    Args:
      watchable: a dict, whose items() are recorded immediately, or any other
        object. Non-dict watchables are stored in a set, so they must be
        hashable and watching the same one twice has no additional effect.
    """
    if isinstance(watchable, dict):
      self._watching_dict_list.append(watchable.items())
    else:
      self._watching_set.add(watchable)

  def watching(self):
    """Analyzes and returns a list of pair lists referring to variable names and
    values from watched scopes.

    Each entry in the list represents the variables defined within a watched
    watchable. Currently, each entry holds a list of pairs. The format might
    change in the future to hold more metadata. Duplicated pairs are allowed.
    And multiple pairs can have the same variable name as the "first" while
    having different variable values as the "second" since variables in
    different scopes can have the same name.

    Returns:
      A new list on every call. Entries for watched dicts come first, in the
      order they were watched, followed by one entry per non-dict watchable
      (set iteration order, so unspecified).

    Raises:
      ImportError: if a watched module name cannot be imported.
      TypeError: if a watched object has no __dict__ for vars() to read.
    """
    watching = list(self._watching_dict_list)
    for watchable in self._watching_set:
      # Module names are resolved at query time, not when watch() was called.
      if isinstance(watchable, str):
        watchable = importlib.import_module(watchable)
      watching.append(vars(watchable).items())
    return watching

  def set_cache_manager(self, cache_manager):
    """Sets the cache manager held by current Interactive Environment."""
    self._cache_manager = cache_manager

  def cache_manager(self):
    """Gets the cache manager held by current Interactive Environment."""
    return self._cache_manager