#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Module to build and run background source recording jobs.
For internal use only; no backwards-compatibility guarantees.
A background source recording job is a job that records events for all
recordable sources of a given pipeline. With Interactive Beam, one such job is
started when a pipeline run happens (which produces a main job in contrast to
the background source recording job) and meets the following conditions:
#. The pipeline contains recordable sources, configured through
interactive_beam.options.recordable_sources.
#. No such background job is running.
#. No such background job has completed successfully and the cached events are
still valid (invalidated when recordable sources change in the pipeline).
Once started, the background source recording job runs asynchronously until it
hits some recording limit configured in interactive_beam.options. Meanwhile,
the main job and future main jobs from the pipeline will run using the
deterministic replayable recorded events until they are invalidated.
"""
# pytype: skip-file
import logging
import threading
import time
import apache_beam as beam
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import utils
from apache_beam.runners.interactive.caching import streaming_cache
from apache_beam.runners.runner import PipelineState
_LOGGER = logging.getLogger(__name__)
[docs]
class BackgroundCachingJob(object):
  """A simple abstraction that controls necessary components of a timed and
  space limited background source recording job.

  A background source recording job successfully completes source data
  recording in 2 conditions:

    #. The job is finite and runs into DONE state;
    #. The job is infinite but hits an interactive_beam.options configured limit
       and gets cancelled into CANCELLED/CANCELLING state.

  In both situations, the background source recording job should be treated as
  done successfully.

  A daemon thread is started on construction; it polls the job state and the
  limiters every half second and cancels the job once any limiter triggers.
  """
  def __init__(self, pipeline_result, limiters):
    """Wraps a running job and starts the background condition checker.

    Args:
      pipeline_result: handle of the launched job. This class reads its
        ``state`` attribute and calls its ``cancel()`` method.
      limiters: collection of objects exposing ``is_triggered()``. It is
        iterated on every check, so it must be re-iterable (e.g. a list).
    """
    self._pipeline_result = pipeline_result
    # Re-entrant because the checker thread calls cancel() while already
    # holding the lock.
    self._result_lock = threading.RLock()
    self._condition_checker = threading.Thread(
        target=self._background_caching_job_condition_checker, daemon=True)

    # Limiters are checks s.t. if any are triggered then the background caching
    # job gets cancelled.
    self._limiters = limiters
    # Started last so that every attribute the thread reads already exists.
    self._condition_checker.start()

  def _background_caching_job_condition_checker(self):
    """Polls the job until it is terminal or a limiter asks to stop it."""
    while True:
      with self._result_lock:
        if PipelineState.is_terminal(self._pipeline_result.state):
          break

        if self._should_end_condition_checker():
          self.cancel()
          break
      # Sleep outside of the lock so other callers are not blocked meanwhile.
      time.sleep(0.5)

  def _should_end_condition_checker(self):
    """Returns True if at least one limiter reports being triggered."""
    return any(limiter.is_triggered() for limiter in self._limiters)

  def is_done(self):
    """Returns True if the recording should be treated as completed.

    That is the case when the job state is DONE or CANCELLED, or when a limiter
    has been triggered and the job state is CANCELLING.
    """
    with self._result_lock:
      is_terminated = self._pipeline_result.state in (
          PipelineState.DONE, PipelineState.CANCELLED)
      is_triggered = self._should_end_condition_checker()
      # Compare by value: states are string constants and identity of equal
      # strings is not guaranteed.
      is_cancelling = self._pipeline_result.state == PipelineState.CANCELLING
    return is_terminated or (is_triggered and is_cancelling)

  def is_running(self):
    """Returns True if the job state is RUNNING."""
    with self._result_lock:
      # Value comparison, for the same reason as in is_done().
      return self._pipeline_result.state == PipelineState.RUNNING

  def cancel(self):
    """Cancels this background source recording job.

    Does nothing when the job is already in a terminal state. A
    NotImplementedError raised by the underlying ``cancel()`` is ignored.
    """
    with self._result_lock:
      if not PipelineState.is_terminal(self._pipeline_result.state):
        try:
          self._pipeline_result.cancel()
        except NotImplementedError:
          # Ignore the cancel invocation if it is never implemented by the
          # runner.
          pass

  @property
  def state(self):
    """The current state reported by the wrapped pipeline result."""
    with self._result_lock:
      return self._pipeline_result.state
[docs]
def attempt_to_run_background_caching_job(
    runner, user_pipeline, options=None, limiters=None):
  """Starts a background source recording job for the pipeline if one is needed.

  When a job is needed, any previous background job and test stream service of
  the pipeline are asked to stop first. The new job's result is then wrapped in
  a BackgroundCachingJob and registered with the current interactive
  environment, so it can be looked up later (e.g. for cancellation).

  Args:
    runner: runner used to build and run the pipelines.
    user_pipeline: the user-defined pipeline to record sources for.
    options: pipeline options forwarded to the pipelines being built.
    limiters: limiters for the new job; when falsy, the limiters configured in
      the environment's capture control are used instead.

  Returns:
    True if a job was started, False otherwise.
  """
  if not is_background_caching_job_needed(user_pipeline):
    return False

  # Stop whatever is still around from an earlier run: first the non-terminal
  # job, then the gRPC server serving the test stream.
  attempt_to_cancel_background_caching_job(user_pipeline)
  attempt_to_stop_test_stream_service(user_pipeline)

  # TODO(BEAM-8335): refactor background source recording job logic from
  # pipeline_instrument module to this module and aggregate tests.
  from apache_beam.runners.interactive import pipeline_instrument as instr

  # Work on a copy of the user pipeline and let the environment know that the
  # copy derives from it.
  runner_pipeline = beam.pipeline.Pipeline.from_runner_api(
      user_pipeline.to_runner_api(), runner, options)
  ie.current_env().add_derived_pipeline(user_pipeline, runner_pipeline)

  pipeline_from_proto = beam.pipeline.Pipeline.from_runner_api
  instrument = instr.build_pipeline_instrument(runner_pipeline)
  caching_proto = instrument.background_caching_pipeline_proto()
  caching_pipeline = pipeline_from_proto(caching_proto, runner, options)
  job_result = caching_pipeline.run()

  active_limiters = (
      limiters or ie.current_env().options.capture_control.limiters())
  caching_job = BackgroundCachingJob(job_result, limiters=active_limiters)
  ie.current_env().set_background_caching_job(user_pipeline, caching_job)
  return True
[docs]
def is_background_caching_job_needed(user_pipeline):
  """Decides whether a new background source recording job has to be started.

  The decision involves several state checks, and the calls made along the way
  may record state changes (the source signature check runs with its default
  of updating the tracked signature, and captured data is evicted when
  recording replay is disabled). The function is therefore not idempotent.
  """
  previous_job = ie.current_env().get_background_caching_job(user_pipeline)
  # Whether the pipeline contains any source that needs to be cached.
  need_cache = has_source_to_cache(user_pipeline)
  # Whether the sources changed; if so, a previous done/running job can be
  # invalidated. Evaluated unconditionally, as it also tracks the signature.
  cache_changed = is_source_to_cache_changed(user_pipeline)

  # Nothing to cache: hand back the falsy answer unchanged.
  if not need_cache:
    return need_cache

  # With recording replay disabled, recordable sources always get recorded
  # afresh, so drop what was captured before.
  if not ie.current_env().options.enable_recording_replay:
    from apache_beam.runners.interactive.options import capture_control
    capture_control.evict_captured_data()
    return True

  # No job has been tracked for this pipeline yet.
  if not previous_job:
    return True

  # The tracked job neither completed (its cached events might still be valid)
  # nor is still running.
  if not previous_job.is_done() and not previous_job.is_running():
    return True

  # Otherwise a new job is only needed if the previous one got invalidated.
  return cache_changed
[docs]
def is_cache_complete(pipeline_id: str) -> bool:
  """Tells whether reading from the background cache of a pipeline can stop.

  That is the case when the background source recording job of the pipeline is
  done, or when the sources to cache changed so that a new background source
  recording job is required. The tracked source signature is left untouched by
  this check.
  """
  pipeline = ie.current_env().pipeline_id_to_pipeline(pipeline_id)
  recording_job = ie.current_env().get_background_caching_job(pipeline)
  job_finished = bool(recording_job) and recording_job.is_done()
  signature_changed = is_source_to_cache_changed(
      pipeline, update_cached_source_signature=False)

  if job_finished:
    return job_finished
  return signature_changed
[docs]
def has_source_to_cache(user_pipeline):
  """Reports whether a user-defined pipeline has sources that need caching.

  If it has, and the cache manager held by the current interactive environment
  for the pipeline is not a StreamingCache yet, that cache manager is replaced
  by a StreamingCache. The replacement uses the configured ``cache_root`` as
  its directory when one is set, otherwise the directory of the previous cache
  manager, and takes over the previous manager's saved pcoders.

  The answer helps decide whether a background source recording job is needed
  to write cache for sources and whether a test stream service is needed to
  serve the cache.

  Raises:
    ValueError: if the configured ``cache_root`` starts with ``gs://``.
  """
  # TODO(BEAM-8335): we temporarily only cache replaceable unbounded sources.
  # Add logic for other cacheable sources here when they are available.
  has_cache = utils.has_unbounded_sources(user_pipeline)
  if not has_cache:
    return has_cache

  existing_cm = ie.current_env().get_cache_manager(
      user_pipeline, create_if_absent=True)
  if isinstance(existing_cm, streaming_cache.StreamingCache):
    # Already wrapped on an earlier call.
    return has_cache

  file_based_cm = ie.current_env().get_cache_manager(user_pipeline)
  default_dir = file_based_cm._cache_dir
  configured_root = ie.current_env().options.cache_root
  if configured_root and configured_root.startswith('gs://'):
    raise ValueError(
        'GCS cache paths are not currently supported for '
        'streaming pipelines.')
  target_dir = configured_root if configured_root else default_dir

  wrapped_cm = streaming_cache.StreamingCache(
      target_dir,
      is_cache_complete=is_cache_complete,
      sample_resolution_sec=1.0,
      saved_pcoders=file_based_cm._saved_pcoders)
  ie.current_env().set_cache_manager(wrapped_cm, user_pipeline)
  return has_cache
[docs]
def attempt_to_cancel_background_caching_job(user_pipeline):
  """Cancels the background source recording job of a pipeline, if any.

  Looks up the job tracked by the current interactive environment for the
  given user-defined pipeline. When there is none, this is a NOOP.
  """
  tracked_job = ie.current_env().get_background_caching_job(user_pipeline)
  if not tracked_job:
    return
  tracked_job.cancel()
[docs]
def attempt_to_stop_test_stream_service(user_pipeline):
  """Stops the gRPC server/service serving the test stream, if one exists.

  When no such service is tracked for the pipeline, this is a NOOP. Otherwise
  its controller is evicted from the current interactive environment and
  stopped.
  """
  if not is_a_test_stream_service_running(user_pipeline):
    return
  controller = ie.current_env().evict_test_stream_service_controller(
      user_pipeline)
  controller.stop()
[docs]
def is_a_test_stream_service_running(user_pipeline):
  """Tells whether a gRPC server/service serving the test stream is tracked.

  The answer is True when the current interactive environment holds a test
  stream service controller for the given user_pipeline.
  """
  controller = ie.current_env().get_test_stream_service_controller(
      user_pipeline)
  return controller is not None
[docs]
def _sizeof_fmt(num, suffix='B'):
  """Formats a size with decimal (power-of-1000) unit prefixes, e.g. 1.5MB."""
  for unit in ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z'):
    if abs(num) < 1000.0:
      return "%3.1f%s%s" % (num, unit, suffix)
    num /= 1000.0
  # Anything beyond zetta is reported in yotta; 'Y' (not the binary 'Yi')
  # matches the division by 1000 used above.
  return "%.1f%s%s" % (num, 'Y', suffix)


def is_source_to_cache_changed(
    user_pipeline, update_cached_source_signature=True):
  """Determines if the sources to cache of the user-defined pipeline changed.

  A change is reported when the current source signature contains entries that
  are absent from the previously tracked signature, i.e. sources were added or
  mutated. Removing sources alone is not reported, since the comparison is a
  subset test.

  Due to the expensiveness of computations and for the simplicity of usage,
  this function is not idempotent by default: on a change it replaces the
  tracked signature with the current one and registers the user pipeline with
  the current interactive environment. If recording replay is enabled, it also
  logs what is going to happen and, when a signature had been tracked before,
  cleans up the environment's state for the pipeline.

  Args:
    user_pipeline: the user-defined pipeline to inspect.
    update_cached_source_signature: when False, only the comparison is done
      and nothing is logged, cleaned up or tracked.

  Returns:
    True if a new background source recording job is required because of the
    change, False otherwise.
  """
  # By default gets empty set if the user_pipeline is first time seen because
  # we can treat it as adding transforms.
  recorded_signature = ie.current_env().get_cached_source_signature(
      user_pipeline)
  current_signature = extract_source_to_cache_signature(user_pipeline)
  is_changed = not current_signature.issubset(recorded_signature)
  # The computation of the source signature is expensive, track on change by
  # default.
  if is_changed and update_cached_source_signature:
    options = ie.current_env().options
    # No info needed when recording replay is disabled.
    if options.enable_recording_replay:
      if not recorded_signature:
        # Nothing tracked before: announce the upcoming first recording.
        _LOGGER.info(
            'Interactive Beam has detected unbounded sources in your pipeline. '
            'In order to have a deterministic replay, a segment of data will '
            'be recorded from all sources for %s seconds or until a total of '
            '%s have been written to disk.',
            options.recording_duration.total_seconds(),
            _sizeof_fmt(options.recording_size_limit))
      else:
        _LOGGER.info(
            'Interactive Beam has detected a new streaming source was '
            'added to the pipeline. In order for the cached streaming '
            'data to start at the same time, all recorded data has been '
            'cleared and a new segment of data will be recorded.')
        ie.current_env().cleanup(user_pipeline)
    ie.current_env().set_cached_source_signature(
        user_pipeline, current_signature)
    ie.current_env().add_user_pipeline(user_pipeline)
  return is_changed