#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import collections
import inspect
import math
import numpy as np
import pandas as pd
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import io
from apache_beam.dataframe import partitionings
def populate_not_implemented(pd_type):
  """Returns a class decorator that stubs out pandas API not yet deferred.

  For every public attribute of ``pd_type`` that the decorated class does not
  already define, a placeholder built by ``frame_base.not_implemented_method``
  is installed: property-like attributes (properties, or attributes whose
  value is a class) become properties, other callables become methods, and
  anything else is left alone. Names starting with ``_`` are never touched.
  """
  def decorate(deferred_cls):
    public_names = (n for n in dir(pd_type) if not n.startswith('_'))
    for attr_name in public_names:
      if hasattr(deferred_cls, attr_name):
        continue
      pandas_attr = getattr(pd_type, attr_name)
      # Some of the "properties" on pandas types (cat, dt, sparse) are really
      # attributes whose values are classes.
      is_property_like = (
          isinstance(pandas_attr, property) or inspect.isclass(pandas_attr))
      if is_property_like:
        stub = property(frame_base.not_implemented_method(attr_name))
      elif callable(pandas_attr):
        stub = frame_base.not_implemented_method(attr_name)
      else:
        continue
      setattr(deferred_cls, attr_name, stub)
    return deferred_cls

  return decorate
class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
  """Deferred operations shared by deferred DataFrames and deferred Series."""

  def __array__(self, dtype=None):
    # Conversion to numpy would need the non-deferred data.
    raise frame_base.WontImplementError(
        'Conversion to a non-deferred a numpy array.')

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  def droplevel(self, level, axis):
    """Deferred ``droplevel``; requires no particular input partitioning.

    Dropping column levels (``axis`` 1 or 'columns') leaves the index
    untouched, so index partitioning is preserved. Dropping index levels
    changes the index, so nothing is preserved.
    """
    # pandas spells the column axis 'columns' (as groupby below checks).
    if axis in (1, 'columns'):
      preserves = partitionings.Index()
    else:
      preserves = partitionings.Nothing()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'droplevel',
            lambda df: df.droplevel(level, axis=axis), [self._expr],
            requires_partition_by=partitionings.Nothing(),
            preserves_partition_by=preserves))

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  @frame_base.maybe_inplace
  def fillna(self, value, method, axis, **kwargs):
    """Deferred ``fillna``.

    ``value`` may be a constant or another deferred object. Method-based
    filling along the index depends on row order and is rejected.

    Raises:
      WontImplementError: if ``method`` is given with axis 0/'index'.
    """
    if method is not None and axis in (0, 'index'):
      raise frame_base.WontImplementError('order-sensitive')
    if isinstance(value, frame_base.DeferredBase):
      value_expr = value._expr
    else:
      value_expr = expressions.ConstantExpression(value)
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'fillna',
            lambda df, value: df.fillna(
                value, method=method, axis=axis, **kwargs),
            [self._expr, value_expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=partitionings.Nothing()))

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  def ffill(self, **kwargs):
    """Forward fill; delegates to ``fillna(method='ffill', ...)``."""
    return self.fillna(method='ffill', **kwargs)

  pad = ffill

  @frame_base.args_to_kwargs(pd.DataFrame)
  @frame_base.populate_defaults(pd.DataFrame)
  def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
    """Deferred ``groupby``.

    For ``axis`` 1/'columns' this returns a _DeferredGroupByCols. Otherwise
    the grouping keys (``level``, a callable applied to the index, or
    column/index names) are first moved into the index. The returned
    DeferredGroupBy then groups on every index level of that frame, with the
    input partitioned by index.

    Raises:
      NotImplementedError: for as_index=False, group_keys=False, grouping by
        a DeferredSeries, or any other unsupported ``by``.
      TypeError: if neither ``by`` nor ``level`` is given.
      WontImplementError: if ``by`` is a numpy array (order sensitive).
    """
    if not as_index:
      raise NotImplementedError('groupby(as_index=False)')
    if not group_keys:
      raise NotImplementedError('groupby(group_keys=False)')

    if axis in (1, 'columns'):
      return _DeferredGroupByCols(
          expressions.ComputedExpression(
              'groupbycols',
              lambda df: df.groupby(by, axis=axis, **kwargs), [self._expr],
              requires_partition_by=partitionings.Nothing(),
              preserves_partition_by=partitionings.Index()))

    if level is None and by is None:
      raise TypeError("You have to supply one of 'by' and 'level'")

    elif level is not None:
      if isinstance(level, (list, tuple)):
        levels = level
      else:
        levels = [level]
      # Normalize integer level positions to level names.
      all_levels = self._expr.proxy().index.names
      levels = [all_levels[i] if isinstance(i, int) else i for i in levels]
      # Keep only the requested levels in the index.
      levels_to_drop = self._expr.proxy().index.names.difference(levels)
      if levels_to_drop:
        to_group = self.droplevel(levels_to_drop)._expr
      else:
        to_group = self._expr

    elif callable(by):
      # The callable maps each index value to its group key.
      def map_index(df):
        df = df.copy()
        df.index = df.index.map(by)
        return df

      to_group = expressions.ComputedExpression(
          'map_index',
          map_index, [self._expr],
          requires_partition_by=partitionings.Nothing(),
          preserves_partition_by=partitionings.Nothing())

    elif isinstance(by, DeferredSeries):
      raise NotImplementedError(
          "grouping by a Series is not yet implemented. You can group by a "
          "DataFrame column by specifying its name.")

    elif isinstance(by, np.ndarray):
      raise frame_base.WontImplementError('order sensitive')

    elif isinstance(self, DeferredDataFrame):
      if not isinstance(by, list):
        by = [by]
      index_names = self._expr.proxy().index.names
      index_names_in_by = list(set(by).intersection(index_names))
      if index_names_in_by:
        if set(by) == set(index_names):
          # Grouping on exactly the existing index.
          to_group = self._expr
        elif set(by).issubset(index_names):
          # A subset of the index levels: drop the others.
          to_group = self.droplevel(index_names.difference(by))._expr
        else:
          # A mix of index levels and columns: move those index levels into
          # columns, then index on all of ``by``.
          to_group = self.reset_index(index_names_in_by).set_index(by)._expr
      else:
        to_group = self.set_index(by)._expr

    else:
      raise NotImplementedError(by)

    return DeferredGroupBy(
        expressions.ComputedExpression(
            'groupbyindex',
            lambda df: df.groupby(
                level=list(range(df.index.nlevels)), **kwargs), [to_group],
            requires_partition_by=partitionings.Index(),
            preserves_partition_by=partitionings.Singleton()),
        kwargs)

  abs = frame_base._elementwise_method('abs')
  astype = frame_base._elementwise_method('astype')
  copy = frame_base._elementwise_method('copy')

  @property
  def dtype(self):
    """The dtype of this object's proxy."""
    return self._expr.proxy().dtype

  dtypes = dtype

  def _get_index(self):
    return _DeferredIndex(self)

  # Read-only: assigning to ``index`` goes to a not-implemented stub.
  index = property(
      _get_index, frame_base.not_implemented_method('index (setter)'))
