Source code for apache_beam.dataframe.frames

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import pandas as pd

from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import partitionings


@frame_base.DeferredFrame._register_for(pd.Series)
class DeferredSeries(frame_base.DeferredFrame):
  """Deferred frame type registered for ``pd.Series``.

  Operations either build an ``expressions.ComputedExpression`` describing
  the pandas call to make, or raise ``frame_base.WontImplementError`` for
  operations this class declines to support.
  """
  def __array__(self, dtype=None):
    """Reject numpy array conversion.

    Raises:
      frame_base.WontImplementError: always.
    """
    raise frame_base.WontImplementError(
        'Conversion to a non-deferred numpy array.')

  isna = frame_base._elementwise_method('isna')
  notnull = notna = frame_base._elementwise_method('notna')

  transform = frame_base._elementwise_method(
      'transform', restrictions={'axis': 0})

  def agg(self, *args, **kwargs):
    """Defer ``agg(*args, **kwargs)`` on the underlying series.

    The expression both requires and preserves
    ``partitionings.Singleton()``.
    """
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'agg',
            lambda df: df.agg(*args, **kwargs),
            [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=partitionings.Singleton()))

  all = frame_base._associative_agg_method('all')
  any = frame_base._associative_agg_method('any')
  min = frame_base._associative_agg_method('min')
  max = frame_base._associative_agg_method('max')
  prod = product = frame_base._associative_agg_method('prod')
  sum = frame_base._associative_agg_method('sum')

  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
      'order-sensitive')
  diff = frame_base.wont_implement_method('order-sensitive')

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  @frame_base.maybe_inplace
  def replace(self, limit, **kwargs):
    """Defer ``replace(limit=limit, **kwargs)`` on the underlying series.

    With ``limit=None`` the expression requires
    ``partitionings.Nothing()``; with any other limit it requires
    ``partitionings.Singleton()``.
    """
    if limit is None:
      requires_partition_by = partitionings.Nothing()
    else:
      # NOTE(review): presumably a limit has to be counted over the whole
      # dataset, hence the stricter requirement -- confirm.
      requires_partition_by = partitionings.Singleton()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'replace',
            lambda df: df.replace(limit=limit, **kwargs),
            [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=requires_partition_by))

  def unstack(self, *args, **kwargs):
    """Reject ``unstack``.

    Raises:
      frame_base.WontImplementError: always.
    """
    raise frame_base.WontImplementError('non-deferred column values')
# Attach the arithmetic operators to DeferredSeries: for every base operation
# register the plain, reflected, dunder and reflected-dunder spellings as
# elementwise methods restricted to level=None, plus the in-place dunder.
for base in ('add', 'sub', 'mul', 'div', 'truediv', 'floordiv', 'mod', 'pow'):
  for p in ('%s', 'r%s', '__%s__', '__r%s__'):
    # TODO: non-trivial level?
    name = p % base
    setattr(
        DeferredSeries,
        name,
        frame_base._elementwise_method(name, restrictions={'level': None}))
  setattr(
      DeferredSeries,
      '__i{}__'.format(base),
      frame_base._elementwise_method('__i{}__'.format(base), inplace=True))

# Comparison operators first, then the function-applying methods; all are
# registered as unrestricted elementwise methods.
# NOTE(review): this rebinds `transform`, replacing whatever `transform`
# attribute the class body already defined.
for name in ('__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__',
             'apply', 'map', 'transform'):
  setattr(DeferredSeries, name, frame_base._elementwise_method(name))
