Source code for apache_beam.dataframe.expressions

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import contextlib
import random
import threading
from typing import Any
from typing import Callable
from typing import Iterable
from typing import Optional
from typing import TypeVar

from apache_beam.dataframe import partitionings


[docs]class Session(object): """A session represents a mapping of expressions to concrete values. The bindings typically include required placeholders, but may be any intermediate expression as well. """ def __init__(self, bindings=None): self._bindings = dict(bindings or {})
[docs] def evaluate(self, expr): # type: (Expression) -> Any if expr not in self._bindings: self._bindings[expr] = expr.evaluate_at(self) return self._bindings[expr]
[docs] def lookup(self, expr): # type: (Expression) -> Any return self._bindings[expr]
[docs]class PartitioningSession(Session): """An extension of Session that enforces actual partitioning of inputs. Each expression is evaluated multiple times for various supported partitionings determined by its `requires_partition_by` specification. For each tested partitioning, the input is partitioned and the expression is evaluated on each partition separately, as if this were actually executed in a parallel manner. For each input partitioning, the results are verified to be partitioned appropriately according to the expression's `preserves_partition_by` specification. For testing only. """
[docs] def evaluate(self, expr): import pandas as pd import collections def is_scalar(expr): return not isinstance(expr.proxy(), pd.core.generic.NDFrame) if expr not in self._bindings: if is_scalar(expr) or not expr.args(): result = super(PartitioningSession, self).evaluate(expr) else: scaler_args = [arg for arg in expr.args() if is_scalar(arg)] def evaluate_with(input_partitioning): parts = collections.defaultdict( lambda: Session({arg: self.evaluate(arg) for arg in scaler_args})) for arg in expr.args(): if not is_scalar(arg): input = self.evaluate(arg) for key, part in input_partitioning.test_partition_fn(input): parts[key]._bindings[arg] = part if not parts: parts[None] # Create at least one entry. results = [] for session in parts.values(): if any(len(session.lookup(arg)) for arg in expr.args() if not is_scalar(arg)): results.append(session.evaluate(expr)) expected_output_partitioning = expr.preserves_partition_by( ) if input_partitioning.is_subpartitioning_of( expr.preserves_partition_by()) else input_partitioning if not expected_output_partitioning.check(results): raise AssertionError( f"""Expression does not preserve partitioning! Expression: {expr} Requires: {expr.requires_partition_by()} Preserves: {expr.preserves_partition_by()} Input partitioning: {input_partitioning} Expected output partitioning: {expected_output_partitioning} """) if results: return pd.concat(results) else: # Choose any single session. return next(iter(parts.values())).evaluate(expr) # Store random state so it can be re-used for each execution, in case # the expression is part of a test that relies on the random seed. random_state = random.getstate() for input_partitioning in set([expr.requires_partition_by(), partitionings.Nothing(), partitionings.Index(), partitionings.Singleton()]): if not input_partitioning.is_subpartitioning_of( expr.requires_partition_by()): continue random.setstate(random_state) # TODO(BEAM-11324): Consider verifying result is always the same result = evaluate_with(input_partitioning) self._bindings[expr] = result return self._bindings[expr]
# The return type of an Expression T = TypeVar('T')
[docs]class Expression(object): """An expression is an operation bound to a set of arguments. An expression represents a deferred tree of operations, which can be evaluated at a specific bindings of root expressions to values. """ def __init__( self, name, # type: str proxy, # type: T _id=None # type: Optional[str] ): self._name = name self._proxy = proxy # Store for preservation through pickling. self._id = _id or '%s_%s_%s' % (name, type(proxy).__name__, id(self))
[docs] def proxy(self): # type: () -> T return self._proxy
def __hash__(self): return hash(self._id) def __eq__(self, other): return self._id == other._id def __ne__(self, other): return not self == other def __repr__(self): return '%s[%s]' % (self.__class__.__name__, self._id)
[docs] def placeholders(self): """Returns all the placeholders that self depends on.""" raise NotImplementedError(type(self))
[docs] def evaluate_at(self, session): # type: (Session) -> T """Returns the result of self with the bindings given in session.""" raise NotImplementedError(type(self))
[docs] def requires_partition_by(self): # type: () -> partitionings.Partitioning """Returns the partitioning, if any, require to evaluate this expression. Returns partitioning.Nothing() to require no partitioning is required. """ raise NotImplementedError(type(self))
[docs] def preserves_partition_by(self): # type: () -> partitionings.Partitioning """Returns the partitioning, if any, preserved by this expression. This gives an upper bound on the partitioning of its ouput. The actual partitioning of the output may be less strict (e.g. if the input was less partitioned). """ raise NotImplementedError(type(self))
[docs]class PlaceholderExpression(Expression): """An expression whose value must be explicitly bound in the session.""" def __init__( self, # type: PlaceholderExpression proxy, # type: T reference=None, # type: Any ): """Initialize a placeholder expression. Args: proxy: A proxy object with the type expected to be bound to this expression. Used for type checking at pipeline construction time. """ super(PlaceholderExpression, self).__init__('placeholder', proxy) self._reference = reference
[docs] def placeholders(self): return frozenset([self])
[docs] def args(self): return ()
[docs] def evaluate_at(self, session): return session.lookup(self)
[docs] def requires_partition_by(self): return partitionings.Nothing()
[docs] def preserves_partition_by(self): return partitionings.Nothing()
[docs]class ConstantExpression(Expression): """An expression whose value is known at pipeline construction time.""" def __init__( self, # type: ConstantExpression value, # type: T proxy=None # type: Optional[T] ): """Initialize a constant expression. Args: value: The constant value to be produced by this expression. proxy: (Optional) a proxy object with same type as `value` to use for rapid type checking at pipeline construction time. If not provided, `value` will be used directly. """ if proxy is None: proxy = value super(ConstantExpression, self).__init__('constant', proxy) self._value = value
[docs] def placeholders(self): return frozenset()
[docs] def args(self): return ()
[docs] def evaluate_at(self, session): return self._value
[docs] def requires_partition_by(self): return partitionings.Nothing()
[docs] def preserves_partition_by(self): return partitionings.Nothing()
[docs]class ComputedExpression(Expression): """An expression whose value must be computed at pipeline execution time.""" def __init__( self, # type: ComputedExpression name, # type: str func, # type: Callable[...,T] args, # type: Iterable[Expression] proxy=None, # type: Optional[T] _id=None, # type: Optional[str] requires_partition_by=partitionings.Index(), # type: partitionings.Partitioning preserves_partition_by=partitionings.Nothing(), # type: partitionings.Partitioning ): """Initialize a computed expression. Args: name: The name of this expression. func: The function that will be used to compute the value of this expression. Should accept arguments of the types returned when evaluating the `args` expressions. args: The list of expressions that will be used to produce inputs to `func`. proxy: (Optional) a proxy object with same type as the objects that this ComputedExpression will produce at execution time. If not provided, a proxy will be generated using `func` and the proxies of `args`. _id: (Optional) a string to uniquely identify this expression. requires_partition_by: The required (common) partitioning of the args. preserves_partition_by: The level of partitioning preserved. """ if (not _get_allow_non_parallel() and requires_partition_by == partitionings.Singleton()): raise NonParallelOperation( "Using non-parallel form of %s " "outside of allow_non_parallel_operations block." % name) args = tuple(args) if proxy is None: proxy = func(*(arg.proxy() for arg in args)) super(ComputedExpression, self).__init__(name, proxy, _id) self._func = func self._args = args self._requires_partition_by = requires_partition_by self._preserves_partition_by = preserves_partition_by
[docs] def placeholders(self): return frozenset.union( frozenset(), *[arg.placeholders() for arg in self.args()])
[docs] def args(self): return self._args
[docs] def evaluate_at(self, session): return self._func(*(session.evaluate(arg) for arg in self._args))
[docs] def requires_partition_by(self): return self._requires_partition_by
[docs] def preserves_partition_by(self): return self._preserves_partition_by
[docs]def elementwise_expression(name, func, args): return ComputedExpression( name, func, args, requires_partition_by=partitionings.Nothing(), preserves_partition_by=partitionings.Singleton())
_ALLOW_NON_PARALLEL = threading.local() _ALLOW_NON_PARALLEL.value = False def _get_allow_non_parallel(): return _ALLOW_NON_PARALLEL.value
[docs]@contextlib.contextmanager def allow_non_parallel_operations(allow=True): if allow is None: yield else: old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow yield _ALLOW_NON_PARALLEL.value = old_value
[docs]class NonParallelOperation(Exception): pass