#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Module for transforms that reifies and unreifies PCollection values with
window info.
For internal use only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
from typing import Optional
import apache_beam as beam
from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.testing import test_stream
READ_CACHE = 'ReadCache_'
WRITE_CACHE = 'WriteCache_'
[docs]
class Reify(beam.DoFn):
  """Reifies elements with window info into windowed values.
  Internally used to capture window info with each element into cache for
  replayability.
  """
[docs]
  def process(self, e, wv=beam.DoFn.WindowedValueParam):
    yield test_stream.WindowedValueHolder(wv) 
 
[docs]
class Unreify(beam.DoFn):
  """Unreifies elements from windowed values.
  Cached values are elements with window info. This unpacks the elements.
  """
[docs]
  def process(self, e):
    # Row coder was used when encoding windowed values.
    if isinstance(e, beam.Row) and hasattr(e, 'windowed_value'):
      yield e.windowed_value 
 
[docs]
def reify_to_cache(
    pcoll: beam.pvalue.PCollection,
    cache_key: str,
    cache_manager: cache.CacheManager,
    reify_label: Optional[str] = None,
    write_cache_label: Optional[str] = None,
    is_capture: bool = False) -> beam.pvalue.PValue:
  """Reifies elements into windowed values and write to cache.
  Args:
    pcoll: The PCollection to be cached.
    cache_key: The key of the cache.
    cache_manager: The cache manager to manage the cache.
    reify_label: (optional) A transform label for the Reify transform.
    write_cache_label: (optional) A transform label for the cache-writing
      transform.
    is_capture: Whether the cache is capturing a record of recordable sources.
  """
  if not reify_label:
    reify_label = '{}{}{}'.format('ReifyBefore_', WRITE_CACHE, cache_key)
  if not write_cache_label:
    write_cache_label = '{}{}'.format(WRITE_CACHE, cache_key)
  return (
      pcoll | reify_label >> beam.ParDo(Reify())
      | write_cache_label >> cache.WriteCache(
          cache_manager, cache_key, is_capture=is_capture)) 
[docs]
def unreify_from_cache(
    pipeline: beam.Pipeline,
    cache_key: str,
    cache_manager: cache.CacheManager,
    element_type: Optional[type] = None,
    source_label: Optional[str] = None,
    unreify_label: Optional[str] = None) -> beam.pvalue.PCollection:
  """Reads from cache and unreifies elements from windowed values.
  pipeline: The pipeline that's reading from the cache.
  cache_key: The key of the cache.
  cache_manager: The cache manager to manage the cache.
  element_type: (optional) The element type of the PCollection's elements.
  source_label: (optional) A transform label for the cache-reading transform.
  unreify_label: (optional) A transform label for the Unreify transform.
  """
  if not source_label:
    source_label = '{}{}'.format(READ_CACHE, cache_key)
  if not unreify_label:
    unreify_label = '{}{}{}'.format('UnreifyAfter_', READ_CACHE, cache_key)
  read_cache = pipeline | source_label >> cache.ReadCache(
      cache_manager, cache_key)
  if element_type:
    # If the PCollection is schema-aware, explicitly sets the output types.
    return read_cache | unreify_label >> beam.ParDo(
        Unreify()).with_output_types(element_type)
  return read_cache | unreify_label >> beam.ParDo(Unreify())