@frame_base.DeferredFrame._register_for(pd.DataFrame)
class DeferredDataFrame(frame_base.DeferredFrame):
  """Deferred frame type registered for ``pd.DataFrame``.

  Operations either build an ``expressions.ComputedExpression`` describing
  the pandas call to make, or raise ``frame_base.WontImplementError`` /
  ``NotImplementedError`` for operations this class declines to support.
  """
  @property
  def T(self):
    """Alias for ``transpose()``."""
    return self.transpose()

  def groupby(self, cols):
    """Group by the given column(s), returning a ``DeferredGroupBy``.

    The columns are first moved into the index via ``set_index`` and the
    frame is then grouped on every index level.
    """
    # TODO: what happens to the existing index?
    # We set the columns to index as we have a notion of being partitioned by
    # index, but not partitioned by an arbitrary subset of columns.
    return DeferredGroupBy(
        expressions.ComputedExpression(
            'groupbyindex',
            lambda df: df.groupby(level=list(range(df.index.nlevels))),
            [self.set_index(cols)._expr],
            requires_partition_by=partitionings.Index(),
            preserves_partition_by=partitionings.Singleton()))

  def __getattr__(self, name):
    """Column attribute access, e.g. ``frame.col`` for ``frame['col']``.

    Raises:
      AttributeError: if ``name`` is neither a proxy column nor an
        attribute of this object.
    """
    # Fetch `_expr` without going through __getattr__ again: if `_expr` is not
    # set yet (e.g. on an instance created without __init__ while copying or
    # unpickling), a plain `self._expr` would re-enter this method forever.
    expr = object.__getattribute__(self, '_expr')
    if name in expr.proxy().columns:
      return self[name]
    else:
      return object.__getattribute__(self, name)

  def __getitem__(self, key):
    """Select a single column that exists in the proxy.

    Raises:
      NotImplementedError: if ``key`` is not one of the proxy's columns.
    """
    if key in self._expr.proxy().columns:
      return self._elementwise(lambda df: df[key], 'get_column')
    else:
      raise NotImplementedError(key)

  def __setitem__(self, key, value):
    """Assign ``value`` to the column named by the string ``key``.

    Raises:
      NotImplementedError: if ``key`` is not a ``str``.
    """
    if isinstance(key, str):
      # yapf: disable
      return self._elementwise(
          lambda df, key, value: df.__setitem__(key, value),
          'set_column',
          (key, value),
          inplace=True)
    else:
      raise NotImplementedError(key)

  def set_index(self, keys, **kwargs):
    """Defer ``set_index(keys, **kwargs)``.

    A single string is treated as a one-element list of keys.

    Raises:
      NotImplementedError: if any key is not one of the proxy's columns.
    """
    if isinstance(keys, str):
      keys = [keys]
    if not set(keys).issubset(self._expr.proxy().columns):
      raise NotImplementedError(keys)
    return self._elementwise(
        lambda df: df.set_index(keys, **kwargs),
        'set_index',
        inplace=kwargs.get('inplace', False))

  def at(self, *args, **kwargs):
    """Not implemented.

    Raises:
      NotImplementedError: always.
    """
    raise NotImplementedError()

  @property
  def loc(self):
    """Label-based indexer backed by ``_DeferredLoc``."""
    return _DeferredLoc(self)

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  def aggregate(self, axis, **kwargs):
    """Defer ``agg(axis=axis, **kwargs)``.

    With ``axis=None`` the frame is aggregated along axis 1 and the result
    is then aggregated along axis 0, using the same keyword arguments.
    """
    if axis is None:
      return self.agg(axis=1, **kwargs).agg(axis=0, **kwargs)
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'aggregate',
            lambda df: df.agg(axis=axis, **kwargs),
            [self._expr],
            # TODO(robertwb): Sub-aggregate when possible.
            requires_partition_by=partitionings.Singleton()))

  agg = aggregate

  applymap = frame_base._elementwise_method('applymap')

  memory_usage = frame_base.wont_implement_method('non-deferred value')

  all = frame_base._associative_agg_method('all')
  any = frame_base._associative_agg_method('any')

  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
      'order-sensitive')
  diff = frame_base.wont_implement_method('order-sensitive')

  max = frame_base._associative_agg_method('max')
  min = frame_base._associative_agg_method('min')
  mode = frame_base._agg_method('mode')

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  @frame_base.maybe_inplace
  def dropna(self, axis, **kwargs):
    """Defer ``dropna(axis=axis, **kwargs)``.

    For ``axis`` 1 / ``'columns'`` the expression requires
    ``partitionings.Singleton()``; otherwise ``partitionings.Nothing()``.
    """
    # TODO(robertwb): This is a common pattern. Generalize?
    if axis == 1 or axis == 'columns':
      requires_partition_by = partitionings.Singleton()
    else:
      requires_partition_by = partitionings.Nothing()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'dropna',
            lambda df: df.dropna(axis=axis, **kwargs),
            [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=requires_partition_by))

  items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
      'non-lazy')

  isna = frame_base._elementwise_method('isna')
  notnull = notna = frame_base._elementwise_method('notna')

  prod = product = frame_base._associative_agg_method('prod')

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  def quantile(self, axis, **kwargs):
    """Defer ``quantile(axis=axis, **kwargs)``.

    Raises:
      frame_base.WontImplementError: for ``axis`` 1 / ``'columns'``.
    """
    if axis == 1 or axis == 'columns':
      raise frame_base.WontImplementError('non-deferred column values')
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'quantile',
            lambda df: df.quantile(axis=axis, **kwargs),
            [self._expr],
            #TODO(robertwb): Approximate quantiles?
            requires_partition_by=partitionings.Singleton(),
            preserves_partition_by=partitionings.Singleton()))

  query = frame_base._elementwise_method('query')

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  @frame_base.maybe_inplace
  def replace(self, limit, **kwargs):
    """Defer ``replace(limit=limit, **kwargs)``.

    With ``limit=None`` the expression requires
    ``partitionings.Nothing()``; with any other limit it requires
    ``partitionings.Singleton()``.
    """
    if limit is None:
      requires_partition_by = partitionings.Nothing()
    else:
      requires_partition_by = partitionings.Singleton()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'replace',
            lambda df: df.replace(limit=limit, **kwargs),
            [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=requires_partition_by))

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  @frame_base.maybe_inplace
  def reset_index(self, level, **kwargs):
    """Defer ``reset_index(level=level, **kwargs)``.

    A scalar ``level`` is treated as a one-element list. When every index
    level is being reset (``level`` is None or names as many levels as the
    proxy index has) the expression requires ``partitionings.Singleton()``;
    otherwise ``partitionings.Nothing()``.
    """
    if level is not None and not isinstance(level, (tuple, list)):
      level = [level]
    # Use `nlevels`, which every pandas index provides; `levels` exists only
    # on a MultiIndex and raises AttributeError on a single-level index.
    if level is None or len(level) == self._expr.proxy().index.nlevels:
      # TODO: Could do distributed re-index with offsets.
      requires_partition_by = partitionings.Singleton()
    else:
      requires_partition_by = partitionings.Nothing()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'reset_index',
            lambda df: df.reset_index(level=level, **kwargs),
            [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=requires_partition_by))

  round = frame_base._elementwise_method('round')
  select_dtypes = frame_base._elementwise_method('select_dtypes')

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  def shift(self, axis, **kwargs):
    """Defer ``shift(axis=axis, **kwargs)``.

    For ``axis`` 1 / ``'columns'`` the expression requires
    ``partitionings.Nothing()``; otherwise ``partitionings.Singleton()``.
    """
    if axis == 1 or axis == 'columns':
      requires_partition_by = partitionings.Nothing()
    else:
      requires_partition_by = partitionings.Singleton()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'shift',
            lambda df: df.shift(axis=axis, **kwargs),
            [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=requires_partition_by))

  @property
  def shape(self):
    """Not available on a deferred frame.

    Raises:
      frame_base.WontImplementError: always.
    """
    raise frame_base.WontImplementError('scalar value')

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  @frame_base.maybe_inplace
  def sort_values(self, axis, **kwargs):
    """Defer ``sort_values(axis=axis, **kwargs)``.

    For ``axis`` 1 / ``'columns'`` the expression requires
    ``partitionings.Nothing()``; otherwise ``partitionings.Singleton()``.
    """
    if axis == 1 or axis == 'columns':
      requires_partition_by = partitionings.Nothing()
    else:
      requires_partition_by = partitionings.Singleton()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'sort_values',
            lambda df: df.sort_values(axis=axis, **kwargs),
            [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=requires_partition_by))

  stack = frame_base._elementwise_method('stack')

  sum = frame_base._associative_agg_method('sum')

  to_records = to_dict = to_numpy = to_string = (
      frame_base.wont_implement_method('non-deferred value'))

  to_sparse = to_string  # frame_base._elementwise_method('to_sparse')

  transform = frame_base._elementwise_method(
      'transform', restrictions={'axis': 0})

  def transpose(self, *args, **kwargs):
    """Reject ``transpose``.

    Raises:
      frame_base.WontImplementError: always.
    """
    raise frame_base.WontImplementError('non-deferred column values')

  def unstack(self, *args, **kwargs):
    """Defer ``unstack(*args, **kwargs)`` for single-level indexes.

    Raises:
      frame_base.WontImplementError: if the proxy index has more than one
        level.
    """
    if self._expr.proxy().index.nlevels == 1:
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'unstack',
              lambda df: df.unstack(*args, **kwargs),
              [self._expr],
              requires_partition_by=partitionings.Index()))
    else:
      raise frame_base.WontImplementError('non-deferred column values')

  update = frame_base._proxy_method(
      'update',
      inplace=True,
      requires_partition_by=partitionings.Index(),
      preserves_partition_by=partitionings.Index())
