Source code for apache_beam.runners.interactive.interactive_runner

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A runner that allows running of Beam pipelines interactively.

This module is experimental. No backwards-compatibility guarantees.
"""

# pytype: skip-file

import logging
from typing import Optional

import apache_beam as beam
from apache_beam import runners
from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import pipeline_instrument as inst
from apache_beam.runners.interactive import background_caching_job
from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.options import capture_control
from apache_beam.runners.interactive.utils import to_element_list
from apache_beam.runners.interactive.utils import watch_sources
from apache_beam.testing.test_stream_service import TestStreamServiceController

# size of PCollection samples cached.
SAMPLE_SIZE = 8

_LOGGER = logging.getLogger(__name__)


[docs]class InteractiveRunner(runners.PipelineRunner): """An interactive runner for Beam Python pipelines. Allows interactively building and running Beam Python pipelines. """ def __init__( self, underlying_runner=None, render_option=None, skip_display=True, force_compute=True, blocking=True): """Constructor of InteractiveRunner. Args: underlying_runner: (runner.PipelineRunner) render_option: (str) this parameter decides how the pipeline graph is rendered. See display.pipeline_graph_renderer for available options. skip_display: (bool) whether to skip display operations when running the pipeline. Useful if running large pipelines when display is not needed. force_compute: (bool) whether sequential pipeline runs can use cached data of PCollections computed from the previous runs including show API invocation from interactive_beam module. If True, always run the whole pipeline and compute data for PCollections forcefully. If False, use available data and run minimum pipeline fragment to only compute data not available. blocking: (bool) whether the pipeline run should be blocking or not. """ self._underlying_runner = ( underlying_runner or direct_runner.DirectRunner()) self._render_option = render_option self._in_session = False self._skip_display = skip_display self._force_compute = force_compute self._blocking = blocking
[docs] def is_fnapi_compatible(self): # TODO(https://github.com/apache/beam/issues/19937): # return self._underlying_runner.is_fnapi_compatible() return False
[docs] def set_render_option(self, render_option): """Sets the rendering option. Args: render_option: (str) this parameter decides how the pipeline graph is rendered. See display.pipeline_graph_renderer for available options. """ self._render_option = render_option
[docs] def start_session(self): """Start the session that keeps back-end managers and workers alive. """ if self._in_session: return enter = getattr(self._underlying_runner, '__enter__', None) if enter is not None: _LOGGER.info('Starting session.') self._in_session = True enter() else: _LOGGER.error('Keep alive not supported.')
[docs] def end_session(self): """End the session that keeps backend managers and workers alive. """ if not self._in_session: return exit = getattr(self._underlying_runner, '__exit__', None) if exit is not None: self._in_session = False _LOGGER.info('Ending session.') exit(None, None, None)
[docs] def apply(self, transform, pvalueish, options): # TODO(qinyeli, BEAM-646): Remove runner interception of apply. return self._underlying_runner.apply(transform, pvalueish, options)
[docs] def run_pipeline(self, pipeline, options): if not ie.current_env().options.enable_recording_replay: capture_control.evict_captured_data() if self._force_compute: ie.current_env().evict_computed_pcollections() # Make sure that sources without a user reference are still cached. watch_sources(pipeline) user_pipeline = ie.current_env().user_pipeline(pipeline) from apache_beam.runners.portability.flink_runner import FlinkRunner if isinstance(self._underlying_runner, FlinkRunner): self.configure_for_flink(user_pipeline, options) pipeline_instrument = inst.build_pipeline_instrument(pipeline, options) # The user_pipeline analyzed might be None if the pipeline given has nothing # to be cached and tracing back to the user defined pipeline is impossible. # When it's None, there is no need to cache including the background # caching job and no result to track since no background caching job is # started at all. if user_pipeline: # Should use the underlying runner and run asynchronously. background_caching_job.attempt_to_run_background_caching_job( self._underlying_runner, user_pipeline, options) if (background_caching_job.has_source_to_cache(user_pipeline) and not background_caching_job.is_a_test_stream_service_running( user_pipeline)): streaming_cache_manager = ie.current_env().get_cache_manager( user_pipeline) # Only make the server if it doesn't exist already. if (streaming_cache_manager and not ie.current_env().get_test_stream_service_controller( user_pipeline)): def exception_handler(e): _LOGGER.error(str(e)) return True test_stream_service = TestStreamServiceController( streaming_cache_manager, exception_handler=exception_handler) test_stream_service.start() ie.current_env().set_test_stream_service_controller( user_pipeline, test_stream_service) pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api( pipeline_instrument.instrumented_pipeline_proto(), self._underlying_runner, options) if ie.current_env().get_test_stream_service_controller(user_pipeline): endpoint = ie.current_env().get_test_stream_service_controller( user_pipeline).endpoint # TODO: make the StreamingCacheManager and TestStreamServiceController # constructed when the InteractiveEnvironment is imported. class TestStreamVisitor(PipelineVisitor): def visit_transform(self, transform_node): from apache_beam.testing.test_stream import TestStream if (isinstance(transform_node.transform, TestStream) and not transform_node.transform._events): transform_node.transform._endpoint = endpoint pipeline_to_execute.visit(TestStreamVisitor()) if not self._skip_display: a_pipeline_graph = pipeline_graph.PipelineGraph( pipeline_instrument.original_pipeline_proto, render_option=self._render_option) a_pipeline_graph.display_graph() main_job_result = PipelineResult( pipeline_to_execute.run(), pipeline_instrument) # In addition to this pipeline result setting, redundant result setting from # outer scopes are also recommended since the user_pipeline might not be # available from within this scope. if user_pipeline: ie.current_env().set_pipeline_result(user_pipeline, main_job_result) if self._blocking: main_job_result.wait_until_finish() if main_job_result.state is beam.runners.runner.PipelineState.DONE: # pylint: disable=bad-option-value ie.current_env().mark_pcollection_computed( pipeline_instrument.cached_pcolls) return main_job_result
def _strip_protocol_if_any(self, flink_master: Optional[str]): if flink_master: parts = flink_master.split('://') if len(parts) > 1: return parts[1] return flink_master def _worker_options_to_cluster_metadata( self, options: PipelineOptions, cluster_metadata: ClusterMetadata): worker_options = options.view_as(WorkerOptions) if worker_options.subnetwork: cluster_metadata.subnetwork = worker_options.subnetwork if worker_options.num_workers: cluster_metadata.num_workers = worker_options.num_workers if worker_options.machine_type: cluster_metadata.machine_type = worker_options.machine_type def _configure_flink_options( self, options: PipelineOptions, flink_version: str, master_url: str): flink_options = options.view_as(FlinkRunnerOptions) flink_options.flink_version = flink_version # flink_options.flink_job_server_jar will be populated by the # apache_beam.utils.subprocess_server.JavaJarServer.path_to_beam_jar, # do not populate it explicitly. flink_options.flink_master = master_url
[docs]class PipelineResult(beam.runners.runner.PipelineResult): """Provides access to information about a pipeline.""" def __init__(self, underlying_result, pipeline_instrument): """Constructor of PipelineResult. Args: underlying_result: (PipelineResult) the result returned by the underlying runner running the pipeline. pipeline_instrument: (PipelineInstrument) pipeline instrument describing the pipeline being executed with interactivity applied and related metadata including where the interactivity-backing cache lies. """ super().__init__(underlying_result.state) self._underlying_result = underlying_result self._pipeline_instrument = pipeline_instrument @property def state(self): return self._underlying_result.state
[docs] def wait_until_finish(self): self._underlying_result.wait_until_finish()
[docs] def get(self, pcoll, include_window_info=False): """Materializes the PCollection into a list. If include_window_info is True, then returns the elements as WindowedValues. Otherwise, return the element as itself. """ return list(self.read(pcoll, include_window_info))
[docs] def read(self, pcoll, include_window_info=False): """Reads the PCollection one element at a time from cache. If include_window_info is True, then returns the elements as WindowedValues. Otherwise, return the element as itself. """ key = self._pipeline_instrument.cache_key(pcoll) cache_manager = ie.current_env().get_cache_manager( self._pipeline_instrument.user_pipeline) if key and cache_manager.exists('full', key): coder = cache_manager.load_pcoder('full', key) reader, _ = cache_manager.read('full', key) return to_element_list(reader, coder, include_window_info) else: raise ValueError('PCollection not available, please run the pipeline.')
[docs] def cancel(self): self._underlying_result.cancel()