@populate_not_implemented(pd.Series)
@frame_base.DeferredFrame._register_for(pd.Series)
class DeferredSeries(DeferredDataFrameOrSeries):
  """Deferred counterpart of ``pandas.Series``.

  Public ``pandas.Series`` attributes that are not defined here are replaced
  with not-implemented stubs by ``populate_not_implemented``.
  """

  def __getitem__(self, key):
    """Deferred ``series[key]``, for keys whose result is order-insensitive.

    Raises:
      WontImplementError: for positional integer keys, iterators,
        non-deferred boolean indexers, and scalar lookups.
    """
    if _is_null_slice(key) or key is Ellipsis:
      return self
    elif ((isinstance(key, int) or _is_integer_slice(key)) and
          self._expr.proxy().index._should_fallback_to_positional()):
      # Integer keys would be interpreted positionally for this index.
      raise frame_base.WontImplementError('order sensitive')
    elif isinstance(key, slice) or callable(key):
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'getitem',
              lambda df: df[key],
              [self._expr],
              requires_partition_by=partitionings.Nothing(),
              preserves_partition_by=partitionings.Singleton()))
    elif isinstance(key, DeferredSeries) and key._expr.proxy().dtype == bool:
      # Boolean mask: both inputs are partitioned by index.
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'getitem',
              lambda df, indexer: df[indexer],
              [self._expr, key._expr],
              requires_partition_by=partitionings.Index(),
              preserves_partition_by=partitionings.Singleton()))
    elif pd.core.series.is_iterator(key) or pd.core.common.is_bool_indexer(key):
      raise frame_base.WontImplementError('order sensitive')
    else:
      # We could consider returning a deferred scalar, but that might
      # be more surprising than a clear error.
      raise frame_base.WontImplementError('non-deferred')

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  def align(self, other, join, axis, level, method, **kwargs):
    """Aligns this series with ``other`` using ``join``; returns a pair.

    Raises:
      NotImplementedError: if ``level`` is given.
      WontImplementError: if a fill ``method`` is given.
    """
    if level is not None:
      raise NotImplementedError('per-level align')
    if method is not None:
      raise frame_base.WontImplementError('order-sensitive')

    def align_and_pack(x, y):
      # Honor the requested join. The aligned pair is packed side by side
      # with pd.concat because expressions don't yet support multiple
      # return values.
      return pd.concat(x.align(y, join=join), axis=1)

    aligned = frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'align',
            align_and_pack,
            [self._expr, other._expr],
            requires_partition_by=partitionings.Index(),
            preserves_partition_by=partitionings.Index()))
    return aligned.iloc[:, 0], aligned.iloc[:, 1]

  array = property(frame_base.wont_implement_method('non-deferred value'))

  between = frame_base._elementwise_method('between')

  def dot(self, other):
    """Deferred dot product with a deferred Series or DataFrame.

    Partial products are computed on index-partitioned inputs, then summed
    on a single partition.

    Raises:
      WontImplementError: if ``other`` is not deferred.
    """
    left = self._expr
    if isinstance(other, DeferredSeries):
      right = expressions.ComputedExpression(
          'to_dataframe',
          pd.DataFrame, [other._expr],
          requires_partition_by=partitionings.Nothing(),
          preserves_partition_by=partitionings.Index())
      right_is_series = True
    elif isinstance(other, DeferredDataFrame):
      right = other._expr
      right_is_series = False
    else:
      raise frame_base.WontImplementError('non-deferred result')

    dots = expressions.ComputedExpression(
        'dot',
        # Transpose so we can sum across rows.
        (lambda left, right: pd.DataFrame(left @ right).T),
        [left, right],
        requires_partition_by=partitionings.Index())
    with expressions.allow_non_parallel_operations(True):
      sums = expressions.ComputedExpression(
          'sum',
          lambda dots: dots.sum(),
          [dots],
          requires_partition_by=partitionings.Singleton())

      if right_is_series:
        # The single entry is labeled with the other series' name (0 only
        # when it is unnamed), so take it by position.
        result = expressions.ComputedExpression(
            'extract',
            lambda df: df.iloc[0], [sums],
            requires_partition_by=partitionings.Singleton())
      else:
        result = sums
      return frame_base.DeferredFrame.wrap(result)

  __matmul__ = dot

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  def std(self, axis, skipna, level, ddof, **kwargs):
    """Deferred standard deviation.

    Computed from per-partition moments combined on a single partition.

    Raises:
      NotImplementedError: if ``level`` is given.
    """
    if level is not None:
      raise NotImplementedError("per-level aggregation")
    if skipna is None or skipna:
      self = self.dropna()  # pylint: disable=self-cls-assignment

    # See the online, numerically stable formulae at
    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
    # and
    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
    def compute_moments(x):
      n = len(x)
      # skipna=False: if NaNs were not dropped above, they must propagate to
      # a NaN result rather than being silently skipped here.
      m = x.std(ddof=0, skipna=False)**2 * n
      s = x.sum(skipna=False)
      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))

    def combine_moments(data):
      m = s = n = 0.0
      for datum in data.itertuples():
        if datum.n == 0:
          continue
        elif n == 0:
          m, s, n = datum.m, datum.s, datum.n
        else:
          delta = s / n - datum.s / datum.n
          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
          s += datum.s
          n += datum.n
      if n <= ddof:
        return float('nan')
      else:
        return math.sqrt(m / (n - ddof))

    moments = expressions.ComputedExpression(
        'compute_moments',
        compute_moments, [self._expr],
        requires_partition_by=partitionings.Nothing())
    with expressions.allow_non_parallel_operations(True):
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'combine_moments',
              combine_moments, [moments],
              requires_partition_by=partitionings.Singleton()))

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  def corr(self, other, method, min_periods):
    """Deferred correlation with ``other``.

    Pearson (the default) is built from covariance and standard deviations
    over rows where both series are non-null. Other methods run on a single
    partition.
    """
    if method == 'pearson':  # Note that this is the default.
      x, y = self.dropna().align(other.dropna(), 'inner')
      return x._corr_aligned(y, min_periods)
    else:
      # The rank-based correlations are not obviously parallelizable, though
      # perhaps an approximation could be done with a knowledge of quantiles
      # and custom partitioning.
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'corr',
              lambda df, other: df.corr(
                  other, method=method, min_periods=min_periods),
              [self._expr, other._expr],
              requires_partition_by=partitionings.Singleton()))

  def _corr_aligned(self, other, min_periods):
    # Pearson correlation cov / (std_x * std_y). Callers pass series that are
    # already aligned and null-free.
    std_x = self.std()
    std_y = other.std()
    cov = self._cov_aligned(other, min_periods)
    return cov.apply(
        lambda cov, std_x, std_y: cov / (std_x * std_y), args=[std_x, std_y])

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  def cov(self, other, min_periods, ddof):
    """Deferred covariance with ``other`` over rows where both are non-null."""
    x, y = self.dropna().align(other.dropna(), 'inner')
    return x._cov_aligned(y, min_periods, ddof)

  def _cov_aligned(self, other, min_periods, ddof=1):
    # Covariance of two aligned series, combined from per-partition
    # co-moments. NaN when there are fewer than max(2, ddof + 1, min_periods)
    # observations.
    # Use the formulae from
    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
    def compute_co_moments(x, y):
      n = len(x)
      if n <= 1:
        c = 0
      else:
        c = x.cov(y) * (n - 1)
      sx = x.sum()
      sy = y.sum()
      return pd.DataFrame(dict(c=[c], sx=[sx], sy=[sy], n=[n]))

    def combine_co_moments(data):
      c = sx = sy = n = 0.0
      for datum in data.itertuples():
        if datum.n == 0:
          continue
        elif n == 0:
          c, sx, sy, n = datum.c, datum.sx, datum.sy, datum.n
        else:
          c += (
              datum.c + (sx / n - datum.sx / datum.n) *
              (sy / n - datum.sy / datum.n) * n * datum.n / (n + datum.n))
          sx += datum.sx
          sy += datum.sy
          n += datum.n
      # ddof + 1 guarantees n - ddof > 0, avoiding a division by zero.
      if n < max(2, ddof + 1, min_periods or 0):
        return float('nan')
      else:
        return c / (n - ddof)

    moments = expressions.ComputedExpression(
        'compute_co_moments',
        compute_co_moments, [self._expr, other._expr],
        requires_partition_by=partitionings.Index())

    with expressions.allow_non_parallel_operations(True):
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'combine_co_moments',
              combine_co_moments, [moments],
              requires_partition_by=partitionings.Singleton()))

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  @frame_base.maybe_inplace
  def dropna(self, **kwargs):
    """Deferred ``dropna``; requires no particular partitioning."""
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'dropna',
            lambda df: df.dropna(**kwargs), [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=partitionings.Nothing()))

  items = iteritems = frame_base.wont_implement_method('non-lazy')

  isin = frame_base._elementwise_method('isin')
  isna = frame_base._elementwise_method('isna')
  notnull = notna = frame_base._elementwise_method('notna')

  to_numpy = to_string = frame_base.wont_implement_method('non-deferred value')

  transform = frame_base._elementwise_method(
      'transform', restrictions={'axis': 0})

  def aggregate(self, func, axis=0, *args, **kwargs):
    """Deferred ``aggregate``.

    A list of several functions is aggregated one function at a time and the
    results concatenated. A single function accepted by ``_is_associative``
    (with no extra arguments) is pre-aggregated per partition before the
    final single-partition aggregation.
    """
    if isinstance(func, list) and len(func) > 1:
      # Aggregate each column separately, then stick them all together.
      rows = [self.agg([f], *args, **kwargs) for f in func]
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'join_aggregate',
              lambda *rows: pd.concat(rows), [row._expr for row in rows]))
    else:
      # We're only handling a single column.
      base_func = func[0] if isinstance(func, list) else func
      if _is_associative(base_func) and not args and not kwargs:
        intermediate = expressions.ComputedExpression(
            'pre_aggregate',
            lambda s: s.agg([base_func], *args, **kwargs), [self._expr],
            requires_partition_by=partitionings.Nothing(),
            preserves_partition_by=partitionings.Nothing())
        allow_nonparallel_final = True
      else:
        intermediate = self._expr
        allow_nonparallel_final = None  # i.e. don't change the value
      with expressions.allow_non_parallel_operations(allow_nonparallel_final):
        return frame_base.DeferredFrame.wrap(
            expressions.ComputedExpression(
                'aggregate',
                lambda s: s.agg(func, *args, **kwargs), [intermediate],
                preserves_partition_by=partitionings.Singleton(),
                requires_partition_by=partitionings.Singleton()))

  agg = aggregate

  @property
  def axes(self):
    return [self.index]

  clip = frame_base._elementwise_method('clip')

  all = frame_base._agg_method('all')
  any = frame_base._agg_method('any')
  min = frame_base._agg_method('min')
  max = frame_base._agg_method('max')
  prod = product = frame_base._agg_method('prod')
  sum = frame_base._agg_method('sum')

  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
      'order-sensitive')
  diff = frame_base.wont_implement_method('order-sensitive')

  head = tail = frame_base.wont_implement_method('order-sensitive')

  filter = frame_base._elementwise_method('filter')

  memory_usage = frame_base.wont_implement_method('non-deferred value')

  # In Series __contains__ checks the index
  __contains__ = frame_base.wont_implement_method('non-deferred value')

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  def nlargest(self, keep, **kwargs):
    """Deferred ``nlargest``; only keep='all' or keep='any' are supported.

    Each partition keeps its own largest values, then a single partition
    picks the overall result.
    """
    # TODO(robertwb): Document 'any' option.
    # TODO(robertwb): Consider (conditionally) defaulting to 'any' if no
    # explicit keep parameter is requested.
    if keep == 'any':
      keep = 'first'
    elif keep != 'all':
      raise frame_base.WontImplementError('order-sensitive')
    kwargs['keep'] = keep
    per_partition = expressions.ComputedExpression(
        'nlargest-per-partition',
        lambda df: df.nlargest(**kwargs), [self._expr],
        preserves_partition_by=partitionings.Singleton(),
        requires_partition_by=partitionings.Nothing())
    with expressions.allow_non_parallel_operations(True):
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'nlargest',
              lambda df: df.nlargest(**kwargs), [per_partition],
              preserves_partition_by=partitionings.Singleton(),
              requires_partition_by=partitionings.Singleton()))

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  def nsmallest(self, keep, **kwargs):
    """Deferred ``nsmallest``; mirrors ``nlargest``."""
    if keep == 'any':
      keep = 'first'
    elif keep != 'all':
      raise frame_base.WontImplementError('order-sensitive')
    kwargs['keep'] = keep
    per_partition = expressions.ComputedExpression(
        'nsmallest-per-partition',
        lambda df: df.nsmallest(**kwargs), [self._expr],
        preserves_partition_by=partitionings.Singleton(),
        requires_partition_by=partitionings.Nothing())
    with expressions.allow_non_parallel_operations(True):
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'nsmallest',
              lambda df: df.nsmallest(**kwargs), [per_partition],
              preserves_partition_by=partitionings.Singleton(),
              requires_partition_by=partitionings.Singleton()))

  plot = property(frame_base.wont_implement_method('plot'))
  pop = frame_base.wont_implement_method('non-lazy')

  rename_axis = frame_base._elementwise_method('rename_axis')

  @frame_base.args_to_kwargs(pd.Series)
  @frame_base.populate_defaults(pd.Series)
  @frame_base.maybe_inplace
  def replace(self, limit, **kwargs):
    """Deferred ``replace``; a ``limit`` forces a single partition."""
    if limit is None:
      requires_partition_by = partitionings.Nothing()
    else:
      requires_partition_by = partitionings.Singleton()
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'replace',
            lambda df: df.replace(limit=limit, **kwargs), [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=requires_partition_by))

  round = frame_base._elementwise_method('round')

  searchsorted = frame_base.wont_implement_method('order-sensitive')

  shift = frame_base.wont_implement_method('order-sensitive')

  take = frame_base.wont_implement_method('deprecated')

  to_dict = frame_base.wont_implement_method('non-deferred')

  to_frame = frame_base._elementwise_method('to_frame')

  def unique(self, as_series=False):
    """Unique values as a deferred Series (requires ``as_series=True``)."""
    if not as_series:
      raise frame_base.WontImplementError(
          'pass as_series=True to get the result as a (deferred) Series')
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'unique',
            lambda df: pd.Series(df.unique()), [self._expr],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=partitionings.Singleton()))

  def update(self, other):
    """Updates this deferred series in place with values from ``other``."""
    def update_copy(df, other):
      # pandas' update mutates its receiver; copy first so the input
      # partition itself is never modified.
      df = df.copy()
      df.update(other)
      return df

    self._expr = expressions.ComputedExpression(
        'update',
        update_copy, [self._expr, other._expr],
        preserves_partition_by=partitionings.Singleton(),
        requires_partition_by=partitionings.Index())

  unstack = frame_base.wont_implement_method('non-deferred column values')

  values = property(frame_base.wont_implement_method('non-deferred'))

  view = frame_base.wont_implement_method('memory sharing semantics')

  @property
  def str(self):
    return _DeferredStringMethods(self._expr)
