Source code for apache_beam.runners.interactive.caching.reify

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
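# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.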
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for transforms that reify and unreify PCollection values with
window info.

For internal use only; no backwards-compatibility guarantees.
"""

# pytype: skip-file

from typing import Optional

import apache_beam as beam
from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.testing import test_stream
from apache_beam.transforms.window import WindowedValue

# Label prefix for cache-reading transforms; unreify_from_cache() prepends it
# to the cache key to build its default transform labels.
READ_CACHE = 'ReadCache_'
# Label prefix for cache-writing transforms; reify_to_cache() prepends it to
# the cache key to build its default transform labels.
WRITE_CACHE = 'WriteCache_'

class Reify(beam.DoFn):
  """DoFn that packs each element together with its window info.

  Every input element is emitted as a WindowedValue (element, timestamp,
  window, pane info) wrapped in a test_stream.WindowedValueHolder, so the
  windowing metadata can be written to cache alongside the value and
  replayed later.
  """
  def process(
      self,
      e,
      w=beam.DoFn.WindowParam,
      p=beam.DoFn.PaneInfoParam,
      t=beam.DoFn.TimestampParam):
    """Yields one WindowedValueHolder for the given element.

    Args:
      e: The input element.
      w: The window of the element (DoFn.WindowParam).
      p: The pane info of the element (DoFn.PaneInfoParam).
      t: The timestamp of the element (DoFn.TimestampParam).
    """
    # The single window is passed as a one-element list of windows.
    reified = WindowedValue(e, t, [w], p)
    yield test_stream.WindowedValueHolder(reified)
class Unreify(beam.DoFn):
  """DoFn that unpacks windowed values read back from cache.

  Cached elements carry their window info; this emits the stored
  ``windowed_value`` of each such element.
  """
  def process(self, e):
    """Yields ``e.windowed_value`` for rows that carry one.

    Elements that are not a beam.Row, or that have no ``windowed_value``
    attribute, produce no output.

    Args:
      e: An element read from the cache.
    """
    # Windowed values were encoded with the Row coder, so only rows qualify.
    if not isinstance(e, beam.Row):
      return
    if not hasattr(e, 'windowed_value'):
      return
    yield e.windowed_value
def reify_to_cache(
    pcoll: beam.pvalue.PCollection,
    cache_key: str,
    cache_manager: cache.CacheManager,
    reify_label: Optional[str] = None,
    write_cache_label: Optional[str] = None,
    is_capture: bool = False) -> beam.pvalue.PValue:
  """Applies Reify to a PCollection and writes the result to cache.

  Args:
    pcoll: The PCollection to be cached.
    cache_key: The key of the cache.
    cache_manager: The cache manager to manage the cache.
    reify_label: (optional) A transform label for the Reify transform.
      Defaults to 'ReifyBefore_' + WRITE_CACHE + cache_key.
    write_cache_label: (optional) A transform label for the cache-writing
      transform. Defaults to WRITE_CACHE + cache_key.
    is_capture: Whether the cache is capturing a record of recordable
      sources.

  Returns:
    The result of applying the cache-writing transform.
  """
  # Falsy labels (None or empty string) fall back to the generated defaults.
  reify_label = reify_label or f'ReifyBefore_{WRITE_CACHE}{cache_key}'
  write_cache_label = write_cache_label or f'{WRITE_CACHE}{cache_key}'

  reified = pcoll | reify_label >> beam.ParDo(Reify())
  writer = cache.WriteCache(cache_manager, cache_key, is_capture=is_capture)
  return reified | write_cache_label >> writer
def unreify_from_cache(
    pipeline: beam.Pipeline,
    cache_key: str,
    cache_manager: cache.CacheManager,
    element_type: Optional[type] = None,
    source_label: Optional[str] = None,
    unreify_label: Optional[str] = None) -> beam.pvalue.PCollection:
  """Reads cached windowed values and applies Unreify to them.

  Args:
    pipeline: The pipeline that's reading from the cache.
    cache_key: The key of the cache.
    cache_manager: The cache manager to manage the cache.
    element_type: (optional) The element type of the PCollection's elements.
      When truthy, it is set as the output type of the Unreify transform.
    source_label: (optional) A transform label for the cache-reading
      transform. Defaults to READ_CACHE + cache_key.
    unreify_label: (optional) A transform label for the Unreify transform.
      Defaults to 'UnreifyAfter_' + READ_CACHE + cache_key.

  Returns:
    The PCollection produced by the Unreify transform.
  """
  # Falsy labels (None or empty string) fall back to the generated defaults.
  source_label = source_label or f'{READ_CACHE}{cache_key}'
  unreify_label = unreify_label or f'UnreifyAfter_{READ_CACHE}{cache_key}'

  cached = pipeline | source_label >> cache.ReadCache(cache_manager, cache_key)

  unreify = beam.ParDo(Unreify())
  if element_type:
    # If the PCollection is schema-aware, explicitly set the output types.
    unreify = unreify.with_output_types(element_type)
  return cached | unreify_label >> unreify