#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import pandas as pd
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import io
from apache_beam.dataframe import partitionings
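# DeferredSeries and DeferredDataFrame mirror the pandas API but do not
# execute operations eagerly: each method builds a ComputedExpression over a
# proxy (an empty pandas object carrying the schema), and the partitionings
# arguments record how inputs must be distributed (requires_partition_by) and
# how the result stays distributed (preserves_partition_by) when a Beam
# runner later evaluates the expression graph.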
@frame_base.DeferredFrame._register_for(pd.Series)
class DeferredSeries(frame_base.DeferredFrame):
def __array__(self, dtype=None):
raise frame_base.WontImplementError(
'Conversion to a non-deferred numpy array.')
astype = frame_base._elementwise_method('astype')
between = frame_base._elementwise_method('between')
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def dropna(self, **kwargs):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'dropna',
lambda df: df.dropna(**kwargs), [self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing()))
items = iteritems = frame_base.wont_implement_method('non-lazy')
isin = frame_base._elementwise_method('isin')
isna = frame_base._elementwise_method('isna')
notnull = notna = frame_base._elementwise_method('notna')
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def fillna(self, value, method):
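# Forward/backward fill (a non-None `method`) depends on the order of
# elements across the whole Series and is therefore rejected. A deferred
# fill value is passed through as an extra expression input so it is
# computed within the same pipeline.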
if method is not None:
raise frame_base.WontImplementError('order-sensitive')
if isinstance(value, frame_base.DeferredBase):
value_expr = value._expr
else:
value_expr = expressions.ConstantExpression(value)
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'fillna',
lambda df, value: df.fillna(value, method=method), [self._expr, value_expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing()))
reindex = frame_base.not_implemented_method('reindex')
to_numpy = to_string = frame_base.wont_implement_method('non-deferred value')
transform = frame_base._elementwise_method(
'transform', restrictions={'axis': 0})
def aggregate(self, func, axis=0, *args, **kwargs):
if isinstance(func, list) and len(func) > 1:
# Aggregate each column separately, then stick them all together.
rows = [self.agg([f], *args, **kwargs) for f in func]
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *rows: pd.concat(rows), [row._expr for row in rows]))
else:
# We're only handling a single column.
base_func = func[0] if isinstance(func, list) else func
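# If the aggregation is associative and takes no extra arguments, it can be
# pre-aggregated within each partition in parallel; the final, non-parallel
# step then only combines one partial result per partition.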
if _is_associative(base_func) and not args and not kwargs:
intermediate = expressions.elementwise_expression(
'pre_aggregate',
lambda s: s.agg([base_func], *args, **kwargs), [self._expr])
allow_nonparallel_final = True
else:
intermediate = self._expr
allow_nonparallel_final = None # i.e. don't change the value
with expressions.allow_non_parallel_operations(allow_nonparallel_final):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
lambda s: s.agg(func, *args, **kwargs), [intermediate],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
agg = aggregate
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')
min = frame_base._agg_method('min')
max = frame_base._agg_method('max')
prod = product = frame_base._agg_method('prod')
sum = frame_base._agg_method('sum')
cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
head = tail = frame_base.wont_implement_method('order-sensitive')
memory_usage = frame_base.wont_implement_method('non-deferred value')
# In Series __contains__ checks the index
__contains__ = frame_base.wont_implement_method('non-deferred value')
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def nlargest(self, keep, **kwargs):
# TODO(robertwb): Document 'any' option.
# TODO(robertwb): Consider (conditionally) defaulting to 'any' if no
# explicit keep parameter is requested.
if keep == 'any':
keep = 'first'
elif keep != 'all':
raise frame_base.WontImplementError('order-sensitive')
kwargs['keep'] = keep
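# Two-stage computation: take the n largest values within each partition in
# parallel, then combine the per-partition candidates on a single worker and
# take the n largest of that union.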
per_partition = expressions.ComputedExpression(
'nlargest-per-partition',
lambda df: df.nlargest(**kwargs), [self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing())
with expressions.allow_non_parallel_operations(True):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'nlargest',
lambda df: df.nlargest(**kwargs), [per_partition],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def nsmallest(self, keep, **kwargs):
if keep == 'any':
keep = 'first'
elif keep != 'all':
raise frame_base.WontImplementError('order-sensitive')
kwargs['keep'] = keep
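# Same two-stage (per-partition, then single-worker combine) pattern as
# nlargest above.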
per_partition = expressions.ComputedExpression(
'nsmallest-per-partition',
lambda df: df.nsmallest(**kwargs), [self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing())
with expressions.allow_non_parallel_operations(True):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'nsmallest',
lambda df: df.nsmallest(**kwargs), [per_partition],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
rename_axis = frame_base._elementwise_method('rename_axis')
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.maybe_inplace
def replace(self, limit, **kwargs):
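# A non-None `limit` caps the total number of replacements, which cannot be
# enforced independently per partition, so it forces single-worker execution.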
if limit is None:
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'replace',
lambda df: df.replace(limit=limit, **kwargs), [self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
round = frame_base._elementwise_method('round')
searchsorted = frame_base.wont_implement_method('order-sensitive')
shift = frame_base.wont_implement_method('order-sensitive')
take = frame_base.wont_implement_method('deprecated')
to_dict = frame_base.wont_implement_method('non-deferred')
to_frame = frame_base._elementwise_method('to_frame')
def unique(self, as_series=False):
if not as_series:
raise frame_base.WontImplementError(
'pass as_series=True to get the result as a (deferred) Series')
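# Uniqueness is a global property, so all values must be colocated
# (Singleton partitioning) to compute it.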
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'unique',
lambda df: pd.Series(df.unique()), [self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
def update(self, other):
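# pd.Series.update mutates in place and returns None, so `df.update(other)
# or df` performs the update for its side effect and still yields the
# updated frame as the expression's result.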
self._expr = expressions.ComputedExpression(
'update',
lambda df, other: df.update(other) or df, [self._expr, other._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Index())
unstack = frame_base.wont_implement_method('non-deferred column values')
values = property(frame_base.wont_implement_method('non-deferred'))
view = frame_base.wont_implement_method('memory sharing semantics')
@property
def str(self):
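# Expose the pandas .str accessor; the individual string methods are applied
# elementwise via _DeferredStringMethods (see ELEMENTWISE_STRING_METHODS at
# the bottom of this module).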
expr = expressions.ComputedExpression(
'str',
lambda df: df.str, [self._expr],
requires_partition_by=partitionings.Nothing(),
preserves_partition_by=partitionings.Singleton())
return _DeferredStringMethods(expr)
for name in ['apply', 'map', 'transform']:
setattr(DeferredSeries, name, frame_base._elementwise_method(name))
@frame_base.DeferredFrame._register_for(pd.DataFrame)
class DeferredDataFrame(frame_base.DeferredFrame):
@property
def T(self):
return self.transpose()
@property
def columns(self):
return self._expr.proxy().columns
def groupby(self, by):
# TODO: what happens to the existing index?
# We set the columns to index as we have a notion of being partitioned by
# index, but not partitioned by an arbitrary subset of columns.
return DeferredGroupBy(
expressions.ComputedExpression(
'groupbyindex',
lambda df: df.groupby(level=list(range(df.index.nlevels))),
[self.set_index(by)._expr],
requires_partition_by=partitionings.Index(),
preserves_partition_by=partitionings.Singleton()))
def __getattr__(self, name):
# Column attribute access.
if name in self._expr.proxy().columns:
return self[name]
else:
return object.__getattribute__(self, name)
def __getitem__(self, key):
# TODO: Replicate pd.DataFrame.__getitem__ logic
if isinstance(key, frame_base.DeferredBase):
# Fail early if key is a DeferredBase as it interacts surprisingly with
# key in self._expr.proxy().columns
raise NotImplementedError(
"Indexing with a deferred frame is not yet supported. Consider "
"using df.loc[...]")
if (isinstance(key, list) and
all(key_column in self._expr.proxy().columns
for key_column in key)) or key in self._expr.proxy().columns:
return self._elementwise(lambda df: df[key], 'get_column')
else:
raise NotImplementedError(key)
def __contains__(self, key):
# Checks if proxy has the given column
return self._expr.proxy().__contains__(key)
def __setitem__(self, key, value):
if isinstance(key, str):
# yapf: disable
return self._elementwise(
lambda df, key, value: df.__setitem__(key, value),
'set_column',
(key, value),
inplace=True)
else:
raise NotImplementedError(key)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def set_index(self, keys, **kwargs):
if isinstance(keys, str):
keys = [keys]
if not set(keys).issubset(self._expr.proxy().columns):
raise NotImplementedError(keys)
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'set_index',
lambda df: df.set_index(keys, **kwargs),
[self._expr],
requires_partition_by=partitionings.Nothing(),
preserves_partition_by=partitionings.Nothing()))
at = frame_base.not_implemented_method('at')
@property
def loc(self):
return _DeferredLoc(self)
_get_index = _set_index = frame_base.not_implemented_method('index')
index = property(_get_index, _set_index)
@property
def axes(self):
return (self.index, self.columns)
apply = frame_base.not_implemented_method('apply')
explode = frame_base.not_implemented_method('explode')
isin = frame_base.not_implemented_method('isin')
assign = frame_base.not_implemented_method('assign')
append = frame_base.not_implemented_method('append')
combine = frame_base.not_implemented_method('combine')
combine_first = frame_base.not_implemented_method('combine_first')
cov = frame_base.not_implemented_method('cov')
corr = frame_base.not_implemented_method('corr')
count = frame_base.not_implemented_method('count')
dot = frame_base.not_implemented_method('dot')
drop = frame_base.not_implemented_method('drop')
eval = frame_base.not_implemented_method('eval')
reindex = frame_base.not_implemented_method('reindex')
melt = frame_base.not_implemented_method('melt')
pivot = frame_base.not_implemented_method('pivot')
pivot_table = frame_base.not_implemented_method('pivot_table')
def aggregate(self, func, axis=0, *args, **kwargs):
if axis is None:
# Aggregate across all elements by first aggregating across columns,
# then across rows.
return self.agg(func, *args, **dict(kwargs, axis=1)).agg(
func, *args, **dict(kwargs, axis=0))
elif axis in (1, 'columns'):
# This is an easy elementwise aggregation.
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
lambda df: df.agg(func, axis=1, *args, **kwargs),
[self._expr],
requires_partition_by=partitionings.Nothing()))
elif len(self._expr.proxy().columns) == 0 or args or kwargs:
# For these corner cases, just colocate everything.
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
lambda df: df.agg(func, *args, **kwargs),
[self._expr],
requires_partition_by=partitionings.Singleton()))
else:
# In the general case, compute the aggregation of each column separately,
# then recombine.
if not isinstance(func, dict):
col_names = list(self._expr.proxy().columns)
func = {col: func for col in col_names}
else:
col_names = list(func.keys())
aggregated_cols = []
for col in col_names:
funcs = func[col]
if not isinstance(funcs, list):
funcs = [funcs]
aggregated_cols.append(self[col].agg(funcs, *args, **kwargs))
# The final shape is different depending on whether any of the columns
# were aggregated by a list of aggregators.
with expressions.allow_non_parallel_operations():
if any(isinstance(funcs, list) for funcs in func.values()):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *cols: pd.DataFrame(
{col: value for col, value in zip(col_names, cols)}),
[col._expr for col in aggregated_cols],
requires_partition_by=partitionings.Singleton()))
else:
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_aggregate',
lambda *cols: pd.Series(
{col: value[0] for col, value in zip(col_names, cols)}),
[col._expr for col in aggregated_cols],
requires_partition_by=partitionings.Singleton(),
proxy=self._expr.proxy().agg(func, *args, **kwargs)))
agg = aggregate
applymap = frame_base._elementwise_method('applymap')
memory_usage = frame_base.wont_implement_method('non-deferred value')
info = frame_base.wont_implement_method('non-deferred value')
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')
cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
head = tail = frame_base.wont_implement_method('order-sensitive')
max = frame_base._agg_method('max')
min = frame_base._agg_method('min')
def mode(self, axis=0, *args, **kwargs):
if axis == 1 or axis == 'columns':
# The number of result columns is the maximum number of mode values across
# rows, so it can't be determined without looking at the data.
raise frame_base.WontImplementError('non-deferred column values')
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'mode',
lambda df: df.mode(*args, **kwargs),
[self._expr],
#TODO(robertwb): Approximate?
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def dropna(self, axis, **kwargs):
# TODO(robertwb): This is a common pattern. Generalize?
if axis == 1 or axis == 'columns':
requires_partition_by = partitionings.Singleton()
else:
requires_partition_by = partitionings.Nothing()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'dropna',
lambda df: df.dropna(axis=axis, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def fillna(self, value, method, axis, **kwargs):
if method is not None and axis in (0, 'index'):
raise frame_base.WontImplementError('order-sensitive')
if isinstance(value, frame_base.DeferredBase):
value_expr = value._expr
else:
value_expr = expressions.ConstantExpression(value)
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'fillna',
lambda df, value: df.fillna(
value, method=method, axis=axis, **kwargs),
[self._expr, value_expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing()))
isna = frame_base._elementwise_method('isna')
notnull = notna = frame_base._elementwise_method('notna')
items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
'non-lazy')
def _cols_as_temporary_index(self, cols, suffix=''):
original_index_names = list(self._expr.proxy().index.names)
new_index_names = [
'__apache_beam_temp_%d_%s' % (ix, suffix)
for (ix, _) in enumerate(original_index_names)]
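# reindex moves `cols` into the index (stashing the original index levels
# under temporary names) so that operations requiring Index partitioning can
# be keyed on those columns; revert undoes the swap afterwards.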
def reindex(df):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'reindex',
lambda df:
df.rename_axis(index=new_index_names, copy=False)
.reset_index().set_index(cols),
[df._expr],
preserves_partition_by=partitionings.Nothing(),
requires_partition_by=partitionings.Nothing()))
def revert(df):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join_restoreindex',
lambda df:
df.reset_index().set_index(new_index_names)
.rename_axis(index=original_index_names, copy=False),
[df._expr],
preserves_partition_by=partitionings.Nothing(),
requires_partition_by=partitionings.Nothing()))
return reindex, revert
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def join(self, other, on, **kwargs):
if on is not None:
reindex, revert = self._cols_as_temporary_index(on)
return revert(reindex(self).join(other, **kwargs))
if isinstance(other, list):
other_is_list = True
else:
other = [other]
other_is_list = False
placeholder = object()
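# `other` may mix deferred and already-concrete frames. Deferred frames
# become additional expression inputs, while a placeholder marks where their
# computed values must be spliced back into the argument list at execution
# time.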
other_exprs = [
df._expr for df in other if isinstance(df, frame_base.DeferredFrame)]
const_others = [
placeholder if isinstance(df, frame_base.DeferredFrame) else df
for df in other]
def fill_placeholders(values):
values = iter(values)
filled = [
next(values) if df is placeholder else df for df in const_others]
if other_is_list:
return filled
else:
return filled[0]
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'join',
lambda df, *deferred_others: df.join(
fill_placeholders(deferred_others), **kwargs),
[self._expr] + other_exprs,
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Index()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def merge(
self,
right,
on,
left_on,
right_on,
left_index,
right_index,
**kwargs):
self_proxy = self._expr.proxy()
right_proxy = right._expr.proxy()
# Validate with a pandas call.
_ = self_proxy.merge(
right_proxy,
on=on,
left_on=left_on,
right_on=right_on,
left_index=left_index,
right_index=right_index,
**kwargs)
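# Strategy: validate the arguments eagerly against the proxies (above), set
# the join keys as the index on both sides so the merge itself only needs
# Index partitioning, then drop that temporary index unless the caller asked
# to merge on the existing indexes.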
if not any([on, left_on, right_on, left_index, right_index]):
on = [col for col in self_proxy.columns if col in right_proxy.columns]
if not left_on:
left_on = on
elif not isinstance(left_on, list):
left_on = [left_on]
if not right_on:
right_on = on
elif not isinstance(right_on, list):
right_on = [right_on]
if left_index:
indexed_left = self
else:
indexed_left = self.set_index(left_on, drop=False)
if right_index:
indexed_right = right
else:
indexed_right = right.set_index(right_on, drop=False)
merged = frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'merge',
lambda left, right: left.merge(
right, left_index=True, right_index=True, **kwargs),
[indexed_left._expr, indexed_right._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Index()))
if left_index or right_index:
return merged
else:
return merged.reset_index(drop=True)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nlargest(self, keep, **kwargs):
if keep == 'any':
keep = 'first'
elif keep != 'all':
raise frame_base.WontImplementError('order-sensitive')
kwargs['keep'] = keep
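# Same two-stage (per-partition, then single-worker combine) pattern as
# DeferredSeries.nlargest.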
per_partition = expressions.ComputedExpression(
'nlargest-per-partition',
lambda df: df.nlargest(**kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing())
with expressions.allow_non_parallel_operations(True):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'nlargest',
lambda df: df.nlargest(**kwargs),
[per_partition],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nsmallest(self, keep, **kwargs):
if keep == 'any':
keep = 'first'
elif keep != 'all':
raise frame_base.WontImplementError('order-sensitive')
kwargs['keep'] = keep
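# Same two-stage pattern as nlargest above.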
per_partition = expressions.ComputedExpression(
'nsmallest-per-partition',
lambda df: df.nsmallest(**kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing())
with expressions.allow_non_parallel_operations(True):
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'nsmallest',
lambda df: df.nsmallest(**kwargs),
[per_partition],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.DataFrame)
def nunique(self, **kwargs):
if kwargs.get('axis', None) in (1, 'columns'):
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'nunique',
lambda df: df.nunique(**kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
prod = product = frame_base._agg_method('prod')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def quantile(self, axis, **kwargs):
if axis == 1 or axis == 'columns':
raise frame_base.WontImplementError('non-deferred column values')
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'quantile',
lambda df: df.quantile(axis=axis, **kwargs),
[self._expr],
#TODO(robertwb): Approximate quantiles?
requires_partition_by=partitionings.Singleton(),
preserves_partition_by=partitionings.Singleton()))
query = frame_base._elementwise_method('query')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.maybe_inplace
def rename(self, **kwargs):
rename_index = (
'index' in kwargs
or kwargs.get('axis', None) in (0, 'index')
or ('columns' not in kwargs and 'axis' not in kwargs))
if rename_index:
# Technically, it's still partitioned by index, but it's no longer
# partitioned by the hash of the index.
preserves_partition_by = partitionings.Nothing()
else:
preserves_partition_by = partitionings.Singleton()
if kwargs.get('errors', None) == 'raise' and rename_index:
# Renaming index labels with errors='raise' must see the whole index to
# verify the labels exist, so it cannot run in parallel.
requires_partition_by = partitionings.Singleton()
proxy = self._expr.proxy()
else:
requires_partition_by = partitionings.Nothing()
proxy = None
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'rename',
lambda df: df.rename(**kwargs),
[self._expr],
proxy=proxy,
preserves_partition_by=preserves_partition_by,
requires_partition_by=requires_partition_by))
rename_axis = frame_base._elementwise_method('rename_axis')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def replace(self, limit, **kwargs):
if limit is None:
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'replace',
lambda df: df.replace(limit=limit, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def reset_index(self, level=None, **kwargs):
if level is not None and not isinstance(level, (tuple, list)):
level = [level]
if level is None or len(level) == self._expr.proxy().index.nlevels:
# TODO: Could do distributed re-index with offsets.
requires_partition_by = partitionings.Singleton()
else:
requires_partition_by = partitionings.Nothing()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'reset_index',
lambda df: df.reset_index(level=level, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
round = frame_base._elementwise_method('round')
select_dtypes = frame_base._elementwise_method('select_dtypes')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def shift(self, axis, **kwargs):
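# Shifting with `freq` is rejected as data-dependent; shifting along the
# index moves values across partition boundaries, so axis=0 shifts require
# all rows to be colocated.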
if 'freq' in kwargs:
raise frame_base.WontImplementError('data-dependent')
if axis == 1 or axis == 'columns':
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'shift',
lambda df: df.shift(axis=axis, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
@property
def shape(self):
raise frame_base.WontImplementError('scalar value')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def sort_values(self, axis, **kwargs):
if axis == 1 or axis == 'columns':
requires_partition_by = partitionings.Nothing()
else:
requires_partition_by = partitionings.Singleton()
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'sort_values',
lambda df: df.sort_values(axis=axis, **kwargs),
[self._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=requires_partition_by))
stack = frame_base._elementwise_method('stack')
sum = frame_base._agg_method('sum')
take = frame_base.wont_implement_method('deprecated')
to_records = to_dict = to_numpy = to_string = (
frame_base.wont_implement_method('non-deferred value'))
to_sparse = to_string # frame_base._elementwise_method('to_sparse')
transform = frame_base._elementwise_method(
'transform', restrictions={'axis': 0})
transpose = frame_base.wont_implement_method('non-deferred column values')
def unstack(self, *args, **kwargs):
if self._expr.proxy().index.nlevels == 1:
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'unstack',
lambda df: df.unstack(*args, **kwargs),
[self._expr],
requires_partition_by=partitionings.Index()))
else:
raise frame_base.WontImplementError('non-deferred column values')
update = frame_base._proxy_method(
'update',
inplace=True,
requires_partition_by=partitionings.Index(),
preserves_partition_by=partitionings.Index())
for io_func in dir(io):
if io_func.startswith('to_'):
setattr(DeferredDataFrame, io_func, getattr(io, io_func))
for meth in ('filter', ):
setattr(DeferredDataFrame, meth, frame_base._elementwise_method(meth))
class DeferredGroupBy(frame_base.DeferredFrame):
def agg(self, fn):
if not callable(fn):
# TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
# running doctests for pandas.core.groupby.generic
raise NotImplementedError('GroupBy.agg currently only supports callable '
'arguments')
return DeferredDataFrame(
expressions.ComputedExpression(
'agg',
lambda df: df.agg(fn), [self._expr],
requires_partition_by=partitionings.Index(),
preserves_partition_by=partitionings.Singleton()))
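# "Liftable" aggregations can be computed as partial aggregates per partition
# and then combined after shuffling by key, so they are split into a
# pre_combine step (no partitioning requirement) and a post_combine step
# (Index partitioning).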
def _liftable_agg(meth):
name, func = frame_base.name_and_func(meth)
def wrapper(self, *args, **kwargs):
assert isinstance(self, DeferredGroupBy)
ungrouped = self._expr.args()[0]
pre_agg = expressions.ComputedExpression(
'pre_combine_' + name,
lambda df: func(df.groupby(level=list(range(df.index.nlevels)))),
[ungrouped],
requires_partition_by=partitionings.Nothing(),
preserves_partition_by=partitionings.Singleton())
post_agg = expressions.ComputedExpression(
'post_combine_' + name,
lambda df: func(df.groupby(level=list(range(df.index.nlevels)))),
[pre_agg],
requires_partition_by=partitionings.Index(),
preserves_partition_by=partitionings.Singleton())
return frame_base.DeferredFrame.wrap(post_agg)
return wrapper
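# "Unliftable" aggregations are not decomposed into partial aggregates here;
# the whole aggregation runs in one step after the data is partitioned by
# key.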
def _unliftable_agg(meth):
name, func = frame_base.name_and_func(meth)
def wrapper(self, *args, **kwargs):
assert isinstance(self, DeferredGroupBy)
ungrouped = self._expr.args()[0]
post_agg = expressions.ComputedExpression(
name,
lambda df: func(df.groupby(level=list(range(df.index.nlevels)))),
[ungrouped],
requires_partition_by=partitionings.Index(),
preserves_partition_by=partitionings.Singleton())
return frame_base.DeferredFrame.wrap(post_agg)
return wrapper
LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'size', 'sum']
UNLIFTABLE_AGGREGATIONS = ['mean', 'median', 'std', 'var']
for meth in LIFTABLE_AGGREGATIONS:
setattr(DeferredGroupBy, meth, _liftable_agg(meth))
for meth in UNLIFTABLE_AGGREGATIONS:
setattr(DeferredGroupBy, meth, _unliftable_agg(meth))
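# A function is treated as associative (safe to pre-aggregate per partition)
# if it is one of the liftable aggregation names, or a numpy/builtin function
# with such a name (e.g. np.sum, builtins.min).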
def _is_associative(func):
return func in LIFTABLE_AGGREGATIONS or (
getattr(func, '__name__', None) in LIFTABLE_AGGREGATIONS
and func.__module__ in ('numpy', 'builtins'))
class _DeferredLoc(object):
def __init__(self, frame):
self._frame = frame
def __getitem__(self, index):
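# Accepts a (rows, cols) tuple, a slice, a deferred frame/Series (e.g. a
# boolean mask), or a callable; plain Python lists are rejected (see the
# inline comments below).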
if isinstance(index, tuple):
rows, cols = index
return self[rows][cols]
elif isinstance(index, list) and index and isinstance(index[0], bool):
# Aligned by numerical index.
raise NotImplementedError(type(index))
elif isinstance(index, list):
# Select rows, but behaves poorly on missing values.
raise NotImplementedError(type(index))
elif isinstance(index, slice):
args = [self._frame._expr]
func = lambda df: df.loc[index]
elif isinstance(index, frame_base.DeferredFrame):
args = [self._frame._expr, index._expr]
func = lambda df, index: df.loc[index]
elif callable(index):
def checked_callable_index(df):
computed_index = index(df)
if isinstance(computed_index, tuple):
row_index, _ = computed_index
else:
row_index = computed_index
if isinstance(row_index, list) and row_index and isinstance(
row_index[0], bool):
raise NotImplementedError(type(row_index))
elif not isinstance(row_index, (slice, pd.Series)):
raise NotImplementedError(type(row_index))
return computed_index
args = [self._frame._expr]
func = lambda df: df.loc[checked_callable_index]
else:
raise NotImplementedError(type(index))
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'loc',
func,
args,
requires_partition_by=(
partitionings.Index()
if len(args) > 1
else partitionings.Nothing()),
preserves_partition_by=partitionings.Singleton()))
class _DeferredStringMethods(frame_base.DeferredBase):
pass
ELEMENTWISE_STRING_METHODS = [
'capitalize',
'casefold',
'contains',
'count',
'endswith',
'extract',
'extractall',
'findall',
'get',
'get_dummies',
'isalpha',
'isalnum',
'isdecimal',
'isdigit',
'islower',
'isnumeric',
'isspace',
'istitle',
'isupper',
'join',
'len',
'lower',
'lstrip',
'pad',
'partition',
'replace',
'rpartition',
'rsplit',
'rstrip',
'slice',
'slice_replace',
'split',
'startswith',
'strip',
'swapcase',
'title',
'upper',
'wrap',
'zfill',
'__getitem__',
]
for method in ELEMENTWISE_STRING_METHODS:
setattr(_DeferredStringMethods,
method,
frame_base._elementwise_method(method))
for base in ['add',
'sub',
'mul',
'div',
'truediv',
'floordiv',
'mod',
'pow',
'and',
'or']:
for p in ['%s', 'r%s', '__%s__', '__r%s__']:
# TODO: non-trivial level?
name = p % base
setattr(
DeferredSeries,
name,
frame_base._elementwise_method(name, restrictions={'level': None}))
setattr(
DeferredDataFrame,
name,
frame_base._elementwise_method(name, restrictions={'level': None}))
setattr(
DeferredSeries,
'__i%s__' % base,
frame_base._elementwise_method('__i%s__' % base, inplace=True))
setattr(
DeferredDataFrame,
'__i%s__' % base,
frame_base._elementwise_method('__i%s__' % base, inplace=True))
for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
setattr(DeferredSeries, name, frame_base._elementwise_method(name))
setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))