# Install apply, map and transform on DeferredSeries as elementwise wrappers.
# This runs after the class body, so it replaces the axis-restricted
# ``transform`` defined there.
for name in ('apply', 'map', 'transform'):
  setattr(DeferredSeries, name, frame_base._elementwise_method(name))
[docs]@populate_not_implemented(pd.DataFrame)
@frame_base.DeferredFrame._register_for(pd.DataFrame)
class DeferredDataFrame(DeferredDataFrameOrSeries):
@property
def T(self):
  """The transpose of this frame; shorthand for ``self.transpose()``."""
  return self.transpose()

@property
def columns(self):
  """Column labels, read from the proxy without any computation."""
  return self._expr.proxy().columns

@columns.setter
def columns(self, columns):
  """Relabels the columns of this frame.

  Python discards a property setter's return value, so the relabeled
  expression is installed on ``self`` rather than returned.
  """
  def set_columns(df):
    # Copy so the input partition itself is not modified.
    df = df.copy()
    df.columns = columns
    return df

  self._expr = expressions.ComputedExpression(
      'set_columns',
      set_columns, [self._expr],
      requires_partition_by=partitionings.Nothing(),
      preserves_partition_by=partitionings.Singleton())
def __getattr__(self, name):
  """Attribute-style column access: ``df.col`` means ``df['col']``.

  Python only calls this when normal attribute lookup has failed. Names that
  are not columns go back to ``object.__getattribute__``, which raises the
  usual AttributeError.
  """
  proxy_columns = self._expr.proxy().columns
  if name not in proxy_columns:
    return object.__getattribute__(self, name)
  return self[name]
def __getitem__(self, key):
  """Deferred ``df[key]``.

  Supports a boolean DeferredSeries (row filter via ``loc``), slices, a
  single column label, and a list of column labels.

  Raises:
    NotImplementedError: for non-boolean deferred keys and for labels (or
      lists containing labels) that are not columns of this frame.
    WontImplementError: for integer slices, whose meaning depends on the
      contents of the index.
  """
  # TODO: Replicate pd.DataFrame.__getitem__ logic
  if isinstance(key, DeferredSeries) and key._expr.proxy().dtype == bool:
    return self.loc[key]
  elif isinstance(key, frame_base.DeferredBase):
    # Fail early if key is a DeferredBase as it interacts surprisingly with
    # key in self._expr.proxy().columns
    raise NotImplementedError(
        "Indexing with a non-bool deferred frame is not yet supported. "
        "Consider using df.loc[...]")
  elif isinstance(key, slice):
    if _is_null_slice(key):
      return self
    elif _is_integer_slice(key):
      # This depends on the contents of the index.
      raise frame_base.WontImplementError(
          'Use iloc or loc with integer slices.')
    else:
      return self.loc[key]

  columns = self._expr.proxy().columns
  if isinstance(key, list):
    # Lists are checked on their own: a list is unhashable, so testing
    # ``key in columns`` would raise TypeError instead of the error below.
    is_known = all(key_column in columns for key_column in key)
  else:
    is_known = key in columns
  if is_known:
    return self._elementwise(lambda df: df[key], 'get_column')
  raise NotImplementedError(key)
def __contains__(self, key):
  """``key in df`` tests column membership, answered from the proxy."""
  proxy = self._expr.proxy()
  return proxy.__contains__(key)

def __setitem__(self, key, value):
  """Deferred ``df[key] = value``, applied via ``_elementwise(inplace=True)``.

  ``key`` may be a column name, a list of column names, or a boolean
  DeferredSeries.

  Raises:
    NotImplementedError: for any other kind of key.
  """
  supported = (
      isinstance(key, str)
      or (isinstance(key, list) and all(isinstance(c, str) for c in key))
      or (isinstance(key, DeferredSeries)
          and key._expr.proxy().dtype == bool))
  if not supported:
    raise NotImplementedError(key)

  def assign_in_partition(df, key, value):
    return df.__setitem__(key, value)

  return self._elementwise(
      assign_in_partition, 'set_column', (key, value), inplace=True)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def align(self, other, join, axis, copy, level, method, **kwargs):
  """Deferred ``align`` of this frame with ``other``.

  Aligning on a ``level`` gathers everything onto a single partition.
  Column-only alignment needs no particular partitioning. Otherwise the
  inputs are partitioned by index.

  Raises:
    WontImplementError: for copy=False or when a fill ``method`` is given.
    NotImplementedError: for any other keyword argument.
  """
  if not copy:
    raise frame_base.WontImplementError('align(copy=False)')
  if method is not None:
    raise frame_base.WontImplementError('order-sensitive')
  if kwargs:
    raise NotImplementedError('align(%s)' % ', '.join(kwargs.keys()))

  if level is not None:
    # Could probably get by partitioning on the used levels.
    requires_partition_by = partitionings.Singleton()
  elif axis in ('columns', 1):
    requires_partition_by = partitionings.Nothing()
  else:
    requires_partition_by = partitionings.Index()

  def align_partition(df, other):
    # Forward ``level`` as well: it drives the partitioning choice above and
    # must reach pandas to take effect.
    # NOTE(review): pandas' DataFrame.align returns a (left, right) tuple;
    # confirm DeferredFrame.wrap supports that result.
    return df.align(other, join=join, axis=axis, level=level)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'align',
          align_partition,
          [self._expr, other._expr],
          requires_partition_by=requires_partition_by,
          preserves_partition_by=partitionings.Index()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def set_index(self, keys, **kwargs):
  """Deferred ``set_index``; keys must be existing column names.

  The index changes, so no partitioning is preserved.

  Raises:
    NotImplementedError: if any key is not a column of this frame.
  """
  key_list = [keys] if isinstance(keys, str) else keys
  all_columns = set(key_list).issubset(self._expr.proxy().columns)
  if not all_columns:
    raise NotImplementedError(key_list)

  def reindex_partition(df):
    return df.set_index(key_list, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'set_index',
          reindex_partition,
          [self._expr],
          requires_partition_by=partitionings.Nothing(),
          preserves_partition_by=partitionings.Nothing()))
@property
def loc(self):
  """Deferred counterpart of ``DataFrame.loc``."""
  return _DeferredLoc(self)

@property
def iloc(self):
  """Deferred counterpart of ``DataFrame.iloc``."""
  return _DeferredILoc(self)

@property
def axes(self):
  """The ``(index, columns)`` tuple."""
  row_axis = self.index
  column_axis = self.columns
  return (row_axis, column_axis)

@property
def dtypes(self):
  """Column dtypes, read from the proxy."""
  proxy = self._expr.proxy()
  return proxy.dtypes