# Elementwise methods attached to DeferredDataFrame after the class body.
for meth in ['filter']:
  setattr(
      DeferredDataFrame,
      meth,
      frame_base._elementwise_method(meth))
class DeferredGroupBy(frame_base.DeferredFrame):
  """Deferred frame wrapping a grouped expression.

  Named aggregation methods are attached to this class by module-level code
  elsewhere in the file; only ``agg`` is defined in the class body.
  """
  def agg(self, fn):
    """Defer ``agg(fn)`` on the grouped data.

    Args:
      fn: a callable passed to the grouped object's ``agg``.

    Returns:
      A ``DeferredDataFrame`` over an expression that requires
      ``partitionings.Index()`` and preserves ``partitionings.Singleton()``.

    Raises:
      NotImplementedError: if ``fn`` is not callable.
    """
    if not callable(fn):
      raise NotImplementedError(fn)

    aggregated = expressions.ComputedExpression(
        'agg',
        lambda df: df.agg(fn),
        [self._expr],
        requires_partition_by=partitionings.Index(),
        preserves_partition_by=partitionings.Singleton())
    return DeferredDataFrame(aggregated)
def _liftable_agg(meth):
  """Build a DeferredGroupBy method that aggregates in two chained stages.

  The first stage ('pre_combine_<name>') applies the aggregation with no
  partitioning requirement; the second ('post_combine_<name>') applies it
  again to the first stage's output and requires ``partitionings.Index()``.
  """
  name, func = frame_base.name_and_func(meth)

  def group_on_index(df):
    # Group on every level of the frame's index.
    return df.groupby(level=list(range(df.index.nlevels)))

  def wrapper(self, *args, **kargs):
    # NOTE(review): args and kargs are accepted but never forwarded to `func`.
    assert isinstance(self, DeferredGroupBy)
    # The grouped expression's first argument is the frame before grouping.
    ungrouped = self._expr.args()[0]
    pre_agg = expressions.ComputedExpression(
        'pre_combine_' + name,
        lambda df: func(group_on_index(df)),
        [ungrouped],
        requires_partition_by=partitionings.Nothing(),
        preserves_partition_by=partitionings.Singleton())
    post_agg = expressions.ComputedExpression(
        'post_combine_' + name,
        lambda df: func(group_on_index(df)),
        [pre_agg],
        requires_partition_by=partitionings.Index(),
        preserves_partition_by=partitionings.Singleton())
    return frame_base.DeferredFrame.wrap(post_agg)

  return wrapper


