Source code for apache_beam.runners.interactive.messaging.interactive_environment_inspector

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Messaging mechanism to inspect the interactive environment.

A singleton instance is accessible from
interactive_environment.current_env().inspector.
"""
# pytype: skip-file

import apache_beam as beam
from apache_beam.runners.interactive.utils import as_json
from apache_beam.runners.interactive.utils import obfuscate


[docs]class InteractiveEnvironmentInspector(object): """Inspector that converts information of the current interactive environment including pipelines and pcollections into JSON data suitable for messaging with applications within/outside the Python kernel. The usage is always that the application side reads the inspectables or list_inspectables first then communicates back to the kernel and get_val for usage on the kernel side. """ def __init__(self): self._inspectables = {} self._anonymous = {} self._inspectable_pipelines = set() @property def inspectables(self): """Lists pipelines and pcollections assigned to variables as inspectables. """ self._inspectables = inspect() return self._inspectables @property def inspectable_pipelines(self): """Returns a dictionary of all inspectable pipelines. The keys are stringified id of pipeline instances. This includes user defined pipeline assigned to variables and anonymous pipelines with inspectable PCollections. If a user defined pipeline is not within the returned dict, it can be considered out of scope, and all resources and memory states related to it should be released. """ _ = self.list_inspectables() return self._inspectable_pipelines @as_json def list_inspectables(self): """Lists inspectables in JSON format. When listing, pcollections are organized by the pipeline they belong to. If a pipeline is no longer assigned to a variable but its pcollections assigned to variables are still in scope, the pipeline will be given a name as 'anonymous_pipeline[id:$inMemoryId]'. The listing doesn't contain object values of the pipelines or pcollections. The obfuscated identifier can be used to trace back to those values in the kernel. The listing includes anonymous pipelines that are not assigned to variables but still containing inspectable PCollections. """ listing = {} pipelines = inspect_pipelines() for pipeline, name in pipelines.items(): metadata = meta(name, pipeline) listing[obfuscate(metadata)] = {'metadata': metadata, 'pcolls': {}} for identifier, inspectable in self.inspectables.items(): if inspectable['metadata']['type'] == 'pcollection': pipeline = inspectable['value'].pipeline if pipeline not in list(pipelines.keys()): pipeline_name = synthesize_pipeline_name(pipeline) pipelines[pipeline] = pipeline_name pipeline_metadata = meta(pipeline_name, pipeline) pipeline_identifier = obfuscate(pipeline_metadata) self._anonymous[pipeline_identifier] = { 'metadata': pipeline_metadata, 'value': pipeline } listing[pipeline_identifier] = { 'metadata': pipeline_metadata, 'pcolls': { identifier: inspectable['metadata'] } } else: pipeline_identifier = obfuscate(meta(pipelines[pipeline], pipeline)) listing[pipeline_identifier]['pcolls'][identifier] = inspectable[ 'metadata'] self._inspectable_pipelines = dict( (str(id(pipeline)), pipeline) for pipeline in pipelines) return listing
[docs] def get_val(self, identifier): """Retrieves the in memory object itself by identifier. The retrieved object could be a pipeline or a pcollection. If the identifier is not recognized, return None. The identifier can refer to an anonymous pipeline and the object will still be retrieved. """ inspectable = self._inspectables.get(identifier, None) if inspectable: return inspectable['value'] inspectable = self._anonymous.get(identifier, None) if inspectable: return inspectable['value'] return None
[docs] def get_pcoll_data(self, identifier, include_window_info=False): """Retrieves the json formatted PCollection data. If no PCollection value can be retieved from the given identifier, an empty json string will be returned. """ value = self.get_val(identifier) if isinstance(value, beam.pvalue.PCollection): from apache_beam.runners.interactive import interactive_beam as ib dataframe = ib.collect(value, include_window_info=include_window_info) return dataframe.to_json(orient='table') return {}
[docs]def inspect(): """Inspects current interactive environment to track metadata and values of pipelines and pcollections. Each pipeline and pcollections tracked is given a unique identifier. """ from apache_beam.runners.interactive import interactive_environment as ie inspectables = {} for watching in ie.current_env().watching(): for name, value in watching: # Ignore synthetic vars created by Interactive Beam itself. if name.startswith('synthetic_var_'): continue metadata = meta(name, value) identifier = obfuscate(metadata) if isinstance(value, (beam.pipeline.Pipeline, beam.pvalue.PCollection)): inspectables[identifier] = {'metadata': metadata, 'value': value} return inspectables
[docs]def inspect_pipelines(): """Inspects current interactive environment to track all pipelines assigned to variables. The keys are pipeline objects and values are pipeline names. """ from apache_beam.runners.interactive import interactive_environment as ie pipelines = {} for watching in ie.current_env().watching(): for name, value in watching: if isinstance(value, beam.pipeline.Pipeline): pipelines[value] = name return pipelines
[docs]def meta(name, val): """Generates meta data for the given name and value.""" return { 'name': name, 'inMemoryId': id(val), 'type': type(val).__name__.lower() }
[docs]def synthesize_pipeline_name(val): """Synthesizes a pipeline name for the given pipeline object.""" return 'anonymous_pipeline[id:{}]'.format(id(val))