def assign(self, **kwargs):
  """Deferred ``assign``; each new value must be a callable or DeferredSeries.

  Raises:
    WontImplementError: for the first value that is neither callable nor a
      DeferredSeries.
  """
  for name, value in kwargs.items():
    if callable(value) or isinstance(value, DeferredSeries):
      continue
    raise frame_base.WontImplementError(
        "Unsupported value for new "
        f"column '{name}': '{value}'. "
        "Only callables and Series "
        "instances are supported.")
  assign_method = frame_base._elementwise_method('assign')
  return assign_method(self, **kwargs)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def explode(self, column, ignore_index):
  """Deferred ``explode``; requires no particular partitioning."""
  # With ignore_index the index is regenerated, so it is not preserved.
  if ignore_index:
    preserves = partitionings.Nothing()
  else:
    preserves = partitionings.Singleton()

  def explode_partition(df):
    return df.explode(column, ignore_index=ignore_index)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'explode',
          explode_partition,
          [self._expr],
          preserves_partition_by=preserves,
          requires_partition_by=partitionings.Nothing()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def drop(self, labels, axis, index, columns, errors, **kwargs):
  """Deferred ``drop`` of row and/or column labels.

  ``labels`` plus ``axis`` is translated into the equivalent ``index`` or
  ``columns`` argument. Dropping rows with errors='raise' needs the whole
  frame on a single partition to detect missing labels.

  Raises:
    ValueError: if ``labels`` is combined with ``index``/``columns``, or
      ``axis`` is not one of 0, 1, 'index', 'columns'.
  """
  if labels is not None:
    if not (index is None and columns is None):
      raise ValueError("Cannot specify both 'labels' and 'index'/'columns'")
    if axis in (0, 'index'):
      index, columns = labels, None
    elif axis in (1, 'columns'):
      index, columns = None, labels
    else:
      raise ValueError("axis must be one of (0, 1, 'index', 'columns'), "
                       "got '%s'" % axis)

  proxy = self._expr.proxy()
  if columns is not None:
    # Compute the proxy based on just the columns that are dropped.
    proxy = proxy.drop(columns=columns, errors=errors)

  needs_all_rows = index is not None and errors == 'raise'
  if needs_all_rows:
    requires = partitionings.Singleton()
  else:
    requires = partitionings.Nothing()

  def drop_partition(df):
    return df.drop(
        axis=axis, index=index, columns=columns, errors=errors, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'drop',
          drop_partition,
          [self._expr],
          proxy=proxy,
          requires_partition_by=requires))
def aggregate(self, func, axis=0, *args, **kwargs):
  """Deferred ``aggregate`` of this frame.

  axis=None aggregates across columns, then across rows. axis 1/'columns'
  aggregates each row independently. With no columns, or when extra
  arguments are given, everything runs on one partition. Otherwise each
  column is aggregated separately and the results are recombined.
  """
  # ``axis`` is always passed positionally below, ahead of ``*args``, so
  # extra positional arguments are not shifted into pandas' ``axis`` slot.
  if axis is None:
    # Aggregate across all elements by first aggregating across columns,
    # then across rows.
    return self.agg(func, 1, *args, **kwargs).agg(func, 0, *args, **kwargs)
  elif axis in (1, 'columns'):
    # This is an easy elementwise aggregation.
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'aggregate',
            lambda df: df.agg(func, 1, *args, **kwargs),
            [self._expr],
            requires_partition_by=partitionings.Nothing()))
  elif len(self._expr.proxy().columns) == 0 or args or kwargs:
    # For these corner cases, just colocate everything.
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'aggregate',
            lambda df: df.agg(func, *args, **kwargs),
            [self._expr],
            requires_partition_by=partitionings.Singleton()))
  else:
    # In the general case, compute the aggregation of each column separately,
    # then recombine.
    if not isinstance(func, dict):
      col_names = list(self._expr.proxy().columns)
      func = {col: func for col in col_names}
    else:
      col_names = list(func.keys())
    aggregated_cols = []
    for col in col_names:
      funcs = func[col]
      if not isinstance(funcs, list):
        funcs = [funcs]
      aggregated_cols.append(self[col].agg(funcs, *args, **kwargs))
    # The final shape is different depending on whether any of the columns
    # were aggregated by a list of aggregators.
    with expressions.allow_non_parallel_operations():
      if any(isinstance(funcs, list) for funcs in func.values()):
        return frame_base.DeferredFrame.wrap(
            expressions.ComputedExpression(
                'join_aggregate',
                lambda *cols: pd.DataFrame(
                    {col: value for col, value in zip(col_names, cols)}),
                [col._expr for col in aggregated_cols],
                requires_partition_by=partitionings.Singleton()))
      else:
        # Each per-column result is a one-element Series labeled by the
        # function name, so take its value by position.
        return frame_base.DeferredFrame.wrap(
            expressions.ComputedExpression(
                'join_aggregate',
                lambda *cols: pd.Series({
                    col: value.iloc[0]
                    for col, value in zip(col_names, cols)
                }),
                [col._expr for col in aggregated_cols],
                requires_partition_by=partitionings.Singleton(),
                proxy=self._expr.proxy().agg(func, *args, **kwargs)))

agg = aggregate
# Elementwise wrapper.
applymap = frame_base._elementwise_method('applymap')

# Both need the non-deferred frame.
memory_usage = frame_base.wont_implement_method('non-deferred value')
info = frame_base.wont_implement_method('non-deferred value')

# Reductions, routed through agg.
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')

# Elementwise, with ``axis`` restricted to 0/'index' via ``restrictions``.
clip = frame_base._elementwise_method(
    'clip',
    restrictions={'axis': lambda axis: axis in (0, 'index')})
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def corr(self, method, min_periods):
  """Deferred pairwise correlation matrix of the columns.

  Pearson correlation is computed separately for each column pair, over the
  rows where both columns are non-null. The results are assembled into a
  matrix (with 1.0 on the diagonal) on a single partition. Other methods
  run on a single partition directly.
  """
  if method != 'pearson':
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'corr',
            lambda df: df.corr(method=method, min_periods=min_periods),
            [self._expr],
            requires_partition_by=partitionings.Singleton()))

  proxy = self._expr.proxy().corr()
  columns = list(proxy.columns)
  pair_labels = []
  pair_corrs = []
  for pos, left in enumerate(columns):
    for right in columns[pos + 1:]:
      pair_labels.append((left, right))
      # Note that this set may be different for each pair.
      both_present = self.loc[self[left].notna() & self[right].notna()]
      pair_corrs.append(
          both_present[left]._corr_aligned(both_present[right], min_periods))

  def fill_matrix(*values):
    data = collections.defaultdict(dict)
    for col in columns:
      data[col][col] = 1.0
    for (left, right), value in zip(pair_labels, values):
      data[left][right] = data[right][left] = value
    return pd.DataFrame(data, columns=columns, index=columns)

  with expressions.allow_non_parallel_operations(True):
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'fill_matrix',
            fill_matrix,
            [pair_corr._expr for pair_corr in pair_corrs],
            requires_partition_by=partitionings.Singleton(),
            proxy=proxy))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def cov(self, min_periods, ddof):
  """Deferred pairwise covariance matrix of the columns.

  The diagonal holds each column's squared standard deviation. Each
  off-diagonal entry uses only rows where both columns are non-null. The
  pieces are assembled into a matrix on a single partition.
  """
  # corr has the same labels and shape as cov; it only supplies the proxy.
  proxy = self._expr.proxy().corr()
  columns = list(proxy.columns)
  args = []
  arg_indices = []
  for col in columns:
    arg_indices.append((col, col))
    # Pass ddof by keyword: positionally it would bind to the first
    # parameter of pandas' Series.std, which is ``axis``.
    std = self[col].std(ddof=ddof)
    args.append(std.apply(lambda x: x*x, 'square'))
  for ix, col1 in enumerate(columns):
    for col2 in columns[ix+1:]:
      arg_indices.append((col1, col2))
      # Note that this set may be different for each pair.
      no_na = self.loc[self[col1].notna() & self[col2].notna()]
      args.append(no_na[col1]._cov_aligned(no_na[col2], min_periods, ddof))

  def fill_matrix(*args):
    data = collections.defaultdict(dict)
    for ix, (col1, col2) in enumerate(arg_indices):
      data[col1][col2] = data[col2][col1] = args[ix]
    return pd.DataFrame(data, columns=columns, index=columns)

  with expressions.allow_non_parallel_operations(True):
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'fill_matrix',
            fill_matrix,
            [arg._expr for arg in args],
            requires_partition_by=partitionings.Singleton(),
            proxy=proxy))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def corrwith(self, other, axis, drop, method):
  """Deferred ``corrwith`` along the index (axis 0 only).

  ``other`` may be a (deferred) Series, correlated with every column, or a
  DataFrame, correlated column by column on matching labels. Each
  correlation is computed separately, then written into a copy of the
  proxy result on a single partition.

  Raises:
    NotImplementedError: for axis other than 0/'index', or an unsupported
      type of ``other``.
  """
  if axis not in (0, 'index'):
    raise NotImplementedError('corrwith(axis=%r)' % axis)

  if not isinstance(other, frame_base.DeferredFrame):
    other = frame_base.DeferredFrame.wrap(
        expressions.ConstantExpression(other))

  if isinstance(other, DeferredSeries):
    proxy = self._expr.proxy().corrwith(other._expr.proxy(), method=method)
    self, other = self.align(other, axis=0, join='inner')
    col_names = proxy.index
    other_cols = [other] * len(col_names)
  elif isinstance(other, DeferredDataFrame):
    proxy = self._expr.proxy().corrwith(
        other._expr.proxy(), method=method, drop=drop)
    self, other = self.align(other, axis=0, join='inner')
    # Follow the proxy's label order so results are deterministic and can
    # be written back under the right labels.
    common = set(self.columns).intersection(other.columns)
    col_names = [col for col in proxy.index if col in common]
    other_cols = [other[col_name] for col_name in col_names]
  else:
    # Raise the right error.
    self._expr.proxy().corrwith(other._expr.proxy())
    # Just in case something else becomes valid.
    raise NotImplementedError('corrwith(%s)' % type(other._expr.proxy()))

  # Generate expressions to compute the actual correlations.
  corrs = [
      self[col_name].corr(other_col, method)
      for col_name, other_col in zip(col_names, other_cols)]

  # Combine the results
  def fill_dataframe(*args):
    result = proxy.copy(deep=True)
    # args follow col_names, which may be a subset of proxy.index; labels
    # without a computed value keep the proxy's value.
    for col, value in zip(col_names, args):
      result[col] = value
    return result

  with expressions.allow_non_parallel_operations(True):
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'fill_dataframe',
            fill_dataframe,
            [corr._expr for corr in corrs],
            requires_partition_by=partitionings.Singleton(),
            proxy=proxy))
