# Source code for apache_beam.runners.interactive.options.capture_control

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Module to control how Interactive Beam captures data from sources for
deterministic replayable PCollection evaluation and pipeline runs.

For internal use only; no backwards-compatibility guarantees.
"""

# pytype: skip-file

from __future__ import absolute_import

import logging
from datetime import timedelta

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.runners.interactive import background_caching_job as bcj
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.options import capture_limiters

_LOGGER = logging.getLogger(__name__)


class CaptureControl(object):
  """Options and their utilities that controls how Interactive Beam captures
  deterministic replayable data from sources."""
  def __init__(self):
    # Whether capture/replay of source data is turned on; on by default.
    self._enable_capture_replay = True
    # Source transform classes registered as capturable.
    self._capturable_sources = {
        ReadFromPubSub,
    }  # yapf: disable
    # Value handed to capture_limiters.DurationLimiter; defaults to 60 seconds.
    self._capture_duration = timedelta(seconds=60)
    # Value handed to capture_limiters.SizeLimiter.
    # NOTE(review): presumably a byte count (1e9 ~ 1 GB) -- confirm against
    # SizeLimiter.
    self._capture_size_limit = 1e9
    # Optional override installed through set_limiters_for_test().
    self._test_limiters = None

  def limiters(self):
    # type: () -> List[capture_limiters.Limiter]

    """Returns the limiters that apply to capturing.

    When a non-empty override has been installed through
    set_limiters_for_test(), that exact object is returned. Otherwise (no
    override, or an empty one, since the check is a truthiness test) a new list
    holding a SizeLimiter and a DurationLimiter is built from the current size
    limit and duration settings.
    """
    if self._test_limiters:
      return self._test_limiters
    return [
        capture_limiters.SizeLimiter(self._capture_size_limit),
        capture_limiters.DurationLimiter(self._capture_duration)
    ]

  def set_limiters_for_test(self, limiters):
    # type: (List[capture_limiters.Limiter]) -> None

    """Installs `limiters` as the override that limiters() returns.

    Passing None or an empty list makes limiters() fall back to the default
    size and duration limiters.
    """
    self._test_limiters = limiters
def evict_captured_data():
  """Evicts all deterministic replayable data that have been captured by
  Interactive Beam. In future PCollection evaluation/visualization and pipeline
  runs, Interactive Beam will capture fresh data.

  For every user pipeline tracked by the current interactive environment, this
  attempts to cancel its background caching job and to stop its test stream
  service, then asks the environment to clean up.
  """
  # NOTE(review): statement nesting was reconstructed from a flattened listing;
  # the `if` is taken to guard only the log message, so eviction itself runs
  # regardless of the capture-replay option -- confirm against upstream.
  if ie.current_env().options.enable_capture_replay:
    _LOGGER.info(
        'You have requested Interactive Beam to evict all captured '
        'data that could be deterministically replayed among multiple '
        'pipeline runs.')
  # Refresh the set of tracked user pipelines before iterating over it.
  ie.current_env().track_user_pipelines()
  for user_pipeline in ie.current_env().tracked_user_pipelines:
    bcj.attempt_to_cancel_background_caching_job(user_pipeline)
    bcj.attempt_to_stop_test_stream_service(user_pipeline)
  ie.current_env().cleanup()