Source code for apache_beam.dataframe.frame_base

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import operator
import re
from collections.abc import Callable
from inspect import cleandoc
from inspect import getfullargspec
from inspect import isclass
from inspect import ismodule
from inspect import unwrap
from typing import Any
from typing import Optional
from typing import Tuple
from typing import Union

import pandas as pd

from apache_beam.dataframe import expressions
from apache_beam.dataframe import partitionings


[docs] class DeferredBase(object): _pandas_type_map: dict[Union[type, None], type] = {} def __init__(self, expr): self._expr = expr @classmethod def _register_for(cls, pandas_type): def wrapper(deferred_type): cls._pandas_type_map[pandas_type] = deferred_type return deferred_type return wrapper
[docs] @classmethod def wrap(cls, expr, split_tuples=True): proxy_type = type(expr.proxy()) if proxy_type is tuple and split_tuples: def get(ix): return expressions.ComputedExpression( # yapf: disable 'get_%d' % ix, lambda t: t[ix], [expr], requires_partition_by=partitionings.Arbitrary(), preserves_partition_by=partitionings.Singleton()) return tuple(cls.wrap(get(ix)) for ix in range(len(expr.proxy()))) elif proxy_type in cls._pandas_type_map: wrapper_type = cls._pandas_type_map[proxy_type] else: if expr.requires_partition_by() != partitionings.Singleton(): raise ValueError( 'Scalar expression %s of type %s partitoned by non-singleton %s' % (expr, proxy_type, expr.requires_partition_by())) wrapper_type = _DeferredScalar return wrapper_type(expr)
def _elementwise( self, func, name=None, other_args=(), other_kwargs=None, inplace=False): other_kwargs = other_kwargs or {} return _elementwise_function( func, name, inplace=inplace)(self, *other_args, **other_kwargs) def __reduce__(self): return UnusableUnpickledDeferredBase, (str(self), )
[docs] class UnusableUnpickledDeferredBase(object): """Placeholder object used to break the transitive pickling chain in case a DeferredBase accidentially gets pickled (e.g. as part of globals). Trying to use this object after unpickling is a bug and will result in an error. """ def __init__(self, name): self._name = name def __repr__(self): return 'UnusablePickledDeferredBase(%r)' % self.name
[docs] class DeferredFrame(DeferredBase): pass
class _DeferredScalar(DeferredBase): def apply(self, func, name=None, args=()): if name is None: name = func.__name__ with expressions.allow_non_parallel_operations( all(isinstance(arg, _DeferredScalar) for arg in args) or None): return DeferredFrame.wrap( expressions.ComputedExpression( name, func, [self._expr] + [arg._expr for arg in args], requires_partition_by=partitionings.Singleton())) def __neg__(self): return self.apply(operator.neg) def __pos__(self): return self.apply(operator.pos) def __invert__(self): return self.apply(operator.invert) def __repr__(self): return f"DeferredScalar[type={type(self._expr.proxy())}]" def __bool__(self): # TODO(BEAM-11951): Link to documentation raise TypeError( "Testing the truth value of a deferred scalar is not " "allowed. It's not possible to branch on the result of " "deferred operations.") def _scalar_binop(op): def binop(self, other): if not isinstance(other, DeferredBase): return self.apply(lambda left: getattr(left, op)(other), name=op) elif isinstance(other, _DeferredScalar): return self.apply( lambda left, right: getattr(left, op)(right), name=op, args=[other]) else: return NotImplemented return binop for op in ['__add__', '__sub__', '__mul__', '__div__', '__truediv__', '__floordiv__', '__mod__', '__divmod__', '__pow__', '__and__', '__or__']: setattr(_DeferredScalar, op, _scalar_binop(op)) DeferredBase._pandas_type_map[None] = _DeferredScalar
[docs] def name_and_func(method: Union[str, Callable]) -> Tuple[str, Callable]: """For the given method name or method, return the method name and the method itself. For internal use only. No backwards compatibility guarantees.""" if isinstance(method, str): method_str = method func = lambda df, *args, **kwargs: getattr(df, method_str)(*args, **kwargs) return method, func else: return method.__name__, method
def _elementwise_method( func, name=None, restrictions=None, inplace=False, base=None): return _proxy_method( func, name, restrictions, inplace, base, requires_partition_by=partitionings.Arbitrary(), preserves_partition_by=partitionings.Arbitrary()) def _proxy_method( func, name=None, restrictions=None, inplace=False, base=None, *, requires_partition_by: partitionings.Partitioning, preserves_partition_by: partitionings.Partitioning, ): if name is None: name, func = name_and_func(func) if base is None: raise ValueError("base is required for _proxy_method") return _proxy_function( func, name, restrictions, inplace, base, requires_partition_by=requires_partition_by, preserves_partition_by=preserves_partition_by) def _elementwise_function( func, name=None, restrictions=None, inplace=False, base=None): return _proxy_function( func, name, restrictions, inplace, base, requires_partition_by=partitionings.Arbitrary(), preserves_partition_by=partitionings.Arbitrary()) def _proxy_function( func: Union[Callable, str], name: Optional[str] = None, restrictions: Optional[dict[str, Union[Any, list[Any]]]] = None, inplace: bool = False, base: Optional[type] = None, *, requires_partition_by: partitionings.Partitioning, preserves_partition_by: partitionings.Partitioning, ): if name is None: if isinstance(func, str): name = func else: name = func.__name__ if restrictions is None: restrictions = {} def wrapper(*args, **kwargs): for key, values in restrictions.items(): if key in kwargs: value = kwargs[key] else: try: ix = getfullargspec(func).args.index(key) except ValueError: # TODO: fix for delegation? continue if len(args) <= ix: continue value = args[ix] if callable(values): check = values elif isinstance(values, list): check = lambda x, values=values: x in values else: check = lambda x, value=value: x == value if not check(value): raise NotImplementedError( '%s=%s not supported for %s' % (key, value, name)) deferred_arg_indices = [] deferred_arg_exprs = [] constant_args = [None] * len(args) from apache_beam.dataframe.frames import _DeferredIndex for ix, arg in enumerate(args): if isinstance(arg, DeferredBase): deferred_arg_indices.append(ix) deferred_arg_exprs.append(arg._expr) elif isinstance(arg, _DeferredIndex): # TODO(robertwb): Consider letting indices pass through as indices. # This would require updating the partitioning code, as indices don't # have indices. deferred_arg_indices.append(ix) deferred_arg_exprs.append( expressions.ComputedExpression( 'index_as_series', lambda ix: ix.index.to_series(), # yapf break [arg._frame._expr], preserves_partition_by=partitionings.Singleton(), requires_partition_by=partitionings.Arbitrary())) elif isinstance(arg, pd.core.generic.NDFrame): deferred_arg_indices.append(ix) deferred_arg_exprs.append(expressions.ConstantExpression(arg, arg[0:0])) else: constant_args[ix] = arg deferred_kwarg_keys = [] deferred_kwarg_exprs = [] constant_kwargs = {key: None for key in kwargs} for key, arg in kwargs.items(): if isinstance(arg, DeferredBase): deferred_kwarg_keys.append(key) deferred_kwarg_exprs.append(arg._expr) elif isinstance(arg, pd.core.generic.NDFrame): deferred_kwarg_keys.append(key) deferred_kwarg_exprs.append( expressions.ConstantExpression(arg, arg[0:0])) else: constant_kwargs[key] = arg deferred_exprs = deferred_arg_exprs + deferred_kwarg_exprs if inplace: actual_func = _copy_and_mutate(func) else: actual_func = func def apply(*actual_args): actual_args, actual_kwargs = (actual_args[:len(deferred_arg_exprs)], actual_args[len(deferred_arg_exprs):]) full_args = list(constant_args) for ix, arg in zip(deferred_arg_indices, actual_args): full_args[ix] = arg full_kwargs = dict(constant_kwargs) for key, arg in zip(deferred_kwarg_keys, actual_kwargs): full_kwargs[key] = arg return actual_func(*full_args, **full_kwargs) if (requires_partition_by.is_subpartitioning_of(partitionings.Index()) and sum(isinstance(arg.proxy(), pd.core.generic.NDFrame) for arg in deferred_exprs) > 1): # Implicit join on index if there is more than one indexed input. actual_requires_partition_by = partitionings.JoinIndex() else: actual_requires_partition_by = requires_partition_by result_expr = expressions.ComputedExpression( name, apply, deferred_exprs, requires_partition_by=actual_requires_partition_by, preserves_partition_by=preserves_partition_by) if inplace: args[0]._expr = result_expr else: return DeferredFrame.wrap(result_expr) wrapper.__name__ = name if restrictions: wrapper.__doc__ = "\n".join( f"Only {kw}={value!r} is supported" for (kw, value) in restrictions.items()) if base is not None: return with_docs_from(base)(wrapper) else: return wrapper def _prettify_pandas_type(pandas_type): if pandas_type in (pd.DataFrame, pd.Series): return f'pandas.{pandas_type.__name__}' elif isclass(pandas_type): return f'{pandas_type.__module__}.{pandas_type.__name__}' elif ismodule(pandas_type): return pandas_type.__name__ else: raise TypeError(pandas_type)
[docs] def wont_implement_method(base_type, name, reason=None, explanation=None): """Generate a stub method that raises WontImplementError. Note either reason or explanation must be specified. If both are specified, explanation is ignored. Args: base_type: The pandas type of the method that this is trying to replicate. name: The name of the method that this is aiming to replicate. reason: If specified, use data from the corresponding entry in ``_WONT_IMPLEMENT_REASONS`` to generate a helpful exception message and docstring for the method. explanation: If specified, use this string as an explanation for why this operation is not supported when generating an exception message and docstring. """ if reason is not None: if reason not in _WONT_IMPLEMENT_REASONS: raise AssertionError( f"reason must be one of {list(_WONT_IMPLEMENT_REASONS.keys())}, " f"got {reason!r}") reason_data = _WONT_IMPLEMENT_REASONS[reason] elif explanation is not None: reason_data = {'explanation': explanation} else: raise ValueError("One of (reason, explanation) must be specified") def wrapper(*args, **kwargs): raise WontImplementError( f"'{name}' is not yet supported {reason_data['explanation']}", reason=reason) wrapper.__name__ = name wrapper.__doc__ = ( f":meth:`{_prettify_pandas_type(base_type)}.{name}` is not yet supported " f"in the Beam DataFrame API {reason_data['explanation']}") if 'url' in reason_data: wrapper.__doc__ += f"\n\n For more information see {reason_data['url']}." return wrapper
[docs] def not_implemented_method(op, issue='20318', base_type=None): """Generate a stub method for ``op`` that simply raises a NotImplementedError. For internal use only. No backwards compatibility guarantees.""" assert base_type is not None, "base_type must be specified" issue_url = f"https://issues.apache.org/jira/{issue}." if issue.startswith( "BEAM-") else f"https://github.com/apache/beam/issues/{issue}" def wrapper(*args, **kwargs): raise NotImplementedError( f"{op!r} is not implemented yet. " f"If support for {op!r} is important to you, please let the Beam " "community know by writing to user@beam.apache.org " "(see https://beam.apache.org/community/contact-us/) or commenting on " f"{issue_url}") wrapper.__name__ = op wrapper.__doc__ = ( f":meth:`{_prettify_pandas_type(base_type)}.{op}` is not implemented yet " "in the Beam DataFrame API.\n\n" f"If support for {op!r} is important to you, please let the Beam " "community know by `writing to user@beam.apache.org " "<https://beam.apache.org/community/contact-us/>`_ or commenting on " f"`{issue} <{issue_url}>`_.") return wrapper
def _copy_and_mutate(func): def wrapper(self, *args, **kwargs): copy = self.copy() func(copy, *args, **kwargs) return copy return wrapper
[docs] def maybe_inplace(func): """Handles the inplace= kwarg available in many pandas operations. This decorator produces a new function handles the inplace kwarg. When `inplace=False`, the new function simply yields the result of `func` directly. When `inplace=True`, the output of `func` is used to replace this instances expression. The result is that any operations applied to this instance after the inplace operation will refernce the updated expression. For internal use only. No backwards compatibility guarantees.""" @functools.wraps(func) def wrapper(self, inplace=False, **kwargs): result = func(self, **kwargs) if inplace: self._expr = result._expr else: return result return wrapper
[docs] def args_to_kwargs(base_type, removed_method=False, removed_args=None): """Convert all args to kwargs before calling the decorated function. When applied to a function, this decorator creates a new function that always calls the wrapped function with *only* keyword arguments. It inspects the argspec for the identically-named method on `base_type` to determine the name to use for arguments that are converted to keyword arguments. For internal use only. No backwards compatibility guarantees. Args: base_type: The pandas type of the method that this is trying to replicate. removed_method: Whether this method has been removed in the running Pandas version. removed_args: If not empty, which arguments have been dropped in the running Pandas version. """ def wrap(func): if removed_method: # Do no processing, let Beam function itself raise the error if called. return func removed_arg_names = removed_args if removed_args is not None else [] # We would need to add position only arguments if they ever become a thing # in Pandas (as of 2.1 currently they aren't). base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__))) base_arg_names = base_arg_spec.args # Some arguments are keyword only and we still want to check against those. all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs beam_arg_names = getfullargspec(func).args if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) - set(removed_arg_names)): raise TypeError( f"Beam definition of {func.__name__} has arguments that are not found" f" in the base version of the function: {not_found}") @functools.wraps(func) def wrapper(*args, **kwargs): if len(args) > len(base_arg_names): raise TypeError(f"{func.__name__} got too many positioned arguments.") for name, value in zip(base_arg_names, args): if name in kwargs: raise TypeError( "%s() got multiple values for argument '%s'" % (func.__name__, name)) kwargs[name] = value # Still have to populate these for the Beam function signature. if removed_args: for name in removed_args: if name not in kwargs: kwargs[name] = None return func(**kwargs) return wrapper return wrap
BEAM_SPECIFIC = "Differences from pandas" SECTION_ORDER = [ 'Parameters', 'Returns', 'Raises', BEAM_SPECIFIC, 'See Also', 'Notes', 'Examples' ] EXAMPLES_DISCLAIMER = ( "**NOTE:** These examples are pulled directly from the pandas " "documentation for convenience. Usage of the Beam DataFrame API will look " "different because it is a deferred API.") EXAMPLES_DIFFERENCES = EXAMPLES_DISCLAIMER + ( " In addition, some arguments shown here may not be supported, see " f"**{BEAM_SPECIFIC!r}** for details.")
[docs] def with_docs_from(base_type, name=None, removed_method=False): """Decorator that updates the documentation from the wrapped function to duplicate the documentation from the identically-named method in `base_type`. Any docstring on the original function will be included in the new function under a "Differences from pandas" heading. removed_method used in cases where a method has been removed in a later version of Pandas. """ def wrap(func): if removed_method: func.__doc__ = ( "This method has been removed in the current version of Pandas.") return func fn_name = name or func.__name__ orig_doc = getattr(base_type, fn_name).__doc__ if orig_doc is None: return func orig_doc = cleandoc(orig_doc) section_splits = re.split(r'^(.*)$\n^-+$\n', orig_doc, flags=re.MULTILINE) intro = section_splits[0].strip() sections = dict(zip(section_splits[1::2], section_splits[2::2])) beam_has_differences = bool(func.__doc__) for header, content in sections.items(): content = content.strip() # Replace references to version numbers so its clear they reference # *pandas* versions content = re.sub(r'([Vv]ersion\s+[\d\.]+)', r'pandas \1', content) if header == "Examples": content = '\n\n'.join([ ( EXAMPLES_DIFFERENCES if beam_has_differences else EXAMPLES_DISCLAIMER), # Indent the examples under a doctest heading, # add skipif option. This makes sure our doctest # framework doesn't run these pandas tests. (".. doctest::\n" " :skipif: True"), re.sub(r"^", " ", content, flags=re.MULTILINE), ]) elif "Examples" in content and ">>>" in content: # some new examples don't have the correct heading # this catches those examples split_content = content.split("Examples") content = '\n\n'.join([ split_content[0], "Examples\n", # Indent the code snippet under a doctest heading, # add skipif option. This makes sure our doctest # framework doesn't run these pandas tests. (".. doctest::\n" " :skipif: True"), re.sub(r"^", " ", content, flags=re.MULTILINE), split_content[1] ]) else: content = content.replace('DataFrame', 'DeferredDataFrame').replace( 'Series', 'DeferredSeries') sections[header] = content if beam_has_differences: sections[BEAM_SPECIFIC] = cleandoc(func.__doc__) else: sections[BEAM_SPECIFIC] = ( "This operation has no known divergences from the " "pandas API.") def format_section(header): return '\n'.join([header, ''.join('-' for _ in header), sections[header]]) func.__doc__ = '\n\n'.join([intro] + [ format_section(header) for header in SECTION_ORDER if header in sections ]) return func return wrap
[docs] def populate_defaults(base_type, removed_method=False, removed_args=None): """Populate default values for keyword arguments in decorated function. When applied to a function, this decorator creates a new function with default values for all keyword arguments, based on the default values for the identically-named method on `base_type`. For internal use only. No backwards compatibility guarantees. Args: base_type: The pandas type of the method that this is trying to replicate. removed_method: Whether this method has been removed in the running Pandas version. removed_args: If not empty, which arguments have been dropped in the running Pandas version. """ def wrap(func): if removed_method: return func base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__))) if not base_argspec.defaults and not base_argspec.kwonlydefaults: return func arg_to_default = {} if base_argspec.defaults: arg_to_default.update( zip( base_argspec.args[-len(base_argspec.defaults):], base_argspec.defaults)) if base_argspec.kwonlydefaults: arg_to_default.update(base_argspec.kwonlydefaults) unwrapped_func = unwrap(func) # args that do not have defaults in func, but do have defaults in base func_argspec = getfullargspec(unwrapped_func) num_non_defaults = len(func_argspec.args) - len(func_argspec.defaults or ()) defaults_to_populate = set( func_argspec.args[:num_non_defaults]).intersection( arg_to_default.keys()) if removed_args: defaults_to_populate -= set(removed_args) # In pandas 2, many methods rely on the default copy=None # to mean that copy is the value of copy_on_write. Since # copy_on_write will always be true for Beam, just fill it # in here. In pandas 1, the default was True anyway. if 'copy' in arg_to_default and arg_to_default['copy'] is None: arg_to_default['copy'] = True @functools.wraps(func) def wrapper(**kwargs): for name in defaults_to_populate: if name not in kwargs: kwargs[name] = arg_to_default[name] return func(**kwargs) return wrapper return wrap
_WONT_IMPLEMENT_REASONS = { 'order-sensitive': { 'explanation': "because it is sensitive to the order of the data.", 'url': 'https://s.apache.org/dataframe-order-sensitive-operations', }, 'non-deferred-columns': { 'explanation': ( "because the columns in the output DataFrame depend " "on the data."), 'url': 'https://s.apache.org/dataframe-non-deferred-columns', }, 'non-deferred-result': { 'explanation': ( "because it produces an output type that is not " "deferred."), 'url': 'https://s.apache.org/dataframe-non-deferred-result', }, 'plotting-tools': { 'explanation': "because it is a plotting tool.", 'url': 'https://s.apache.org/dataframe-plotting-tools', }, 'event-time-semantics': { 'explanation': ( "because implementing it would require integrating with Beam " "event-time semantics"), 'url': 'https://s.apache.org/dataframe-event-time-semantics', }, 'deprecated': { 'explanation': "because it is deprecated in pandas.", }, 'experimental': { 'explanation': "because it is experimental in pandas.", }, }
[docs] class WontImplementError(NotImplementedError): """An subclass of NotImplementedError to raise indicating that implementing the given method is not planned. Raising this error will also prevent this doctests from being validated when run with the beam dataframe validation doctest runner. """ def __init__(self, msg, reason=None): if reason is not None: if reason not in _WONT_IMPLEMENT_REASONS: raise AssertionError( f"reason must be one of {list(_WONT_IMPLEMENT_REASONS.keys())}, " f"got {reason!r}") reason_data = _WONT_IMPLEMENT_REASONS[reason] if 'url' in reason_data: msg = f"{msg}\nFor more information see {reason_data['url']}." super().__init__(msg)