# Cumulative operations and diff depend on row order.
cummax = cummin = cumsum = cumprod = (
    frame_base.wont_implement_method('order-sensitive'))
diff = frame_base.wont_implement_method('order-sensitive')
def dot(self, other):
  """Deferred matrix product ``self @ other``.

  ``other`` (deferred or not) is wrapped in a single opaque value and
  broadcast to every partition of ``self``. Each partition then computes its
  own rows of the product, and index partitioning is preserved.
  """
  # Broadcasting is OK: other's index must be the same size as the columns
  # set of self, so it cannot be too large.
  class AsScalar(object):
    # Holds the right operand so it travels as one value, not as a frame.
    def __init__(self, value):
      self.value = value

  if not isinstance(other, frame_base.DeferredFrame):
    # NOTE(review): len(other[0]) is the column count for a row-major
    # sequence/ndarray; confirm the non-deferred types meant to be supported.
    result_proxy = pd.DataFrame(columns=range(len(other[0])))
    rhs = expressions.ConstantExpression(AsScalar(other))
  else:
    result_proxy = other._expr.proxy()
    with expressions.allow_non_parallel_operations():
      rhs = expressions.ComputedExpression(
          'as_scalar',
          lambda df: AsScalar(df),
          [other._expr],
          requires_partition_by=partitionings.Singleton())

  def multiply(left, right):
    return left @ right.value

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'dot',
          multiply,
          [self._expr, rhs],
          requires_partition_by=partitionings.Nothing(),
          preserves_partition_by=partitionings.Index(),
          proxy=result_proxy))

__matmul__ = dot
# Selecting the first/last rows depends on row order.
head = tail = frame_base.wont_implement_method('order-sensitive')
max = frame_base._agg_method('max')
min = frame_base._agg_method('min')

def mode(self, axis=0, *args, **kwargs):
  """Deferred ``DataFrame.mode`` along the index.

  All rows are brought into a single partition. The per-row variant
  (``axis`` 1/'columns') is rejected because the number of result columns
  is the maximum number of modes in any row, which is unknown until the
  data has been seen.
  """
  if axis in (1, 'columns'):
    raise frame_base.WontImplementError('non-deferred column values')

  def compute_mode(df):
    return df.mode(*args, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'mode',
          compute_mode,
          [self._expr],
          #TODO(robertwb): Approximate?
          requires_partition_by=partitionings.Singleton(),
          preserves_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def dropna(self, axis, **kwargs):
  """Deferred ``DataFrame.dropna``.

  Dropping rows is decided row by row, so it runs on each partition as-is.
  Dropping columns (``axis`` 1/'columns') needs every row of a column, so
  all rows are first brought into a single partition.
  """
  # TODO(robertwb): This is a common pattern. Generalize?
  drops_columns = axis in (1, 'columns')
  requires_partition_by = (
      partitionings.Singleton() if drops_columns else partitionings.Nothing())

  def drop_missing(df):
    return df.dropna(axis=axis, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'dropna',
          drop_missing,
          [self._expr],
          preserves_partition_by=partitionings.Singleton(),
          requires_partition_by=requires_partition_by))
isna = frame_base._elementwise_method('isna')
notnull = notna = frame_base._elementwise_method('notna')
# Row iteration would materialize the data eagerly.
items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
    'non-lazy')

def _cols_as_temporary_index(self, cols, suffix=''):
  """Return ``(reindex, revert)`` helpers for temporarily indexing by ``cols``.

  ``reindex(df)`` renames the current index levels to unique temporary
  names, moves them into columns, and makes ``cols`` the index.
  ``revert(df)`` moves ``cols`` back into columns and restores the original
  index levels under their original names.

  Args:
    cols: column label(s) to use as the temporary index.
    suffix: appended to the temporary level names, presumably to keep them
      distinct when both sides of a join are re-indexed.
  """
  original_index_names = list(self._expr.proxy().index.names)
  new_index_names = [
      '__apache_beam_temp_%d_%s' % (position, suffix)
      for position in range(len(original_index_names))]

  def to_cols_index(df):
    renamed = df.rename_axis(index=new_index_names, copy=False)
    return renamed.reset_index().set_index(cols)

  def from_cols_index(df):
    restored = df.reset_index().set_index(new_index_names)
    return restored.rename_axis(index=original_index_names, copy=False)

  def reindex(df):
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'reindex',
            to_cols_index,
            [df._expr],
            preserves_partition_by=partitionings.Nothing(),
            requires_partition_by=partitionings.Nothing()))

  def revert(df):
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'join_restoreindex',
            from_cols_index,
            [df._expr],
            preserves_partition_by=partitionings.Nothing(),
            requires_partition_by=partitionings.Nothing()))

  return reindex, revert
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def join(self, other, on, **kwargs):
  """Deferred ``DataFrame.join``.

  With ``on``, the join runs against a temporary index built from the
  ``on`` columns, and the original index is restored afterwards. Otherwise
  ``other`` (a single frame or a list) is joined on the index. Deferred
  operands become inputs of one expression that requires index
  partitioning; non-deferred operands are embedded as constants.
  """
  if on is not None:
    reindex, revert = self._cols_as_temporary_index(on)
    return revert(reindex(self).join(other, **kwargs))

  other_is_list = isinstance(other, list)
  operands = other if other_is_list else [other]

  # Deferred operands are passed to the expression as extra inputs. Their
  # positions are marked with a unique sentinel so they can be spliced back
  # among the constant operands at execution time.
  placeholder = object()
  other_exprs = []
  const_others = []
  for operand in operands:
    if isinstance(operand, frame_base.DeferredFrame):
      other_exprs.append(operand._expr)
      const_others.append(placeholder)
    else:
      const_others.append(operand)

  def fill_placeholders(values):
    pending = iter(values)
    filled = [
        next(pending) if item is placeholder else item
        for item in const_others]
    return filled if other_is_list else filled[0]

  def do_join(df, *deferred_others):
    return df.join(fill_placeholders(deferred_others), **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'join',
          do_join,
          [self._expr] + other_exprs,
          preserves_partition_by=partitionings.Singleton(),
          requires_partition_by=partitionings.Index()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def merge(
    self,
    right,
    on,
    left_on,
    right_on,
    left_index,
    right_index,
    **kwargs):
  """Deferred ``DataFrame.merge``, executed as an index-on-index merge.

  Unless a side joins on its index, it is re-indexed by its join key(s)
  (keeping the key columns). Both sides are then co-partitioned by index
  and merged with ``left_index=True, right_index=True``. When neither side
  joined on its index, the temporary key index is dropped from the result.
  """
  self_proxy = self._expr.proxy()
  right_proxy = right._expr.proxy()
  # Validate with a pandas call.
  _ = self_proxy.merge(
      right_proxy,
      on=on,
      left_on=left_on,
      right_on=right_on,
      left_index=left_index,
      right_index=right_index,
      **kwargs)

  if not any([on, left_on, right_on, left_index, right_index]):
    # As in pandas, default to every column name the two frames share.
    # ``columns`` is an Index attribute, not a method; calling it raised
    # TypeError.
    on = [col for col in self_proxy.columns if col in right_proxy.columns]
  if not left_on:
    left_on = on
  elif not isinstance(left_on, list):
    left_on = [left_on]
  if not right_on:
    right_on = on
  elif not isinstance(right_on, list):
    right_on = [right_on]

  if left_index:
    indexed_left = self
  else:
    indexed_left = self.set_index(left_on, drop=False)
  if right_index:
    indexed_right = right
  else:
    indexed_right = right.set_index(right_on, drop=False)

  merged = frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'merge',
          lambda left, right: left.merge(
              right, left_index=True, right_index=True, **kwargs),
          [indexed_left._expr, indexed_right._expr],
          preserves_partition_by=partitionings.Singleton(),
          requires_partition_by=partitionings.Index()))

  if left_index or right_index:
    return merged
  else:
    return merged.reset_index(drop=True)
def _nlargest_or_nsmallest(self, method_name, keep, kwargs):
  """Shared implementation of ``nlargest`` and ``nsmallest``.

  Each partition first keeps its own best ``n`` rows. The survivors are then
  gathered into a single partition and reduced again with the same pandas
  method.

  Args:
    method_name: ``'nlargest'`` or ``'nsmallest'``.
    keep: ``'all'``, or ``'any'`` (mapped to pandas' ``'first'``, since any
      tie-breaking choice is acceptable).
    kwargs: the remaining pandas arguments; ``keep`` is added to it.

  Raises:
    WontImplementError: for ``keep`` values whose result depends on row
      order (``'first'``/``'last'``).
  """
  if keep == 'any':
    keep = 'first'
  elif keep != 'all':
    raise frame_base.WontImplementError('order-sensitive')
  kwargs['keep'] = keep

  def select(df):
    return getattr(df, method_name)(**kwargs)

  per_partition = expressions.ComputedExpression(
      method_name + '-per-partition',
      select,
      [self._expr],
      preserves_partition_by=partitionings.Singleton(),
      requires_partition_by=partitionings.Nothing())
  with expressions.allow_non_parallel_operations(True):
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            method_name,
            select,
            [per_partition],
            preserves_partition_by=partitionings.Singleton(),
            requires_partition_by=partitionings.Singleton()))

@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nlargest(self, keep, **kwargs):
  """Deferred ``DataFrame.nlargest``; ``keep`` must be 'any' or 'all'."""
  return self._nlargest_or_nsmallest('nlargest', keep, kwargs)

@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def nsmallest(self, keep, **kwargs):
  """Deferred ``DataFrame.nsmallest``; ``keep`` must be 'any' or 'all'."""
  return self._nlargest_or_nsmallest('nsmallest', keep, kwargs)
