Source code for apache_beam.runners.interactive.caching.streaming_cache

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

from __future__ import absolute_import

from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.utils import timestamp


class StreamingCache(object):
  """Abstraction that holds the logic for reading and writing to cache.
  """
  def __init__(self, readers):
    self._readers = readers
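
  # The nested Reader below expects each reader to expose header() (whose
  # result has a `tag` attribute) and read() (an iterator of records carrying
  # a `recorded_event` that is a TestStreamPayload.Event).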
  class Reader(object):
    """Abstraction that reads from PCollection readers.

    This class is an abstraction layer over multiple PCollection readers to be
    used for supplying a TestStream service with events.

    This class is also responsible for holding the state of the clock and for
    injecting clock advancement and watermark advancement events.
    """
    def __init__(self, readers):
      # This timestamp is used as the monotonic clock to order events in the
      # replay.
      self._monotonic_clock = timestamp.Timestamp.of(0)

      # The PCollection cache readers.
      self._readers = {}

      # The file headers that are metadata for that particular PCollection.
      # The header allows for metadata about an entire stream, so that the
      # data isn't copied per record.
      self._headers = {r.header().tag: r.header() for r in readers}
      self._readers = {r.header().tag: r.read() for r in readers}

      # The most recently read timestamp per tag.
      self._stream_times = {
          tag: timestamp.Timestamp(seconds=0)
          for tag in self._headers
      }

    def _test_stream_events_before_target(self, target_timestamp):
      """Reads the next iteration of elements from each stream.

      Retrieves an element from each stream iff the most recently read
      timestamp from that stream is less than the target_timestamp. Since the
      amount of events may not fit into memory, this StreamingCache reads at
      most one element from each stream at a time.
      """
      records = []
      for tag, r in self._readers.items():
        # The target_timestamp is the maximum timestamp that was read from
        # the stream. Some readers may have elements that are less than this.
        # Thus, we skip all readers that already have elements that are at
        # this timestamp so that we don't read everything into memory.
        if self._stream_times[tag] >= target_timestamp:
          continue
        try:
          record = next(r).recorded_event
          if record.HasField('processing_time_event'):
            self._stream_times[tag] += timestamp.Duration(
                micros=record.processing_time_event.advance_duration)
          records.append((tag, record, self._stream_times[tag]))
        except StopIteration:
          pass
      return records

    def _merge_sort(self, previous_events, new_events):
      return sorted(
          previous_events + new_events, key=lambda x: x[2], reverse=True)

    def _min_timestamp_of(self, events):
      return events[-1][2] if events else timestamp.MAX_TIMESTAMP

    def _event_stream_caught_up_to_target(self, events, target_timestamp):
      empty_events = not events
      stream_is_past_target = self._min_timestamp_of(events) > target_timestamp
      return empty_events or stream_is_past_target
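
    # Ordering invariant: _merge_sort keeps the (tag, event, time) tuples
    # sorted by time in descending order, so that read() below can pop() the
    # earliest pending event first and _min_timestamp_of() can take the
    # smallest pending timestamp from the last tuple in the list.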
    def read(self):
      """Reads records from PCollection readers.
      """
      # The largest timestamp read from the different streams.
      target_timestamp = timestamp.MAX_TIMESTAMP

      # The events from the last iteration that are past the target timestamp.
      unsent_events = []

      # Emit events until all events have been read.
      while True:
        # Read the next set of events. The read events will most likely be
        # out of order if there are multiple readers. Here we sort them into
        # a more manageable state.
        new_events = self._test_stream_events_before_target(target_timestamp)
        events_to_send = self._merge_sort(unsent_events, new_events)
        if not events_to_send:
          break

        # Get the smallest timestamp of the pending events. The inner loop
        # below sends every pending event at this timestamp and keeps the
        # later events as unsent_events for the next iteration.
        target_timestamp = self._min_timestamp_of(events_to_send)

        # Loop through the elements with the correct timestamp.
        while not self._event_stream_caught_up_to_target(events_to_send,
                                                         target_timestamp):
          # First advance the clock to match the time of the stream. This has
          # a side-effect of also advancing this cache's clock.
          tag, r, curr_timestamp = events_to_send.pop()
          if curr_timestamp > self._monotonic_clock:
            yield self._advance_processing_time(curr_timestamp)

          # Then, send either a new element or watermark.
          if r.HasField('element_event'):
            r.element_event.tag = tag
            yield r
          elif r.HasField('watermark_event'):
            r.watermark_event.tag = tag
            yield r
        unsent_events = events_to_send
        target_timestamp = self._min_timestamp_of(unsent_events)

    def _add_element(self, element, tag):
      """Constructs an AddElement event for the specified element and tag.
      """
      return TestStreamPayload.Event(
          element_event=TestStreamPayload.Event.AddElements(
              elements=[element], tag=tag))

    def _advance_processing_time(self, new_timestamp):
      """Advances the internal clock and returns an AdvanceProcessingTime
      event.
      """
      advance_by = new_timestamp.micros - self._monotonic_clock.micros
      e = TestStreamPayload.Event(
          processing_time_event=TestStreamPayload.Event.AdvanceProcessingTime(
              advance_duration=advance_by))
      self._monotonic_clock = new_timestamp
      return e

  def reader(self):
    return StreamingCache.Reader(self._readers)
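
A minimal usage sketch, assuming hypothetical in-memory readers: the names
_FakeHeader, _FakeRecord, and _FakeReader below are illustrative stand-ins for
the cache file readers that normally feed StreamingCache, and the example only
relies on the header()/read()/recorded_event interface used above. The proto
field values are for demonstration only.

from collections import namedtuple

from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload

_FakeHeader = namedtuple('_FakeHeader', ['tag'])
_FakeRecord = namedtuple('_FakeRecord', ['recorded_event'])


class _FakeReader(object):
  """Hypothetical in-memory reader yielding pre-recorded TestStream events."""
  def __init__(self, tag, events):
    self._tag = tag
    self._events = events

  def header(self):
    return _FakeHeader(tag=self._tag)

  def read(self):
    return iter([_FakeRecord(recorded_event=e) for e in self._events])


# One processing-time advance of 1 second, then a watermark advance.
events = [
    TestStreamPayload.Event(
        processing_time_event=TestStreamPayload.Event.AdvanceProcessingTime(
            advance_duration=1000000)),
    TestStreamPayload.Event(
        watermark_event=TestStreamPayload.Event.AdvanceWatermark(
            new_watermark=0)),
]

cache = StreamingCache(readers=[_FakeReader('main', events)])
for event in cache.reader().read():
  print(event)

With these inputs the generator first yields a synthesized
AdvanceProcessingTime event (the cache re-derives clock advances from its
monotonic clock rather than forwarding the recorded ones) and then the
watermark event tagged 'main'.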