#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from collections.abc import Iterable
from typing import Any
from typing import TypeVar
import numpy as np
import pandas as pd
Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
[docs]
class Partitioning(object):
"""A class representing a (consistent) partitioning of dataframe objects.
"""
def __repr__(self):
return self.__class__.__name__
[docs]
def is_subpartitioning_of(self, other: 'Partitioning') -> bool:
"""Returns whether self is a sub-partition of other.
Specifically, returns whether something partitioned by self is necissarily
also partitioned by other.
"""
raise NotImplementedError
def __lt__(self, other):
return self != other and self <= other
def __le__(self, other):
return not self.is_subpartitioning_of(other)
[docs]
def partition_fn(self, df: Frame,
num_partitions: int) -> Iterable[tuple[Any, Frame]]:
"""A callable that actually performs the partitioning of a Frame df.
This will be invoked via a FlatMap in conjunction with a GroupKey to
achieve the desired partitioning.
"""
raise NotImplementedError
[docs]
def test_partition_fn(self, df):
return self.partition_fn(df, 5)
[docs]
class Index(Partitioning):
"""A partitioning by index (either fully or partially).
If the set of "levels" of the index to consider is not specified, the entire
index is used.
These form a partial order, given by
Singleton() < Index([i]) < Index([i, j]) < ... < Index() < Arbitrary()
The ordering is implemented via the is_subpartitioning_of method, where the
examples on the right are subpartitionings of the examples on the left above.
"""
def __init__(self, levels=None):
self._levels = levels
def __repr__(self):
if self._levels:
return 'Index%s' % self._levels
else:
return 'Index'
def __eq__(self, other):
return type(self) == type(other) and self._levels == other._levels
def __hash__(self):
if self._levels:
return hash(tuple(sorted(self._levels)))
else:
return hash(type(self))
[docs]
def is_subpartitioning_of(self, other):
if isinstance(other, Singleton):
return True
elif isinstance(other, Index):
if self._levels is None:
return True
elif other._levels is None:
return False
else:
return all(level in self._levels for level in other._levels)
elif isinstance(other, (Arbitrary, JoinIndex)):
return False
else:
raise ValueError(f"Encountered unknown type {other!r}")
def _hash_index(self, df):
if self._levels is None:
levels = list(range(df.index.nlevels))
else:
levels = self._levels
return sum(
pd.util.hash_array(np.asarray(df.index.get_level_values(level)))
for level in levels)
[docs]
def partition_fn(self, df, num_partitions):
hashes = self._hash_index(df)
for key in range(num_partitions):
yield key, df[hashes % num_partitions == key]
[docs]
def check(self, dfs):
# Drop empty DataFrames
dfs = [df for df in dfs if len(df)]
if not len(dfs):
return True
def apply_consistent_order(dfs):
# Apply consistent order between dataframes by using sum of the index's
# hash.
# Apply consistent order within dataframe with sort_index()
# Also drops any empty dataframes.
return sorted((df.sort_index() for df in dfs if len(df)),
key=lambda df: sum(self._hash_index(df)))
dfs = apply_consistent_order(dfs)
repartitioned_dfs = apply_consistent_order(
df for _, df in self.test_partition_fn(pd.concat(dfs)))
# Assert that each index is identical
for df, repartitioned_df in zip(dfs, repartitioned_dfs):
if not df.index.equals(repartitioned_df.index):
return False
return True
[docs]
class Singleton(Partitioning):
"""A partitioning of all the data into a single partition.
"""
def __init__(self, reason=None):
self._reason = reason
@property
def reason(self):
return self._reason
def __eq__(self, other):
return type(self) == type(other)
def __hash__(self):
return hash(type(self))
[docs]
def is_subpartitioning_of(self, other):
return isinstance(other, Singleton)
[docs]
def partition_fn(self, df, num_partitions):
yield None, df
[docs]
def check(self, dfs):
return len(dfs) <= 1
[docs]
class JoinIndex(Partitioning):
"""A partitioning that lets two frames be joined.
This can either be a hash partitioning on the full index, or a common
ancestor with no intervening re-indexing/re-partitioning.
It fits into the partial ordering as
Index() < JoinIndex(x) < JoinIndex() < Arbitrary()
with
JoinIndex(x) and JoinIndex(y)
being incomparable for nontrivial x != y.
Expressions desiring to make use of this index should simply declare a
requirement of JoinIndex().
"""
def __init__(self, ancestor=None):
self._ancestor = ancestor
def __repr__(self):
if self._ancestor:
return 'JoinIndex[%s]' % self._ancestor
else:
return 'JoinIndex'
def __eq__(self, other):
if type(self) != type(other):
return False
elif self._ancestor is None:
return other._ancestor is None
elif other._ancestor is None:
return False
else:
return self._ancestor == other._ancestor
def __hash__(self):
return hash((type(self), self._ancestor))
[docs]
def is_subpartitioning_of(self, other):
if isinstance(other, Arbitrary):
return False
elif isinstance(other, JoinIndex):
return self._ancestor is None or self == other
else:
return True
[docs]
def test_partition_fn(self, df):
return Index().test_partition_fn(df)
[docs]
def check(self, dfs):
return True
[docs]
class Arbitrary(Partitioning):
"""A partitioning imposing no constraints on the actual partitioning.
"""
def __eq__(self, other):
return type(self) == type(other)
def __hash__(self):
return hash(type(self))
[docs]
def is_subpartitioning_of(self, other):
return True
[docs]
def test_partition_fn(self, df):
num_partitions = 10
def shuffled(seq):
seq = list(seq)
random.shuffle(seq)
return seq
part = pd.Series(shuffled(range(len(df))), index=df.index) % num_partitions
for k in range(num_partitions):
yield k, df[part == k]
[docs]
def check(self, dfs):
return True