@frame_base.args_to_kwargs(pd.DataFrame)
def nunique(self, **kwargs):
  """Deferred ``DataFrame.nunique``.

  Counting per row (``axis`` 1/'columns') works on each partition
  independently. Counting per column needs all rows in a single partition.
  """
  per_row = kwargs.get('axis', None) in (1, 'columns')
  requires_partition_by = (
      partitionings.Nothing() if per_row else partitionings.Singleton())

  def count_unique(df):
    return df.nunique(**kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'nunique',
          count_unique,
          [self._expr],
          preserves_partition_by=partitionings.Singleton(),
          requires_partition_by=requires_partition_by))
plot = property(frame_base.wont_implement_method('plot'))

def pop(self, item):
  """Remove column ``item`` from this deferred frame and return it.

  The frame is updated in place by replacing its expression with one that
  lacks ``item``. The returned column is taken from the expression before
  the removal.
  """
  result = self[item]

  def without_item(df):
    # Build a new frame instead of calling ``df.pop``, which mutates its
    # input: that input is the previous expression's cached proxy (when the
    # proxy is computed) or an upstream element at execution time.
    return df.drop(columns=[item])

  self._expr = expressions.ComputedExpression(
      'popped',
      without_item,
      [self._expr],
      preserves_partition_by=partitionings.Singleton(),
      requires_partition_by=partitionings.Nothing())
  return result

prod = product = frame_base._agg_method('prod')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def quantile(self, axis, **kwargs):
  """Deferred ``DataFrame.quantile`` along the index.

  The exact quantiles are computed with all rows in a single partition.
  Per-row quantiles (``axis`` 1/'columns') are rejected.
  """
  if axis in (1, 'columns'):
    raise frame_base.WontImplementError('non-deferred column values')

  def compute_quantile(df):
    return df.quantile(axis=axis, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'quantile',
          compute_quantile,
          [self._expr],
          #TODO(robertwb): Approximate quantiles?
          requires_partition_by=partitionings.Singleton(),
          preserves_partition_by=partitionings.Singleton()))

query = frame_base._elementwise_method('query')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.maybe_inplace
def rename(self, **kwargs):
  """Deferred ``DataFrame.rename``.

  Renaming columns is purely local. Renaming index labels breaks hash
  partitioning of the index, and with ``errors='raise'`` it needs the whole
  index in a single partition to check that every label exists.
  """
  axis = kwargs.get('axis', None)
  # A mapper without ``columns``/``axis`` targets the index, as in pandas.
  rename_index = (
      'index' in kwargs
      or axis in (0, 'index')
      or ('columns' not in kwargs and 'axis' not in kwargs))
  rename_columns = 'columns' in kwargs or axis in (1, 'columns')

  if not rename_index:
    preserves_partition_by = partitionings.Singleton()
  else:
    # Technically, it's still partitioned by index, but it's no longer
    # partitioned by the hash of the index.
    preserves_partition_by = partitionings.Nothing()

  if rename_index and kwargs.get('errors', None) == 'raise':
    # Renaming index with checking requires global index.
    requires_partition_by = partitionings.Singleton()
  else:
    requires_partition_by = partitionings.Nothing()

  # The proxy can't be computed by executing rename when the index is
  # renamed (it would error), so it is supplied explicitly in that case.
  if not rename_index:
    proxy = None
  elif rename_columns:
    # Note if both are being renamed, index and columns must be specified
    # (not axis)
    proxy = self._expr.proxy().rename(
        **{key: value for key, value in kwargs.items() if key != 'index'})
  else:
    # No change in columns, reuse proxy
    proxy = self._expr.proxy()

  def do_rename(df):
    return df.rename(**kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'rename',
          do_rename,
          [self._expr],
          proxy=proxy,
          preserves_partition_by=preserves_partition_by,
          requires_partition_by=requires_partition_by))

rename_axis = frame_base._elementwise_method('rename_axis')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def replace(self, limit, **kwargs):
  """Deferred ``DataFrame.replace``.

  Without ``limit`` the replacement runs on each partition independently.
  With a ``limit``, all rows are first brought into a single partition.
  """
  requires_partition_by = (
      partitionings.Nothing() if limit is None else partitionings.Singleton())

  def do_replace(df):
    return df.replace(limit=limit, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'replace',
          do_replace,
          [self._expr],
          preserves_partition_by=partitionings.Singleton(),
          requires_partition_by=requires_partition_by))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def reset_index(self, level=None, **kwargs):
  """Deferred ``DataFrame.reset_index``.

  Resetting only some index levels leaves an index behind and runs on each
  partition independently. Resetting every level produces a fresh default
  index, so all rows are first brought into a single partition.
  """
  if level is not None and not isinstance(level, (tuple, list)):
    level = [level]
  resets_every_level = (
      level is None or len(level) == self._expr.proxy().index.nlevels)
  if not resets_every_level:
    requires_partition_by = partitionings.Nothing()
  else:
    # TODO: Could do distributed re-index with offsets.
    requires_partition_by = partitionings.Singleton()

  def do_reset(df):
    return df.reset_index(level=level, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'reset_index',
          do_reset,
          [self._expr],
          preserves_partition_by=partitionings.Nothing(),
          requires_partition_by=requires_partition_by))

round = frame_base._elementwise_method('round')
select_dtypes = frame_base._elementwise_method('select_dtypes')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
def shift(self, axis, **kwargs):
  """Deferred ``DataFrame.shift`` (without ``freq``).

  Shifting along the index depends on row order and needs all rows in a
  single partition. Shifting along columns (``axis`` 1/'columns') is
  applied to each partition independently.

  Raises:
    WontImplementError: if a non-None ``freq`` is given.
  """
  # Reject only an actual frequency. An explicitly passed ``freq=None`` is
  # the pandas default and must not be treated as a freq-based shift.
  if kwargs.get('freq', None) is not None:
    raise frame_base.WontImplementError('data-dependent')
  if axis == 1 or axis == 'columns':
    requires_partition_by = partitionings.Nothing()
  else:
    requires_partition_by = partitionings.Singleton()
  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'shift',
          lambda df: df.shift(axis=axis, **kwargs),
          [self._expr],
          preserves_partition_by=partitionings.Singleton(),
          requires_partition_by=requires_partition_by))
@property
def shape(self):
  """Not available: the number of rows is not known until execution."""
  raise frame_base.WontImplementError('scalar value')

@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def sort_values(self, axis, **kwargs):
  """Deferred ``DataFrame.sort_values``.

  Sorting rows needs all of them in a single partition. Sorting the columns
  of each row (``axis`` 1/'columns') works per partition.
  """
  sorts_columns = axis in (1, 'columns')
  requires_partition_by = (
      partitionings.Nothing() if sorts_columns else partitionings.Singleton())

  def do_sort(df):
    return df.sort_values(axis=axis, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'sort_values',
          do_sort,
          [self._expr],
          preserves_partition_by=partitionings.Singleton(),
          requires_partition_by=requires_partition_by))
# Operations applied to each partition independently.
stack = frame_base._elementwise_method('stack')
transform = frame_base._elementwise_method(
    'transform', restrictions={'axis': 0})
# Aggregations.
sum = frame_base._agg_method('sum')
# Unsupported: deprecated, or would need the concrete data / column values.
take = frame_base.wont_implement_method('deprecated')
transpose = frame_base.wont_implement_method('non-deferred column values')
to_records = to_dict = to_numpy = to_string = (
    frame_base.wont_implement_method('non-deferred value'))
to_sparse = to_string  # frame_base._elementwise_method('to_sparse')
def unstack(self, *args, **kwargs):
  """Deferred ``DataFrame.unstack``, supported only for a single-level index.

  For a MultiIndex, unstacking turns index values into columns, which are
  not known until the data has been computed.
  """
  if self._expr.proxy().index.nlevels != 1:
    raise frame_base.WontImplementError('non-deferred column values')

  def do_unstack(df):
    return df.unstack(*args, **kwargs)

  return frame_base.DeferredFrame.wrap(
      expressions.ComputedExpression(
          'unstack',
          do_unstack,
          [self._expr],
          requires_partition_by=partitionings.Index()))

update = frame_base._proxy_method(
    'update',
    inplace=True,
    requires_partition_by=partitionings.Index(),
    preserves_partition_by=partitionings.Index())
values = property(frame_base.wont_implement_method('non-deferred value'))
# Expose every ``to_*`` writer of the io module as a DeferredDataFrame method.
for io_func in dir(io):
  if not io_func.startswith('to_'):
    continue
  setattr(DeferredDataFrame, io_func, getattr(io, io_func))

# Methods that are applied to each partition independently.
for meth in ('filter', ):
  setattr(DeferredDataFrame, meth, frame_base._elementwise_method(meth))
@populate_not_implemented(pd.core.groupby.generic.DataFrameGroupBy)
class DeferredGroupBy(frame_base.DeferredFrame):
  """Deferred wrapper around a pandas GroupBy expression.

  This class provides ``agg`` and marks order-sensitive or non-deferred
  accessors as unsupported. The named aggregations (``sum``, ``mean``, ...)
  are attached to the class after its definition.
  """
  def __init__(self, expr, kwargs):
    super(DeferredGroupBy, self).__init__(expr)
    # Keyword arguments of the original ``groupby`` call; the aggregation
    # helpers re-apply them when grouping.
    self._kwargs = kwargs

  def agg(self, fn):
    """Apply the callable ``fn`` to the grouped data.

    The grouped expression must be partitioned by index before ``fn`` is
    applied.

    Raises:
      NotImplementedError: if ``fn`` is not callable (e.g. a string name).
    """
    if not callable(fn):
      # TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
      # running doctests for pandas.core.groupby.generic
      raise NotImplementedError('GroupBy.agg currently only supports callable '
                                'arguments')
    # Let the proxy determine the deferred type rather than assuming a
    # DataFrame result, consistent with every other deferred operation.
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'agg',
            lambda df: df.agg(fn), [self._expr],
            requires_partition_by=partitionings.Index(),
            preserves_partition_by=partitionings.Singleton()))

  aggregate = agg

  first = last = head = tail = frame_base.wont_implement_method(
      'order sensitive')

  # TODO(robertwb): Consider allowing this for categorical keys.
  __len__ = frame_base.wont_implement_method('non-deferred')
  __getitem__ = frame_base.not_implemented_method('__getitem__')
  groups = property(frame_base.wont_implement_method('non-deferred'))