def _unliftable_agg(meth):
  """Build a DeferredGroupBy method that aggregates in a single stage.

  The aggregation is applied once to the ungrouped frame, in an expression
  that requires ``partitionings.Index()``.
  """
  name, func = frame_base.name_and_func(meth)

  def group_on_index(df):
    # Group on every level of the frame's index.
    return df.groupby(level=list(range(df.index.nlevels)))

  def wrapper(self, *args, **kargs):
    # NOTE(review): args and kargs are accepted but never forwarded to `func`.
    assert isinstance(self, DeferredGroupBy)
    ungrouped = self._expr.args()[0]
    post_agg = expressions.ComputedExpression(
        name,
        lambda df: func(group_on_index(df)),
        [ungrouped],
        requires_partition_by=partitionings.Index(),
        preserves_partition_by=partitionings.Singleton())
    return frame_base.DeferredFrame.wrap(post_agg)

  return wrapper


# Aggregations registered through _liftable_agg (two-stage).
LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'size', 'sum']
# Aggregations registered through _unliftable_agg (single-stage).
UNLIFTABLE_AGGREGATIONS = ['mean', 'median', 'std', 'var']

for meth in LIFTABLE_AGGREGATIONS:
  setattr(DeferredGroupBy, meth, _liftable_agg(meth))
