apache_beam.runners.interactive.interactive_environment module

Module of the current Interactive Beam environment.

For internal use only; no backwards-compatibility guarantees. Provides interfaces to interact with the existing Interactive Beam environment. External Interactive Beam users should use the interactive_beam module in application code or notebooks instead.

apache_beam.runners.interactive.interactive_environment.current_env()[source]

Gets the current Interactive Beam environment.

apache_beam.runners.interactive.interactive_environment.new_env()[source]

Creates a new Interactive Beam environment to replace the current one.
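
A minimal sketch of obtaining and resetting the environment (for internal use only):

    from apache_beam.runners.interactive import interactive_environment as ie

    env = ie.current_env()  # gets the singleton environment
    ie.new_env()            # replaces it with a fresh environment, discarding prior state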

class apache_beam.runners.interactive.interactive_environment.InteractiveEnvironment[source]

Bases: object

An interactive environment with cache and pipeline variable metadata.

Interactive Beam uses the watched variable information to determine whether a PCollection is assigned to a variable in the user pipeline definition. When the pipeline is executed, interactivity is applied through an implicit caching mechanism for those PCollections if the pipeline is interactive. Users can also visualize and introspect those PCollections in user code since they have handles to the variables.
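
A hedged sketch of how watched variables drive interactivity; the pipeline and PCollection names are hypothetical, and external users would normally go through the interactive_beam module instead:

    import apache_beam as beam
    from apache_beam.runners.interactive import interactive_environment as ie
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

    p = beam.Pipeline(InteractiveRunner())
    squares = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)

    # Watching the defining scope lets the environment associate the PCollection
    # squares with its variable name, enabling implicit caching and inspection.
    ie.current_env().watch(locals())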

options

A reference to the global interactive options.

Provided to avoid an import loop or excessive dynamic imports. All internal Interactive Beam modules should access interactive_beam.options through this property.

is_interactive_ready

Whether the [interactive] dependencies are installed.

is_in_ipython

Whether the runtime is within an IPython kernel.

is_in_notebook

Whether the kernel is connected to a notebook frontend.

If not, the user may be using the kernel in a terminal or in a unit test.
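
A small sketch of gating behavior on these properties (assumed usage, with the ie alias from the earlier sketch):

    env = ie.current_env()
    if not env.is_interactive_ready:
        print('Install apache-beam[interactive] for full interactivity.')
    if env.is_in_notebook:
        pass  # a notebook frontend is connected; rich HTML output is possible
    elif env.is_in_ipython:
        pass  # IPython without a notebook frontend, e.g. a terminal kernel or a unit test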

inspector

Gets the singleton InteractiveEnvironmentInspector to retrieve information consumable by other applications such as a notebook extension.

inspector_with_synthetic

Gets the singleton InteractiveEnvironmentInspector with additional synthetic variables generated by Interactive Beam. Internally used.

cleanup_pipeline(pipeline)[source]
cleanup_environment()[source]
cleanup(pipeline=None)[source]

Cleans up cached states for the given pipeline. Noop if the given pipeline is absent from the environment. Cleans up for all pipelines if no pipeline is specified.
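
For example, assuming env = ie.current_env() and a watched pipeline p:

    env.cleanup(p)  # cleans cached states for this pipeline only
    env.cleanup()   # cleans cached states for every pipeline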

watch(watchable)[source]

Watches a watchable.

A watchable can be a dictionary of variable metadata such as locals(), the str name of a module, a module object, or an instance of a class. The variables can come from any scope, even a local one. Duplicate variable names do not matter since the underlying values are different instances, and watching the same variable more than once is also allowed.
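
A sketch of the kinds of watchables accepted, with env = ie.current_env(); some_module and some_instance are placeholders:

    env.watch(locals())        # a dict of variable name -> value
    env.watch('__main__')      # the str name of a module
    env.watch(some_module)     # a module object
    env.watch(some_instance)   # an instance of a class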

watching()[source]

Analyzes and returns a list of pair lists referring to variable names and values from watched scopes.

Each entry in the list represents the variables defined within a watched watchable. Currently, each entry holds a list of pairs; the format might change in the future to hold more metadata. Duplicated pairs are allowed, and multiple pairs can have the same variable name as the “first” element while holding different values as the “second”, since variables in different scopes can share the same name.
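
With env = ie.current_env(), the returned structure is roughly:

    for entry in env.watching():    # one entry per watched watchable
        for name, value in entry:   # each entry is currently a list of (name, value) pairs
            pass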

set_cache_manager(cache_manager, pipeline)[source]

Sets the cache manager held by the current Interactive Environment for the given pipeline.

get_cache_manager(pipeline, create_if_absent=False)[source]

Gets the cache manager held by the current Interactive Environment for the given pipeline. If the pipeline is absent from the environment and create_if_absent is True, creates and returns a new file-based cache manager for the pipeline.
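
A sketch, with env = ie.current_env(), p a hypothetical user pipeline, and my_cache_manager a placeholder:

    # Fetch the cache manager for p, creating a file-based one on demand.
    cm = env.get_cache_manager(p, create_if_absent=True)

    # Or register a specific cache manager explicitly.
    env.set_cache_manager(my_cache_manager, p)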

evict_cache_manager(pipeline=None)[source]

Evicts the cache manager held by the current Interactive Environment for the given pipeline. Noop if the pipeline is absent from the environment. If no pipeline is specified, evicts for all pipelines.

set_recording_manager(recording_manager, pipeline)[source]

Sets the recording manager for the given pipeline.

get_recording_manager(pipeline, create_if_absent=False)[source]

Gets the recording manager for the given pipeline.

evict_recording_manager(pipeline)[source]

Evicts the recording manager for the given pipeline.

This stops the background caching job and clears the cache. Noop if the pipeline is absent from the environment. If no pipeline is specified, evicts for all pipelines.

describe_all_recordings()[source]

Returns a description of the recordings for all watched pipelines.

set_pipeline_result(pipeline, result)[source]

Sets the pipeline run result. Adds one if absent; otherwise, replaces the existing result.

evict_pipeline_result(pipeline=None)[source]

Evicts the last run result of the given pipeline. Noop if the pipeline is absent from the environment. If no pipeline is specified, evicts for all pipelines.

pipeline_result(pipeline)[source]

Gets the pipeline run result. None if absent.
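
A sketch of recording and retrieving a run result, assuming env = ie.current_env() and a user pipeline p:

    result = p.run()
    env.set_pipeline_result(p, result)       # adds or replaces the stored result
    assert env.pipeline_result(p) is result
    env.evict_pipeline_result(p)
    assert env.pipeline_result(p) is None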

set_background_caching_job(pipeline, background_caching_job)[source]

Sets the background caching job started from the given pipeline.

get_background_caching_job(pipeline)[source]

Gets the background caching job started from the given pipeline.

evict_background_caching_job(pipeline=None)[source]

Evicts the background caching job started from the given pipeline. Noop if the given pipeline is absent from the environment. If no pipeline is specified, evicts for all pipelines.

set_test_stream_service_controller(pipeline, controller)[source]

Sets the test stream service controller that has started a gRPC server serving the test stream for any job started from the given user defined pipeline.

get_test_stream_service_controller(pipeline)[source]

Gets the test stream service controller that has started a gRPC server serving the test stream for any job started from the given user defined pipeline.

evict_test_stream_service_controller(pipeline)[source]

Evicts and pops the test stream service controller that has started a gRPC server serving the test stream for any job started from the given user defined pipeline. Noop if the given pipeline is absent from the environment. If no pipeline is specified, evicts for all pipelines.

is_terminated(pipeline)[source]

Queries whether the most recent job executing the given pipeline is in a terminal state. Returns True if there is no such job.
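
For example, a caller might cancel a job that is still running before cleaning up (hedged sketch, with env and p as above):

    if not env.is_terminated(p):
        env.pipeline_result(p).cancel()  # PipelineResult.cancel() asks the runner to stop the job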

set_cached_source_signature(pipeline, signature)[source]
get_cached_source_signature(pipeline)[source]
evict_cached_source_signature(pipeline=None)[source]

Evicts the signature generated for each recorded source of the given pipeline. Noop if the given pipeline is absent from the environment. If no pipeline is specified, evicts for all pipelines.

track_user_pipelines()[source]

Records references to all user-defined pipeline instances watched in the current environment.

The current static global singleton interactive environment holds references to a set of pipeline instances defined by the user in the watched scope. Interactive Beam features can use these references to determine whether a given pipeline was defined by the user or implicitly created by the Beam SDK or runners, and then handle them differently.

This is invoked every time a PTransform is about to be applied when the current code execution is under IPython, because any user-defined pipeline can be re-evaluated through notebook cell re-execution at any time.

Each time this is invoked, it checks whether a cache manager has already been created for each user-defined pipeline and creates one if not.

If a pipeline is no longer watched due to re-execution while its PCollections are still in the watched scope, the pipeline becomes anonymous but remains accessible indirectly through references to its PCollections. This function also cleans up internal state for such anonymous pipelines once all of their PCollections become anonymous.

tracked_user_pipelines

Returns the user pipelines in this environment.
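
A hedged sketch of how the tracking is typically used, with env = ie.current_env():

    env.track_user_pipelines()                 # refresh the set of watched user pipelines
    for user_p in env.tracked_user_pipelines:
        cm = env.get_cache_manager(user_p)     # each tracked pipeline has a cache manager by now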

user_pipeline(derived_pipeline)[source]

Returns the user pipeline for the given derived pipeline.

add_user_pipeline(user_pipeline)[source]
add_derived_pipeline(user_pipeline, derived_pipeline)[source]

Adds the derived pipeline to the parent user pipeline.

evict_tracked_pipelines(user_pipeline)[source]

Evicts the user pipeline and its derived pipelines.

pipeline_id_to_pipeline(pid)[source]

Converts a pipeline id to a user pipeline.
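
A sketch of the bookkeeping, with env and p as above; derived_p is a placeholder for a pipeline derived from p, e.g. a fragment created by Interactive Beam:

    env.add_user_pipeline(p)
    env.add_derived_pipeline(p, derived_p)
    assert env.user_pipeline(derived_p) is p   # derived pipelines resolve back to their user pipeline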

mark_pcollection_computed(pcolls)[source]

Marks computation completeness for the given pcolls.

Interactive Beam can use this information to determine if a computation is needed to introspect the data of any given PCollection.

evict_computed_pcollections(pipeline=None)[source]

Evicts all computed PCollections for the given pipeline. If no pipeline is specified, evicts for all pipelines.

computed_pcollections
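
A sketch, with env as above and squares a hypothetical PCollection from a watched pipeline:

    env.mark_pcollection_computed([squares])
    if squares in env.computed_pcollections:
        pass  # the data can be introspected from cache without re-computation
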
load_jquery_with_datatable()[source]

Loads common resources to enable jQuery with the datatable plugin configured for notebook frontends, if necessary. Noop if the resources have already been loaded.

Once this is invoked, a window.interactive_beam_jquery with the datatable plugin configured can be used in subsequent notebook cells, as sketched after the notes below.

  1. There should only be one jQuery imported.
  2. Datatable needs to be imported after jQuery is loaded.
  3. The imported jQuery is attached to the window under the name jquery[version].
  4. The window attachment needs to happen at the end of the import chain, once all jQuery plugins are set.
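
A minimal notebook usage sketch (ie is the module alias used above):

    ie.current_env().load_jquery_with_datatable()
    # Subsequent cells' JavaScript can then use window.interactive_beam_jquery.
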
import_html_to_head(html_hrefs)[source]

Imports given external HTMLs (supported through webcomponents) into the head of the document.

On load of webcomponentsjs, import given HTMLs. If HTML import is already supported, skip loading webcomponentsjs.

No matter how many times an HTML import occurs in the document, only the first occurrence actually embeds the external HTML. In a notebook environment, the body of the document is constantly changing due to cell [re-]execution, deletion and re-ordering. Thus, HTML imports shouldn’t be put in the body, especially not in the output areas of notebook cells.
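
A hedged sketch, with env as above; the href is a placeholder:

    env.import_html_to_head(['https://example.com/some-web-component.html'])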

get_sql_chain(pipeline, set_user_pipeline=False)[source]