#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Messaging mechanism to inspect the interactive environment.
A singleton instance is accessible from
interactive_environment.current_env().inspector.
"""
# pytype: skip-file
import apache_beam as beam
from apache_beam.runners.interactive.utils import as_json
from apache_beam.runners.interactive.utils import obfuscate
class InteractiveEnvironmentInspector(object):
    """Inspector that converts information of the current interactive
    environment, including pipelines and pcollections, into JSON data suitable
    for messaging with applications within/outside the Python kernel.

    The expected usage is that the application side reads `inspectables` or
    `list_inspectables` first, then communicates an identifier back to the
    kernel, where `get_val` resolves it to the in-memory object.
    """

    def __init__(self, ignore_synthetic=True):
        """Creates an inspector with empty caches.

        Args:
          ignore_synthetic: forwarded to inspect() on every read of
            `inspectables`.
        """
        # Snapshot returned by the most recent inspect() call; get_val reads
        # from it, so it is only as fresh as the last `inspectables` access.
        self._inspectables = {}
        # Pipelines that are reachable only through a watched pcollection,
        # keyed by their obfuscated identifier. Entries are added by
        # list_inspectables() and are never removed here.
        self._anonymous = {}
        # Maps str(id(pipeline)) to the pipeline object; rebuilt by every
        # list_inspectables() call.
        self._inspectable_pipelines = {}
        self._ignore_synthetic = ignore_synthetic

    @property
    def inspectables(self):
        """Lists pipelines and pcollections assigned to variables as
        inspectables.

        Each access re-runs inspect() and replaces the cached snapshot.
        """
        self._inspectables = inspect(self._ignore_synthetic)
        return self._inspectables

    @property
    def inspectable_pipelines(self):
        """Returns a dictionary of all inspectable pipelines.

        The keys are the stringified id of the pipeline instances. This
        includes user defined pipelines assigned to variables and anonymous
        pipelines with inspectable PCollections.

        If a user defined pipeline is not within the returned dict, it can be
        considered out of scope, and all resources and memory states related
        to it should be released.
        """
        # Called only for its side effect of rebuilding
        # self._inspectable_pipelines; the listing itself is discarded.
        self.list_inspectables()
        return self._inspectable_pipelines

    @as_json
    def list_inspectables(self):
        """Lists inspectables in JSON format.

        When listing, pcollections are organized by the pipeline they belong
        to. If a pipeline is no longer assigned to a variable but its
        pcollections assigned to variables are still in scope, the pipeline
        will be given a name as 'anonymous_pipeline[id:$inMemoryId]'.

        The listing doesn't contain object values of the pipelines or
        pcollections. The obfuscated identifier can be used to trace back to
        those values in the kernel.

        The listing includes anonymous pipelines that are not assigned to
        variables but still contain inspectable PCollections.

        Side effects: refreshes the `inspectables` snapshot, records newly
        seen anonymous pipelines so that get_val can resolve them, and
        rebuilds the mapping exposed by `inspectable_pipelines`.
        """
        listing = {}
        pipelines = inspect_pipelines()
        # Seed the listing with every pipeline that is bound to a variable.
        for pipeline, name in pipelines.items():
            metadata = meta(name, pipeline)
            listing[obfuscate(metadata)] = {'metadata': metadata, 'pcolls': {}}

        for identifier, inspectable in self.inspectables.items():
            if inspectable['metadata']['type'] != 'pcollection':
                continue
            pipeline = inspectable['value'].pipeline
            # Pipelines are already used as keys of `pipelines`, so test
            # membership on the dict directly instead of copying its keys
            # into a list for every pcollection.
            if pipeline not in pipelines:
                # First pcollection seen for a pipeline that has no variable
                # of its own: give the pipeline a synthesized name and
                # remember it so later pcollections take the else branch.
                pipeline_name = synthesize_pipeline_name(pipeline)
                pipelines[pipeline] = pipeline_name
                pipeline_metadata = meta(pipeline_name, pipeline)
                pipeline_identifier = obfuscate(pipeline_metadata)
                self._anonymous[pipeline_identifier] = {
                    'metadata': pipeline_metadata,
                    'value': pipeline,
                }
                listing[pipeline_identifier] = {
                    'metadata': pipeline_metadata,
                    'pcolls': {identifier: inspectable['metadata']},
                }
            else:
                pipeline_identifier = obfuscate(
                    meta(pipelines[pipeline], pipeline))
                listing[pipeline_identifier]['pcolls'][identifier] = (
                    inspectable['metadata'])

        self._inspectable_pipelines = {
            str(id(pipeline)): pipeline for pipeline in pipelines
        }
        return listing

    def get_val(self, identifier):
        """Retrieves the in memory object itself by identifier.

        The retrieved object could be a pipeline or a pcollection. If the
        identifier is not recognized, return None.

        The identifier can refer to an anonymous pipeline and the object will
        still be retrieved.

        Note that the lookup uses the cached snapshots; it does not re-inspect
        the environment.
        """
        # Variables bound by the user take precedence over anonymous
        # pipelines.
        for registry in (self._inspectables, self._anonymous):
            inspectable = registry.get(identifier)
            if inspectable:
                return inspectable['value']
        return None

    def get_pcoll_data(self, identifier, include_window_info=False):
        """Retrieves the json formatted PCollection data.

        Args:
          identifier: obfuscated identifier, resolved through get_val.
          include_window_info: forwarded to interactive_beam.collect.

        Returns:
          The result of `to_json(orient='table')` on the collected data when
          the identifier resolves to a PCollection. If no PCollection value
          can be retrieved from the given identifier, an empty dict is
          returned.
        """
        value = self.get_val(identifier)
        if isinstance(value, beam.pvalue.PCollection):
            # Imported at call time rather than at module level; presumably
            # to avoid a circular import -- confirm before moving it.
            from apache_beam.runners.interactive import interactive_beam as ib
            dataframe = ib.collect(
                value, include_window_info=include_window_info)
            return dataframe.to_json(orient='table')
        return {}
def inspect(ignore_synthetic=True):
    """Inspects current interactive environment to track metadata and values
    of pipelines and pcollections.

    Each pipeline and pcollection tracked is keyed by an identifier obtained
    by obfuscating its metadata.

    Args:
      ignore_synthetic: when True, variables whose names start with
        'synthetic_var_' are skipped.

    Returns:
      A dict mapping each identifier to
      {'metadata': <metadata>, 'value': <the watched object>}. Only values
      that are Pipeline or PCollection instances are included.
    """
    # Imported at call time rather than at module level; presumably to avoid
    # a circular import -- confirm before moving it.
    from apache_beam.runners.interactive import interactive_environment as ie

    tracked_types = (beam.pipeline.Pipeline, beam.pvalue.PCollection)
    inspectables = {}
    for watching in ie.current_env().watching():
        for name, value in watching:
            # Ignore synthetic vars created by Interactive Beam itself.
            if ignore_synthetic and name.startswith('synthetic_var_'):
                continue
            # Filter by type before building metadata so that no metadata or
            # identifier is computed for watched variables that are dropped
            # anyway.
            if not isinstance(value, tracked_types):
                continue
            metadata = meta(name, value)
            inspectables[obfuscate(metadata)] = {
                'metadata': metadata,
                'value': value,
            }
    return inspectables
def inspect_pipelines():
    """Collects every pipeline that is currently assigned to a watched
    variable.

    Returns:
      A dict whose keys are pipeline objects and whose values are the
      variable names they are bound to. When one pipeline is bound to several
      names, the name encountered last wins.
    """
    from apache_beam.runners.interactive import interactive_environment as ie

    return {
        candidate: var_name
        for watched in ie.current_env().watching()
        for var_name, candidate in watched
        if isinstance(candidate, beam.pipeline.Pipeline)
    }
def synthesize_pipeline_name(val):
    """Builds a placeholder name for a pipeline object that has no variable
    name, embedding the object's in-memory id."""
    in_memory_id = id(val)
    return 'anonymous_pipeline[id:{}]'.format(in_memory_id)