#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Class that tracks derived/pipeline fragments from user pipelines.
For internal use only; no backwards-compatibility guarantees.
In the InteractiveRunner the design is to keep the user pipeline unchanged,
create a copy of the user pipeline, and modify the copy. When the derived
pipeline runs, there should only be per-user pipeline state. This makes sure
that derived pipelines can link back to the parent user pipeline.
"""
from typing import Iterator
from typing import Optional
import apache_beam as beam # type: ignore
[docs]class UserPipelineTracker:
"""Tracks user pipelines from derived pipelines.
This data structure is similar to a disjoint set data structure. A derived
pipeline can only have one parent user pipeline. A user pipeline can have many
derived pipelines.
"""
def __init__(self):
self._user_pipelines: dict[beam.Pipeline, list[beam.Pipeline]] = {}
self._derived_pipelines: dict[beam.Pipeline] = {}
self._pid_to_pipelines: dict[beam.Pipeline] = {}
def __iter__(self) -> Iterator[beam.Pipeline]:
"""Iterates through all the user pipelines."""
for p in self._user_pipelines:
yield p
def _key(self, pipeline: beam.Pipeline) -> str:
return str(id(pipeline))
[docs] def evict(self, pipeline: beam.Pipeline) -> None:
"""Evicts the pipeline.
Removes the given pipeline and derived pipelines if a user pipeline.
Otherwise, removes the given derived pipeline.
"""
user_pipeline = self.get_user_pipeline(pipeline)
if user_pipeline:
for d in self._user_pipelines[user_pipeline]:
del self._derived_pipelines[d]
del self._user_pipelines[user_pipeline]
elif pipeline in self._derived_pipelines:
del self._derived_pipelines[pipeline]
[docs] def clear(self) -> None:
"""Clears the tracker of all user and derived pipelines."""
self._user_pipelines.clear()
self._derived_pipelines.clear()
self._pid_to_pipelines.clear()
[docs] def get_pipeline(self, pid: str) -> Optional[beam.Pipeline]:
"""Returns the pipeline corresponding to the given pipeline id."""
return self._pid_to_pipelines.get(pid, None)
[docs] def add_user_pipeline(self, p: beam.Pipeline) -> beam.Pipeline:
"""Adds a user pipeline with an empty set of derived pipelines."""
self._memoize_pipieline(p)
# Create a new node for the user pipeline if it doesn't exist already.
user_pipeline = self.get_user_pipeline(p)
if not user_pipeline:
user_pipeline = p
self._user_pipelines[p] = []
return user_pipeline
def _memoize_pipieline(self, p: beam.Pipeline) -> None:
"""Memoizes the pid of the pipeline to the pipeline object."""
pid = self._key(p)
if pid not in self._pid_to_pipelines:
self._pid_to_pipelines[pid] = p
[docs] def add_derived_pipeline(
self, maybe_user_pipeline: beam.Pipeline,
derived_pipeline: beam.Pipeline) -> None:
"""Adds a derived pipeline with the user pipeline.
If the `maybe_user_pipeline` is a user pipeline, then the derived pipeline
will be added to its set. Otherwise, the derived pipeline will be added to
the user pipeline of the `maybe_user_pipeline`.
By doing the above one can do:
p = beam.Pipeline()
derived1 = beam.Pipeline()
derived2 = beam.Pipeline()
ut = UserPipelineTracker()
ut.add_derived_pipeline(p, derived1)
ut.add_derived_pipeline(derived1, derived2)
# Returns p.
ut.get_user_pipeline(derived2)
"""
self._memoize_pipieline(maybe_user_pipeline)
self._memoize_pipieline(derived_pipeline)
# Cannot add a derived pipeline twice.
assert derived_pipeline not in self._derived_pipelines
# Get the "true" user pipeline. This allows for the user to derive a
# pipeline from another derived pipeline, use both as arguments, and this
# method will still get the correct user pipeline.
user = self.add_user_pipeline(maybe_user_pipeline)
# Map the derived pipeline to the user pipeline.
self._derived_pipelines[derived_pipeline] = user
self._user_pipelines[user].append(derived_pipeline)
[docs] def get_user_pipeline(self, p: beam.Pipeline) -> Optional[beam.Pipeline]:
"""Returns the user pipeline of the given pipeline.
If the given pipeline has no user pipeline, i.e. not added to this tracker,
then this returns None. If the given pipeline is a user pipeline then this
returns the same pipeline. If the given pipeline is a derived pipeline then
this returns the user pipeline.
"""
# If `p` is a user pipeline then return it.
if p in self._user_pipelines:
return p
# If `p` exists then return its user pipeline.
if p in self._derived_pipelines:
return self._derived_pipelines[p]
# Otherwise, `p` is not in this tracker.
return None