# Source code for apache_beam.runners.interactive.options.capture_control

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Module to control how Interactive Beam captures data from sources for
deterministic replayable PCollection evaluation and pipeline runs.

For internal use only; no backwards-compatibility guarantees.
"""

# pytype: skip-file

from __future__ import absolute_import

import logging
from datetime import timedelta

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.runners.interactive import background_caching_job as bcj
from apache_beam.runners.interactive import interactive_environment as ie

_LOGGER = logging.getLogger(__name__)


class CaptureControl(object):
  """Holds the options, and helpers around them, that govern how Interactive
  Beam captures deterministic replayable data from sources."""
  def __init__(self):
    # Capture/replay starts out enabled.
    self._enable_capture_replay = True
    # Source transform types whose output is captured for replay.
    self._capturable_sources = set([ReadFromPubSub])
    # How long a single capture records data.
    self._capture_duration = timedelta(seconds=5)
    # Limit compared against the cache manager's ``capture_size``;
    # presumably measured in bytes -- TODO confirm.
    self._capture_size = 1e9

  def __repr__(self):
    # TODO(BEAM-8335): add capture_size in the format once it's supported.
    template = (
        'A segment of data will be recorded for {} seconds, for all {} typed'
        ' sources in the pipeline.')
    seconds = self._capture_duration.total_seconds()
    return template.format(seconds, self._capturable_sources)

  def is_capture_size_reached(self):
    """Reports whether the configured capture size has been reached.

    Returns:
      True only when the current environment's cache manager exposes a
      ``capture_size`` attribute whose value is at least the configured
      limit; False otherwise (including when the attribute is absent).
    """
    manager = ie.current_env().cache_manager()
    # TODO(BEAM-8335): implement the capture_size attribute when
    # streaming_cache implements cache_manager.
    if not hasattr(manager, 'capture_size'):
      return False
    return manager.capture_size >= self._capture_size
def evict_captured_data():
  """Evicts every piece of deterministic replayable data that Interactive Beam
  has captured.

  Subsequent PCollection evaluation/visualization and pipeline runs will then
  capture fresh data.
  """
  env = ie.current_env()
  if env.options.enable_capture_replay:
    # TODO(BEAM-8335): display rather than logging when is_in_notebook.
    _LOGGER.info(
        'You have requested Interactive Beam to evict all captured '
        'data that could be deterministically replayed among multiple '
        'pipeline runs.')
  # Have the environment (re)track user pipelines before iterating them.
  env.track_user_pipelines()
  for pipeline in env.tracked_user_pipelines:
    # Stop any background caching job and test stream service per pipeline.
    bcj.attempt_to_cancel_background_caching_job(pipeline)
    bcj.attempt_to_stop_test_stream_service(pipeline)
  env.cleanup()