def _liftable_agg(meth, postagg_meth=None):
  """Create a DeferredGroupBy method for an aggregation that can be lifted.

  The generated method runs in two steps:
  1. A "pre-combine" step aggregates the groups within each partition, with
     no repartitioning.
  2. A "post-combine" step groups the partial results again and aggregates
     them with ``postagg_meth``.

  Args:
    meth: aggregation name or function for the per-partition step.
    postagg_meth: aggregation used to combine the partial results. Defaults
      to ``meth``; for example ``'sum'`` combines partial ``count``s.

  Returns:
    A ``wrapper(self, *args, **kwargs)`` function to install on
    ``DeferredGroupBy``.
  """
  name, func = frame_base.name_and_func(meth)
  if postagg_meth is None:
    post_agg_name, post_agg_func = name, func
  else:
    post_agg_name, post_agg_func = frame_base.name_and_func(postagg_meth)

  def wrapper(self, *args, **kwargs):
    assert isinstance(self, DeferredGroupBy)
    # The frame the groupby was taken from; grouping uses all its index levels.
    ungrouped = self._expr.args()[0]
    to_group = ungrouped.proxy().index
    is_categorical_grouping = any(to_group.get_level_values(i).is_categorical()
                                  for i in range(to_group.nlevels))
    groupby_kwargs = self._kwargs

    # Don't include un-observed categorical values in the preagg
    preagg_groupby_kwargs = groupby_kwargs.copy()
    preagg_groupby_kwargs['observed'] = True

    # Positional arguments are forwarded to both steps, exactly like kwargs;
    # they must not be silently discarded.
    pre_agg = expressions.ComputedExpression(
        'pre_combine_' + name,
        lambda df: func(
            df.groupby(level=list(range(df.index.nlevels)),
                       **preagg_groupby_kwargs),
            *args,
            **kwargs),
        [ungrouped],
        requires_partition_by=partitionings.Nothing(),
        preserves_partition_by=partitionings.Singleton())

    # With categorical keys every partial result is gathered into a single
    # partition (presumably so unobserved categories appear exactly once);
    # otherwise co-partitioning by index is enough.
    post_agg = expressions.ComputedExpression(
        'post_combine_' + post_agg_name,
        lambda df: post_agg_func(
            df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
            *args,
            **kwargs),
        [pre_agg],
        requires_partition_by=(partitionings.Singleton()
                               if is_categorical_grouping
                               else partitionings.Index()),
        preserves_partition_by=partitionings.Singleton())
    return frame_base.DeferredFrame.wrap(post_agg)

  return wrapper
def _unliftable_agg(meth):
  """Create a DeferredGroupBy method for an aggregation that cannot be lifted.

  Partial per-partition results of such aggregations (e.g. a median) cannot
  be combined. The data is therefore repartitioned first, and the
  aggregation runs once on each co-located group.

  Args:
    meth: aggregation name or function.

  Returns:
    A ``wrapper(self, *args, **kwargs)`` function to install on
    ``DeferredGroupBy``.
  """
  name, func = frame_base.name_and_func(meth)

  def wrapper(self, *args, **kwargs):
    assert isinstance(self, DeferredGroupBy)
    # The frame the groupby was taken from; grouping uses all its index levels.
    ungrouped = self._expr.args()[0]
    to_group = ungrouped.proxy().index
    is_categorical_grouping = any(to_group.get_level_values(i).is_categorical()
                                  for i in range(to_group.nlevels))
    groupby_kwargs = self._kwargs
    # Positional arguments are forwarded like kwargs instead of being dropped.
    post_agg = expressions.ComputedExpression(
        name,
        lambda df: func(
            df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
            *args,
            **kwargs),
        [ungrouped],
        requires_partition_by=(partitionings.Singleton()
                               if is_categorical_grouping
                               else partitionings.Index()),
        preserves_partition_by=partitionings.Singleton())
    return frame_base.DeferredFrame.wrap(post_agg)

  return wrapper
# Aggregations whose per-partition results are combined by re-applying the
# same aggregation.
LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'sum']
# Aggregations whose per-partition results are combined by summing them.
LIFTABLE_WITH_SUM_AGGREGATIONS = ['size', 'count']
# Aggregations computed only after each group is co-located.
UNLIFTABLE_AGGREGATIONS = ['mean', 'median', 'std', 'var']

# Install them on DeferredGroupBy, in the same order as the lists above.
for meth in (LIFTABLE_AGGREGATIONS
             + LIFTABLE_WITH_SUM_AGGREGATIONS
             + UNLIFTABLE_AGGREGATIONS):
  if meth in UNLIFTABLE_AGGREGATIONS:
    agg_method = _unliftable_agg(meth)
  elif meth in LIFTABLE_WITH_SUM_AGGREGATIONS:
    agg_method = _liftable_agg(meth, postagg_meth='sum')
  else:
    agg_method = _liftable_agg(meth)
  setattr(DeferredGroupBy, meth, agg_method)
def _is_associative(func):
  """Return whether ``func`` is a known associative (liftable) aggregation.

  ``func`` is accepted if it is one of the names in ``LIFTABLE_AGGREGATIONS``,
  or a numpy/builtin function whose ``__name__`` is one of those names.
  """
  if func in LIFTABLE_AGGREGATIONS:
    return True
  if getattr(func, '__name__', None) not in LIFTABLE_AGGREGATIONS:
    return False
  return func.__module__ in ('numpy', 'builtins')
@populate_not_implemented(pd.core.groupby.generic.DataFrameGroupBy)
class _DeferredGroupByCols(frame_base.DeferredFrame):
  """Deferred GroupBy restricted to a subset of columns.

  Most methods are forwarded elementwise to the pandas GroupBy. Order-
  sensitive and plotting methods are rejected. A few metadata properties
  are read from the proxy.
  """
  # It's not clear that all of these make sense in Pandas either...
  agg = aggregate = frame_base._elementwise_method('agg')
  any = frame_base._elementwise_method('any')
  all = frame_base._elementwise_method('all')
  boxplot = frame_base.wont_implement_method('plot')
  describe = frame_base.wont_implement_method('describe')
  diff = frame_base._elementwise_method('diff')
  fillna = frame_base._elementwise_method('fillna')
  filter = frame_base._elementwise_method('filter')
  first = frame_base.wont_implement_method('order sensitive')
  # The pandas method is ``get_group``; forwarding ``'group'`` named a
  # nonexistent method.
  get_group = frame_base._elementwise_method('get_group')
  head = frame_base.wont_implement_method('order sensitive')
  hist = frame_base.wont_implement_method('plot')
  idxmax = frame_base._elementwise_method('idxmax')
  idxmin = frame_base._elementwise_method('idxmin')
  last = frame_base.wont_implement_method('order sensitive')
  mad = frame_base._elementwise_method('mad')
  max = frame_base._elementwise_method('max')
  mean = frame_base._elementwise_method('mean')
  median = frame_base._elementwise_method('median')
  min = frame_base._elementwise_method('min')
  nunique = frame_base._elementwise_method('nunique')
  plot = frame_base.wont_implement_method('plot')
  prod = frame_base._elementwise_method('prod')
  quantile = frame_base._elementwise_method('quantile')
  shift = frame_base._elementwise_method('shift')
  size = frame_base._elementwise_method('size')
  skew = frame_base._elementwise_method('skew')
  std = frame_base._elementwise_method('std')
  sum = frame_base._elementwise_method('sum')
  tail = frame_base.wont_implement_method('order sensitive')
  take = frame_base.wont_implement_method('deprecated')
  tshift = frame_base._elementwise_method('tshift')
  var = frame_base._elementwise_method('var')

  @property
  def groups(self):
    return self._expr.proxy().groups

  @property
  def indices(self):
    return self._expr.proxy().indices

  @property
  def ndim(self):
    return self._expr.proxy().ndim

  @property
  def ngroups(self):
    return self._expr.proxy().ngroups
@populate_not_implemented(pd.core.indexes.base.Index)
class _DeferredIndex(object):
  """Index accessor for a deferred frame that exposes only proxy metadata.

  ``names``, ``ndim`` and ``nlevels`` are read from the frame's proxy
  index. Any attribute not found otherwise raises NotImplementedError.
  """
  def __init__(self, frame):
    self._frame = frame

  def _proxy_index(self):
    # Index of the frame's proxy, which describes the structure but not the
    # data.
    return self._frame._expr.proxy().index

  @property
  def names(self):
    return self._proxy_index().names

  @property
  def ndim(self):
    return self._proxy_index().ndim

  @property
  def nlevels(self):
    return self._proxy_index().nlevels

  def __getattr__(self, name):
    raise NotImplementedError('index.%s' % name)
