apache_beam.runners.interactive.interactive_beam module

Module of Interactive Beam features that can be used in notebooks.

The purpose of this module is to reduce the learning curve for Interactive Beam users, provide a single place for imports, and add syntactic sugar for all Interactive Beam components. It gives users the capability to interact with the existing environment/session/context of Interactive Beam and to visualize PCollections as bounded datasets. At the same time, it hides the interactivity implementation from users so that they can focus on developing Beam pipelines without worrying about how hidden states in the interactive session are managed.

A convention to import this module:
from apache_beam.runners.interactive import interactive_beam as ib

Note: If you want backward-compatibility, only invoke interfaces provided by this module in your notebook or application code.

class apache_beam.runners.interactive.interactive_beam.Options[source]

Bases: apache_beam.runners.interactive.options.interactive_options.InteractiveOptions

Options that guide how Interactive Beam works.

enable_recording_replay

Whether recorded data from replayable sources should be replayed for multiple PCollection evaluations and pipeline runs, as long as the recorded data is still valid.

recordable_sources

Interactive Beam automatically records data from sources in this set.

recording_duration

The data recording of sources ends as soon as the background source recording job has run for this long.

recording_size_limit

The data recording of sources ends as soon as the size (in bytes) of data recorded from recordable sources reaches the limit.

display_timestamp_format

The format in which timestamps are displayed.

Default is ‘%Y-%m-%d %H:%M:%S.%f%z’, e.g. 2020-02-01 15:05:06.000015-08:00.

display_timezone

The timezone in which timestamps are displayed.

Defaults to local timezone.

cache_root

The cache directory specified by the user.

Defaults to None.
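These options are exposed on the singleton ib.options instance. Below is a minimal sketch of adjusting them from a notebook; the specific values, the '60s' duration string, and the use of a pytz timezone object for display_timezone are illustrative assumptions rather than requirements:

from apache_beam.runners.interactive import interactive_beam as ib
import pytz

# Illustrative values only; adjust to your own needs.
ib.options.enable_recording_replay = True   # replay recorded source data
ib.options.recording_duration = '60s'       # stop background recording after 60s
ib.options.recording_size_limit = 1e9       # or after ~1 GB of recorded data
ib.options.display_timezone = pytz.timezone('US/Pacific')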

class apache_beam.runners.interactive.interactive_beam.Recordings[source]

Bases: object

An introspection interface for recordings for pipelines.

When a user materializes a PCollection onto disk (e.g. ib.show) for a streaming pipeline, a background source recording job is started. This job pulls data from all defined unbounded sources for that PCollection’s pipeline. The following methods allow for introspection into that background recording job.

describe(pipeline=None)[source]

Returns a description of all the recordings for the given pipeline.

If no pipeline is given then this returns a dictionary of descriptions for all pipelines.

clear(pipeline)[source]

Clears all recordings of the given pipeline. Returns True if cleared.

stop(pipeline)[source]

Stops the background source recording of the given pipeline.

record(pipeline)[source]

Starts a background source recording job for the given pipeline. Returns True if the recording job was started.
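A minimal sketch of typical introspection calls on the ib.recordings instance, assuming a streaming pipeline p with unbounded sources has already been defined and shown:

from apache_beam.runners.interactive import interactive_beam as ib

ib.recordings.describe(p)   # inspect the state and size of the recording
ib.recordings.stop(p)       # stop the background source recording job
ib.recordings.clear(p)      # drop all recorded data for the pipeline
ib.recordings.record(p)     # start a fresh background recording job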

class apache_beam.runners.interactive.interactive_beam.Clusters[source]

Bases: object

An interface to control clusters implicitly created and managed by the current interactive environment. Otherwise, this class is not needed and should not be used.

Do not use it for clusters a user explicitly manages: e.g., if you have a Flink cluster running somewhere and provide the flink master when running a pipeline with the FlinkRunner, that cluster will not be tracked or managed by Beam. To reuse the same cluster for your pipelines, use the same pipeline options: e.g., a pipeline option with the same flink master if you are using FlinkRunner.

This module is experimental. No backwards-compatibility guarantees.

Interactive Beam automatically creates/reuses existing worker clusters to execute pipelines when it detects the need from configurations. Currently, the only supported cluster implementation is Flink running on Cloud Dataproc.

To configure a pipeline to run on Cloud Dataproc with Flink, set the underlying runner of the InteractiveRunner to FlinkRunner and the pipeline options to indicate where on Cloud the FlinkRunner should be deployed to.

An example to enable automatic Dataproc cluster creation/reuse:

options = PipelineOptions([
    '--project=my-project',
    '--region=my-region',
    '--environment_type=DOCKER'])
pipeline = beam.Pipeline(InteractiveRunner(
    underlying_runner=FlinkRunner()), options=options)

Reusing the same pipeline options in another pipeline configures Interactive Beam to reuse the Dataproc cluster implicitly managed by the current interactive environment. If a flink_master is identified as a known cluster, the corresponding cluster is also reused. Furthermore, if a cluster is explicitly created by using a pipeline as an identifier to a known cluster, the cluster is reused.

An example:

# If pipeline runs on a known cluster, below code reuses the cluster
# manager without creating a new one.
dcm = ib.clusters.create(pipeline)

To provision the cluster, use WorkerOptions. Supported configurations are:

1. subnetwork
2. num_workers
3. machine_type
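For example, a sketch of pipeline options that would provision the implicitly created cluster (the values are placeholders):

options = PipelineOptions([
    '--project=my-project',
    '--region=my-region',
    '--environment_type=DOCKER',
    '--subnetwork=my-subnetwork',
    '--num_workers=3',
    '--machine_type=n1-standard-2'])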

To configure a pipeline to run on an existing FlinkRunner deployed elsewhere, set the flink_master explicitly so no cluster will be created/reused.

An example pipeline options to skip automatic Dataproc cluster usage:

options = PipelineOptions([
    '--flink_master=some.self.hosted.flink:port',
    '--environment_type=DOCKER'])

To configure a pipeline to run on a local FlinkRunner, explicitly set the default cluster metadata to None: ib.clusters.set_default_cluster(None).

DATAPROC_MINIMUM_WORKER_NUM = 2

create(cluster_identifier: Union[str, apache_beam.pipeline.Pipeline, apache_beam.runners.interactive.dataproc.types.ClusterMetadata]) → apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.DataprocClusterManager[source]

Creates a Dataproc cluster manager provisioned for the cluster identified. If the cluster is known, returns an existing cluster manager.

cleanup(cluster_identifier: Union[str, apache_beam.pipeline.Pipeline, apache_beam.runners.interactive.dataproc.types.ClusterMetadata, None] = None, force: bool = False) → None[source]

Cleans up the cluster associated with the given cluster_identifier.

When no cluster_identifier is provided: if force is True, cleans up all clusters; otherwise, does a dry run and NOOP. If a beam.Pipeline is given as the ClusterIdentifier while multiple pipelines share the same cluster, only the association between that pipeline and the identified cluster is cleaned up. If the cluster_identifier is unknown, NOOP.
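For example, a minimal sketch assuming a pipeline p that runs on a cluster managed by the interactive environment:

# Dissociate `p` from its cluster; the cluster itself is only cleaned up
# when no other pipeline still uses it.
ib.clusters.cleanup(p)

# Forcefully clean up every cluster managed by the interactive environment.
ib.clusters.cleanup(force=True)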

describe(cluster_identifier: Union[str, apache_beam.pipeline.Pipeline, apache_beam.runners.interactive.dataproc.types.ClusterMetadata, None] = None) → Union[apache_beam.runners.interactive.dataproc.types.ClusterMetadata, List[apache_beam.runners.interactive.dataproc.types.ClusterMetadata]][source]

Describes the ClusterMetadata by a ClusterIdentifier.

If no cluster_identifier is given or if the cluster_identifier is unknown, it returns descriptions for all known clusters.

Example usage:

# Describe the cluster executing work for a pipeline.
ib.clusters.describe(pipeline)
# Describe the cluster with the flink master url.
ib.clusters.describe(master_url)
# Describe all existing clusters.
ib.clusters.describe()

set_default_cluster(cluster_identifier: Union[str, apache_beam.pipeline.Pipeline, apache_beam.runners.interactive.dataproc.types.ClusterMetadata, None] = None) → None[source]

Temporarily sets the default metadata for creating or reusing a DataprocClusterManager. It is always updated to the most recently created cluster.

If no known ClusterMetadata can be identified by the ClusterIdentifier, NOOP. If None is set, then the next time Flink is in use and no cluster is explicitly configured by a pipeline, the job runs locally.
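For example, a minimal sketch assuming a pipeline p that is known to the current environment:

# Use the cluster serving `p` as the default for future pipelines.
ib.clusters.set_default_cluster(p)

# Clear the default so that Flink jobs without an explicitly configured
# cluster run locally.
ib.clusters.set_default_cluster(None)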

cluster_metadata(cluster_identifier: Union[str, apache_beam.pipeline.Pipeline, apache_beam.runners.interactive.dataproc.types.ClusterMetadata, None] = None) → Optional[apache_beam.runners.interactive.dataproc.types.ClusterMetadata][source]

Fetches the ClusterMetadata identified by a ClusterIdentifier, which can be a URL string, a Beam pipeline, or an equivalent to a known ClusterMetadata.

If the given cluster_identifier is a URL or a pipeline that is unknown to the current environment, the default cluster metadata (which could be None) is returned. If the given cluster_identifier is a ClusterMetadata that is unknown to the current environment, it is passed through (NOOP).
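For example, a minimal sketch assuming a pipeline p and a flink master URL known to the environment:

# Either call may return None if nothing is known and no default is set.
meta = ib.clusters.cluster_metadata(p)
meta = ib.clusters.cluster_metadata('some.flink.master:port')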

apache_beam.runners.interactive.interactive_beam.watch(watchable)[source]

Monitors a watchable.

This allows Interactive Beam to implicitly pass on the information about the location of your pipeline definition.

The current implementation mainly watches for PCollection variables defined in user code. A watchable can be a dictionary of variable metadata such as locals(), a str name of a module, a module object, or an instance of a class. The variable can come from any scope, even local variables in a method of a class defined in a module.

Below are all valid:

watch(__main__)  # if import __main__ is already invoked
watch('__main__')  # does not require invoking import __main__ beforehand
watch(self)  # inside a class
watch(SomeInstance())  # an instance of a class
watch(locals())  # inside a function, watching local variables within

Since the __main__ module is always watched, you don’t have to instruct Interactive Beam if you write a Beam pipeline directly in __main__. If your Beam pipeline is defined in some module other than __main__, such as inside a class function or a unit test, you can watch() the scope.

For example:

class Foo(object):
  def run_pipeline(self):
    with beam.Pipeline() as p:
      init_pcoll = p | 'Init Create' >> beam.Create(range(10))
      watch(locals())
    return init_pcoll
init_pcoll = Foo().run_pipeline()

Interactive Beam caches init_pcoll for the first run.

Then you can use:

show(init_pcoll)

to visualize data from init_pcoll once the pipeline is executed.

apache_beam.runners.interactive.interactive_beam.show(*pcolls, include_window_info=False, visualize_data=False, n='inf', duration='inf')[source]

Shows given PCollections in an interactive exploratory way if used within a notebook, or prints a heading and sampled data if used within an ipython shell. NOOP if used in a non-interactive environment.

Parameters:
  • include_window_info – (optional) if True, windowing information of the data will be visualized too. Default is False.
  • visualize_data – (optional) by default, the visualization contains data tables rendering data from the given pcolls separately, as if they were converted into dataframes. If visualize_data is True, a dive-in widget and a statistical overview widget of the data are also displayed; otherwise, those two widgets are not displayed.
  • n – (optional) max number of elements to visualize. Default ‘inf’.
  • duration – (optional) max duration of elements to read in integer seconds or a string duration. Default ‘inf’.

The given pcolls can be a dictionary of PCollections (as values), an iterable of PCollections, or plain PCollection values.

The user can specify either the max number of elements to read with n or the maximum duration of elements to read with duration. When a limiter is not supplied, it is assumed to be infinite.
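For example, a minimal sketch of applying both limiters to a hypothetical streaming PCollection pcoll (whichever limit is reached first ends the read):

# Read at most 100 elements or at most 60 seconds of data, whichever comes
# first, and include windowing information in the visualization.
show(pcoll, n=100, duration=60, include_window_info=True)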

This ad hoc builds a pipeline fragment that includes only the transforms necessary to produce data for the given PCollections pcolls, runs the fragment to compute data for those pcolls, and then visualizes the data.

The function is always blocking. If used within a notebook, the visualized data might be dynamically updated before the function returns, as more and more data gets processed and emitted while the pipeline fragment is being executed. If used within an ipython shell, there is no dynamic plotting, only a static plot at the end of the pipeline fragment execution.

The PCollections given must belong to the same pipeline.

For example:

p = beam.Pipeline(InteractiveRunner())
init = p | 'Init' >> beam.Create(range(1000))
square = init | 'Square' >> beam.Map(lambda x: x * x)
cube = init | 'Cube' >> beam.Map(lambda x: x ** 3)

# Below builds a pipeline fragment from the defined pipeline `p` that
# contains only applied transforms of `Init` and `Square`. Then the
# interactive runner runs the pipeline fragment implicitly to compute data
# represented by PCollection `square` and visualizes it.
show(square)

# This is equivalent to `show(square)` because `square` depends on `init`
# and `init` is included in the pipeline fragment and computed anyway.
show(init, square)

# Below is similar to running `p.run()`. It computes data for both
# PCollection `square` and PCollection `cube`, then visualizes them.
show(square, cube)

apache_beam.runners.interactive.interactive_beam.collect(pcoll, n='inf', duration='inf', include_window_info=False)[source]

Materializes the elements from a PCollection into a Dataframe.

This reads each element from file and loads only the amount that it needs into memory. The user can specify either the max number of elements to read or the maximum duration of elements to read. When a limiter is not supplied, it is assumed to be infinite.

Parameters:
  • n – (optional) max number of elements to visualize. Default ‘inf’.
  • duration – (optional) max duration of elements to read in integer seconds or a string duration. Default ‘inf’.
  • include_window_info – (optional) if True, appends the windowing information to each row. Default False.

For example:

p = beam.Pipeline(InteractiveRunner())
init = p | 'Init' >> beam.Create(range(10))
square = init | 'Square' >> beam.Map(lambda x: x * x)

# Run the pipeline and bring the PCollection into memory as a Dataframe.
in_memory_square = collect(square, n=5)

apache_beam.runners.interactive.interactive_beam.show_graph(pipeline)[source]

Shows the current pipeline shape of a given Beam pipeline as a DAG.
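For example, a minimal sketch reusing the pipeline p built with the InteractiveRunner in the show example above:

# Renders the DAG of all transforms currently applied to `p`.
ib.show_graph(p)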

apache_beam.runners.interactive.interactive_beam.evict_recorded_data(pipeline=None)[source]

Forcefully evicts all recorded replayable data for the given pipeline. If no pipeline is specified, evicts for all user-defined pipelines.

Once invoked, Interactive Beam will record new data based on the guidance of options the next time it evaluates/visualizes PCollections or runs pipelines.
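For example, a minimal sketch assuming a watched pipeline p:

# Evict recorded data for one pipeline only.
ib.evict_recorded_data(p)

# Evict recorded data for all user-defined pipelines.
ib.evict_recorded_data()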