for meth in UNLIFTABLE_AGGREGATIONS:
  setattr(DeferredGroupBy, meth, _unliftable_agg(meth))


class _DeferredLoc(object):
  """Indexer object returned by a deferred frame's ``loc`` property."""
  def __init__(self, frame):
    self._frame = frame

  def __getitem__(self, index):
    """Defer ``df.loc[index]`` for the supported kinds of ``index``.

    Supported: a ``(rows, cols)`` tuple (applied as two successive
    selections), a slice, another deferred frame, or a callable. Lists are
    rejected.

    Raises:
      NotImplementedError: for list indexes, for any other unsupported index
        type, or (when the deferred expression is evaluated) for a callable
        whose row index is neither a slice nor a ``pd.Series``.
    """
    if isinstance(index, tuple):
      rows, cols = index
      return self[rows][cols]

    if isinstance(index, list):
      if index and isinstance(index[0], bool):
        # Aligned by numerical index.
        raise NotImplementedError(type(index))
      # Select rows, but behaves poorly on missing values.
      raise NotImplementedError(type(index))

    if isinstance(index, slice):
      args = [self._frame._expr]
      func = lambda df: df.loc[index]
    elif isinstance(index, frame_base.DeferredFrame):
      # The deferred index is supplied as a second expression argument.
      args = [self._frame._expr, index._expr]
      func = lambda df, index: df.loc[index]
    elif callable(index):

      def checked_callable_index(df):
        # Run the user's callable, then vet the row part of its result before
        # handing the whole result back to `.loc`.
        computed_index = index(df)
        if isinstance(computed_index, tuple):
          row_index, _ = computed_index
        else:
          row_index = computed_index
        is_bool_list = (
            isinstance(row_index, list) and row_index and
            isinstance(row_index[0], bool))
        if is_bool_list:
          raise NotImplementedError(type(row_index))
        elif not isinstance(row_index, (slice, pd.Series)):
          raise NotImplementedError(type(row_index))
        return computed_index

      args = [self._frame._expr]
      func = lambda df: df.loc[checked_callable_index]
    else:
      raise NotImplementedError(type(index))

    # Only the two-argument (deferred index) form asks for Index partitioning.
    if len(args) > 1:
      requires_partition_by = partitionings.Index()
    else:
      requires_partition_by = partitionings.Nothing()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'loc',
            func,
            args,
            requires_partition_by=requires_partition_by,
            preserves_partition_by=partitionings.Singleton()))