Source code for apache_beam.dataframe.frame_base

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import functools
import inspect
import sys
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import pandas as pd

from apache_beam.dataframe import expressions
from apache_beam.dataframe import partitionings

# pylint: disable=deprecated-method
if sys.version_info < (3, ):
  _getargspec = inspect.getargspec

  def _unwrap(func):
    while hasattr(func, '__wrapped__'):
      func = func.__wrapped__
    return func
else:
  _getargspec = inspect.getfullargspec
  _unwrap = inspect.unwrap


[docs]class DeferredBase(object): _pandas_type_map = {} # type: Dict[Union[type, None], type] def __init__(self, expr): self._expr = expr @classmethod def _register_for(cls, pandas_type): def wrapper(deferred_type): cls._pandas_type_map[pandas_type] = deferred_type return deferred_type return wrapper
[docs] @classmethod def wrap(cls, expr, split_tuples=True): proxy_type = type(expr.proxy()) if proxy_type is tuple and split_tuples: def get(ix): return expressions.ComputedExpression( # yapf: disable 'get_%d' % ix, lambda t: t[ix], [expr], requires_partition_by=partitionings.Nothing(), preserves_partition_by=partitionings.Singleton()) return tuple([cls.wrap(get(ix)) for ix in range(len(expr.proxy()))]) elif proxy_type in cls._pandas_type_map: wrapper_type = cls._pandas_type_map[proxy_type] else: if expr.requires_partition_by() != partitionings.Singleton(): raise ValueError( 'Scalar expression %s of type %s partitoned by non-singleton %s' % (expr, proxy_type, expr.requires_partition_by())) wrapper_type = _DeferredScalar return wrapper_type(expr)
def _elementwise(self, func, name=None, other_args=(), inplace=False): return _elementwise_function(func, name, inplace=inplace)(self, *other_args) def __reduce__(self): return UnusableUnpickledDeferredBase, (str(self), )
[docs]class UnusableUnpickledDeferredBase(object): """Placeholder object used to break the transitive pickling chain in case a DeferredBase accidentially gets pickled (e.g. as part of globals). Trying to use this object after unpickling is a bug and will result in an error. """ def __init__(self, name): self._name = name def __repr__(self): return 'UnusablePickledDeferredBase(%r)' % self.name
[docs]class DeferredFrame(DeferredBase): @property def dtypes(self): return self._expr.proxy().dtypes
class _DeferredScalar(DeferredBase): def apply(self, func, name=None, args=()): if name is None: name = func.__name__ with expressions.allow_non_parallel_operations( all(isinstance(arg, _DeferredScalar) for arg in args) or None): return DeferredFrame.wrap( expressions.ComputedExpression( name, func, [self._expr] + [arg._expr for arg in args], requires_partition_by=partitionings.Singleton())) DeferredBase._pandas_type_map[None] = _DeferredScalar
[docs]def name_and_func(method): if isinstance(method, str): return method, lambda df, *args, **kwargs: getattr(df, method)(*args, ** kwargs) else: return method.__name__, method
def _elementwise_method(func, name=None, restrictions=None, inplace=False): return _proxy_method( func, name, restrictions, inplace, requires_partition_by=partitionings.Nothing(), preserves_partition_by=partitionings.Singleton()) def _proxy_method( func, name=None, restrictions=None, inplace=False, requires_partition_by=partitionings.Singleton(), preserves_partition_by=partitionings.Nothing()): if name is None: name, func = name_and_func(func) if restrictions is None: restrictions = {} return _proxy_function( func, name, restrictions, inplace, requires_partition_by, preserves_partition_by) def _elementwise_function(func, name=None, restrictions=None, inplace=False): return _proxy_function( func, name, restrictions, inplace, requires_partition_by=partitionings.Nothing(), preserves_partition_by=partitionings.Singleton()) def _proxy_function( func, # type: Union[Callable, str] name=None, # type: Optional[str] restrictions=None, # type: Optional[Dict[str, Union[Any, List[Any], Callable[[Any], bool]]]] inplace=False, # type: bool requires_partition_by=partitionings.Singleton(), # type: partitionings.Partitioning preserves_partition_by=partitionings.Nothing(), # type: partitionings.Partitioning ): if name is None: if isinstance(func, str): name = func else: name = func.__name__ if restrictions is None: restrictions = {} def wrapper(*args, **kwargs): for key, values in restrictions.items(): if key in kwargs: value = kwargs[key] else: try: ix = _getargspec(func).args.index(key) except ValueError: # TODO: fix for delegation? continue if len(args) <= ix: continue value = args[ix] if callable(values): check = values elif isinstance(values, list): check = lambda x, values=values: x in values else: check = lambda x, value=value: x == value if not check(value): raise NotImplementedError( '%s=%s not supported for %s' % (key, value, name)) deferred_arg_indices = [] deferred_arg_exprs = [] constant_args = [None] * len(args) from apache_beam.dataframe.frames import _DeferredIndex for ix, arg in enumerate(args): if isinstance(arg, DeferredBase): deferred_arg_indices.append(ix) deferred_arg_exprs.append(arg._expr) elif isinstance(arg, _DeferredIndex): # TODO(robertwb): Consider letting indices pass through as indices. # This would require updating the partitioning code, as indices don't # have indices. deferred_arg_indices.append(ix) deferred_arg_exprs.append( expressions.ComputedExpression( 'index_as_series', lambda ix: ix.index.to_series(), # yapf break [arg._frame._expr], preserves_partition_by=partitionings.Singleton(), requires_partition_by=partitionings.Nothing())) elif isinstance(arg, pd.core.generic.NDFrame): deferred_arg_indices.append(ix) deferred_arg_exprs.append(expressions.ConstantExpression(arg, arg[0:0])) else: constant_args[ix] = arg deferred_kwarg_keys = [] deferred_kwarg_exprs = [] constant_kwargs = {key: None for key in kwargs} for key, arg in kwargs.items(): if isinstance(arg, DeferredBase): deferred_kwarg_keys.append(key) deferred_kwarg_exprs.append(arg._expr) elif isinstance(arg, pd.core.generic.NDFrame): deferred_kwarg_keys.append(key) deferred_kwarg_exprs.append( expressions.ConstantExpression(arg, arg[0:0])) else: constant_kwargs[key] = arg deferred_exprs = deferred_arg_exprs + deferred_kwarg_exprs if inplace: actual_func = copy_and_mutate(func) else: actual_func = func def apply(*actual_args): actual_args, actual_kwargs = (actual_args[:len(deferred_arg_exprs)], actual_args[len(deferred_arg_exprs):]) full_args = list(constant_args) for ix, arg in zip(deferred_arg_indices, actual_args): full_args[ix] = arg full_kwargs = dict(constant_kwargs) for key, arg in zip(deferred_kwarg_keys, actual_kwargs): full_kwargs[key] = arg return actual_func(*full_args, **full_kwargs) if (not requires_partition_by.is_subpartitioning_of(partitionings.Index()) and sum(isinstance(arg.proxy(), pd.core.generic.NDFrame) for arg in deferred_exprs) > 1): # Implicit join on index if there is more than one indexed input. actual_requires_partition_by = partitionings.Index() else: actual_requires_partition_by = requires_partition_by result_expr = expressions.ComputedExpression( name, apply, deferred_exprs, requires_partition_by=actual_requires_partition_by, preserves_partition_by=preserves_partition_by) if inplace: args[0]._expr = result_expr else: return DeferredFrame.wrap(result_expr) return wrapper def _agg_method(func): def wrapper(self, *args, **kwargs): return self.agg(func, *args, **kwargs) return wrapper
[docs]def wont_implement_method(msg): def wrapper(*args, **kwargs): raise WontImplementError(msg) return wrapper
[docs]def not_implemented_method(op, jira='BEAM-9547'): def wrapper(*args, **kwargs): raise NotImplementedError("'%s' is not yet supported (%s)" % (op, jira)) return wrapper
[docs]def copy_and_mutate(func): def wrapper(self, *args, **kwargs): copy = self.copy() func(copy, *args, **kwargs) return copy return wrapper
[docs]def maybe_inplace(func): @functools.wraps(func) def wrapper(self, inplace=False, **kwargs): result = func(self, **kwargs) if inplace: self._expr = result._expr else: return result return wrapper
[docs]def args_to_kwargs(base_type): def wrap(func): arg_names = _getargspec(_unwrap(getattr(base_type, func.__name__))).args @functools.wraps(func) def wrapper(*args, **kwargs): for name, value in zip(arg_names, args): if name in kwargs: raise TypeError( "%s() got multiple values for argument '%s'" % (func.__name__, name)) kwargs[name] = value return func(**kwargs) return wrapper return wrap
[docs]def populate_defaults(base_type): def wrap(func): base_argspec = _getargspec(_unwrap(getattr(base_type, func.__name__))) if not base_argspec.defaults: return func arg_to_default = dict( zip( base_argspec.args[-len(base_argspec.defaults):], base_argspec.defaults)) unwrapped_func = _unwrap(func) # args that do not have defaults in func, but do have defaults in base func_argspec = _getargspec(unwrapped_func) num_non_defaults = len(func_argspec.args) - len(func_argspec.defaults or ()) defaults_to_populate = set( func_argspec.args[:num_non_defaults]).intersection( arg_to_default.keys()) @functools.wraps(func) def wrapper(**kwargs): for name in defaults_to_populate: if name not in kwargs: kwargs[name] = arg_to_default[name] return func(**kwargs) return wrapper return wrap
[docs]class WontImplementError(NotImplementedError): """An subclass of NotImplementedError to raise indicating that implementing the given method is infeasible. Raising this error will also prevent this doctests from being validated when run with the beam dataframe validation doctest runner. """ pass