@populate_not_implemented(pd.core.indexing._LocIndexer)
class _DeferredLoc(object):
  """Read-only ``.loc`` accessor for a deferred frame.

  Supported indexers are label slices, deferred indexers (another deferred
  frame), callables returning a slice or a Series, and ``(rows, cols)``
  tuples. List indexers raise NotImplementedError. Assignment through
  ``.loc`` is not implemented.
  """
  def __init__(self, frame):
    # The deferred frame being indexed.
    self._frame = frame

  def __getitem__(self, index):
    if isinstance(index, tuple):
      # ``loc[rows, cols]``: select rows via this accessor, then apply
      # ``cols`` with the resulting frame's own ``__getitem__``.
      # NOTE(review): a slice in ``cols`` goes to ``__getitem__``, which for
      # pandas DataFrames slices rows, not columns -- confirm the intended
      # behavior of ``loc[:, 'a':'c']``.
      rows, cols = index
      return self[rows][cols]
    elif isinstance(index, list) and index and isinstance(index[0], bool):
      # Aligned by numerical index.
      raise NotImplementedError(type(index))
    elif isinstance(index, list):
      # Select rows, but behaves poorly on missing values.
      raise NotImplementedError(type(index))
    elif isinstance(index, slice):
      args = [self._frame._expr]
      func = lambda df: df.loc[index]
    elif isinstance(index, frame_base.DeferredFrame):
      # A deferred indexer becomes a second input; see the
      # requires_partition_by choice below.
      args = [self._frame._expr, index._expr]
      func = lambda df, index: df.loc[index]
    elif callable(index):

      def checked_callable_index(df):
        # Evaluate the user's callable and accept only a slice or a Series
        # as the row indexer; boolean lists and other types raise
        # NotImplementedError.
        computed_index = index(df)
        if isinstance(computed_index, tuple):
          row_index, _ = computed_index
        else:
          row_index = computed_index
        if isinstance(row_index, list) and row_index and isinstance(
            row_index[0], bool):
          raise NotImplementedError(type(row_index))
        elif not isinstance(row_index, (slice, pd.Series)):
          raise NotImplementedError(type(row_index))
        return computed_index

      args = [self._frame._expr]
      func = lambda df: df.loc[checked_callable_index]
    else:
      raise NotImplementedError(type(index))

    # With a deferred indexer both inputs must be partitioned by index;
    # otherwise the selection is applied to each partition independently.
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'loc',
            func,
            args,
            requires_partition_by=(
                partitionings.Index()
                if len(args) > 1
                else partitionings.Nothing()),
            preserves_partition_by=partitionings.Singleton()))

  __setitem__ = frame_base.not_implemented_method('loc.setitem')
@populate_not_implemented(pd.core.indexing._iLocIndexer)
class _DeferredILoc(object):
  """Deferred ``.iloc`` that supports only ``iloc[:, cols]``.

  Positional row selection depends on row order, which a deferred frame
  does not define, so every other form is rejected. Assignment through
  ``.iloc`` is rejected as well.
  """
  def __init__(self, frame):
    self._frame = frame

  def __getitem__(self, index):
    if not isinstance(index, tuple):
      raise frame_base.WontImplementError('order-sensitive')
    rows, _ = index
    # Use the shared null-slice check instead of ``rows != slice(None)``.
    # For array-like row indexers that comparison is elementwise and raises
    # an ambiguous-truth ValueError instead of the intended error.
    if not _is_null_slice(rows):
      raise frame_base.WontImplementError('order-sensitive')
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'iloc',
            lambda df: df.iloc[index],
            [self._frame._expr],
            requires_partition_by=partitionings.Nothing(),
            preserves_partition_by=partitionings.Singleton()))

  __setitem__ = frame_base.wont_implement_method('iloc.setitem')
class _DeferredStringMethods(frame_base.DeferredBase):
  """Deferred counterpart of the pandas ``.str`` accessor.

  ``cat`` and ``repeat`` need custom partitioning and are defined here. The
  purely elementwise string methods are attached to this class separately.
  """
  @frame_base.args_to_kwargs(pd.core.strings.StringMethods)
  @frame_base.populate_defaults(pd.core.strings.StringMethods)
  def cat(self, others, join, **kwargs):
    """Deferred ``Series.str.cat``.

    With ``others=None`` the whole series is concatenated into one string,
    which needs all rows in a single partition. With deferred ``others``
    (one Series or a list of them) the inputs are co-partitioned by index,
    and an explicit ``join`` is required.

    Raises:
      WontImplementError: for unsupported ``others`` (e.g. a list of str),
        or for deferred ``others`` without ``join``.
    """
    if others is None:
      # Concatenate series into a single String
      requires = partitionings.Singleton()
      func = lambda df: df.str.cat(join=join, **kwargs)
      args = [self._expr]
    elif (isinstance(others, frame_base.DeferredBase) or
          (isinstance(others, list) and
           all(isinstance(other, frame_base.DeferredBase) for other in others))):
      if join is None:
        raise frame_base.WontImplementError("cat with others=Series or "
                                            "others=List[Series] requires "
                                            "join to be specified.")
      if isinstance(others, frame_base.DeferredBase):
        others = [others]
      requires = partitionings.Index()

      def func(*args):
        return args[0].str.cat(others=args[1:], join=join, **kwargs)

      args = [self._expr] + [other._expr for other in others]
    else:
      raise frame_base.WontImplementError("others must be None, Series, or "
                                          "List[Series]. List[str] is not "
                                          "supported.")

    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'cat',
            func,
            args,
            requires_partition_by=requires,
            preserves_partition_by=partitionings.Singleton()))

  @frame_base.args_to_kwargs(pd.core.strings.StringMethods)
  def repeat(self, repeats):
    """Deferred ``Series.str.repeat``.

    ``repeats`` may be an integer (Python or numpy), applied to each
    partition independently, or a deferred Series that is co-partitioned by
    index with this one.

    Raises:
      WontImplementError: if ``repeats`` is a list.
      TypeError: for any other unsupported ``repeats`` type. Previously such
        values silently produced ``None``.
    """
    if isinstance(repeats, (int, np.integer)):
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'repeat',
              lambda series: series.str.repeat(repeats),
              [self._expr],
              # TODO(BEAM-11155): Defer to pandas to compute this proxy.
              # Currently it incorrectly infers dtype bool, may require upstream
              # fix.
              proxy=self._expr.proxy(),
              requires_partition_by=partitionings.Nothing(),
              preserves_partition_by=partitionings.Singleton()))
    elif isinstance(repeats, frame_base.DeferredBase):
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'repeat',
              lambda series, repeats_series: series.str.repeat(repeats_series),
              [self._expr, repeats._expr],
              # TODO(BEAM-11155): Defer to pandas to compute this proxy.
              # Currently it incorrectly infers dtype bool, may require upstream
              # fix.
              proxy=self._expr.proxy(),
              requires_partition_by=partitionings.Index(),
              preserves_partition_by=partitionings.Singleton()))
    elif isinstance(repeats, list):
      raise frame_base.WontImplementError("repeats must be an integer or a "
                                          "Series.")
    else:
      raise TypeError(
          "repeats must be an integer or a Series, got %s." % type(repeats))
# Names of ``.str`` accessor methods that are registered as elementwise
# deferred methods on _DeferredStringMethods. Alphabetical, with the
# indexing dunder appended last.
ELEMENTWISE_STRING_METHODS = (
    'capitalize casefold contains count endswith extract extractall findall '
    'fullmatch get get_dummies isalnum isalpha isdecimal isdigit islower '
    'isnumeric isspace istitle isupper join len lower lstrip match pad '
    'partition replace rpartition rsplit rstrip slice slice_replace split '
    'startswith strip swapcase title upper wrap zfill'
).split() + ['__getitem__']
def make_str_func(method):
  """Build ``func(df, *args, **kwargs)`` that calls ``df.str.<method>``.

  Args:
    method: name of a pandas ``.str`` accessor method, e.g. ``'upper'``.
  """
  def func(df, *args, **kwargs):
    str_method = getattr(df.str, method)
    return str_method(*args, **kwargs)

  return func
# Attach each elementwise string method to the deferred ``.str`` accessor.
for method in ELEMENTWISE_STRING_METHODS:
  deferred_str_method = frame_base._elementwise_method(make_str_func(method))
  setattr(_DeferredStringMethods, method, deferred_str_method)
# Install the arithmetic and logical operators on both deferred types: the
# named method, its reflected variant, the dunder, the reflected dunder, and
# the in-place dunder. Only the default ``level=None`` is allowed.
# ``xor`` was missing, so ``a ^ b`` was unsupported while ``&`` and ``|``
# worked.
for base in ['add',
             'sub',
             'mul',
             'div',
             'truediv',
             'floordiv',
             'mod',
             'divmod',
             'pow',
             'and',
             'or',
             'xor']:
  for p in ['%s', 'r%s', '__%s__', '__r%s__']:
    # TODO: non-trivial level?
    name = p % base
    setattr(
        DeferredSeries,
        name,
        frame_base._elementwise_method(name, restrictions={'level': None}))
    setattr(
        DeferredDataFrame,
        name,
        frame_base._elementwise_method(name, restrictions={'level': None}))
  setattr(
      DeferredSeries,
      '__i%s__' % base,
      frame_base._elementwise_method('__i%s__' % base, inplace=True))
  setattr(
      DeferredDataFrame,
      '__i%s__' % base,
      frame_base._elementwise_method('__i%s__' % base, inplace=True))

# Comparison operators.
for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
  setattr(DeferredSeries, name, frame_base._elementwise_method(name))
  setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))

# Unary operators.
for name in ['__neg__', '__pos__', '__invert__']:
  setattr(DeferredSeries, name, frame_base._elementwise_method(name))
  setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))

# Spelled-out aliases that pandas also provides.
DeferredSeries.multiply = DeferredSeries.mul  # type: ignore
DeferredDataFrame.multiply = DeferredDataFrame.mul  # type: ignore
DeferredSeries.subtract = DeferredSeries.sub  # type: ignore
DeferredDataFrame.subtract = DeferredDataFrame.sub  # type: ignore
DeferredSeries.divide = DeferredSeries.div  # type: ignore
DeferredDataFrame.divide = DeferredDataFrame.div  # type: ignore
def _slice_parts(s):
yield s.start
yield s.stop
yield s.step
def _is_null_slice(s):
return isinstance(s, slice) and all(x is None for x in _slice_parts(s))
def _is_integer_slice(s):
return isinstance(s, slice) and all(
x is None or isinstance(x, int)
for x in _slice_parts(s)) and not _is_null_slice(s)