#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Utilities for relating schema-aware PCollections and dataframe transforms.
Imposes a mapping between native Python typings (specifically those compatible
with :mod:`apache_beam.typehints.schemas`), and common pandas dtypes::
pandas dtype Python typing
np.int{8,16,32,64} <-----> np.int{8,16,32,64}*
pd.Int{8,16,32,64}Dtype <-----> Optional[np.int{8,16,32,64}]*
np.float{32,64} <-----> Optional[np.float{32,64}]
\--- np.float{32,64}
Not supported <------ Optional[bytes]
np.bool <-----> np.bool
np.dtype('S') <-----> bytes
pd.BooleanDType() <-----> Optional[bool]
pd.StringDType() <-----> Optional[str]
\--- str
np.object <-----> Any
* int, float, bool are treated the same as np.int64, np.float64, np.bool
Note that when converting to pandas dtypes, any types not specified here are
shunted to ``np.object``.
Similarly when converting from pandas to Python types, types that aren't
otherwise specified here are shunted to ``Any``. Notably, this includes
``np.datetime64``.
Pandas does not support hierarchical data natively. Currently, all structured
types (``Sequence``, ``Mapping``, nested ``NamedTuple`` types), are
shunted to ``np.object`` like all other unknown types. In the future these
types may be given special consideration.
"""
# pytype: skip-file
from typing import Any
from typing import NamedTuple
from typing import Optional
from typing import TypeVar
from typing import Union
import numpy as np
import pandas as pd
import apache_beam as beam
from apache_beam import typehints
from apache_beam.portability.api import schema_pb2
from apache_beam.transforms.util import BatchElements
from apache_beam.typehints.native_type_compatibility import _match_is_optional
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import named_tuple_to_schema
from apache_beam.utils import proto_utils
__all__ = (
'BatchRowsAsDataFrame',
'generate_proxy',
'UnbatchPandas',
'element_type_from_dataframe')
T = TypeVar('T', bound=NamedTuple)
# Generate type map (presented visually in the docstring)
_BIDIRECTIONAL = [
(bool, bool),
(np.int8, np.int8),
(np.int16, np.int16),
(np.int32, np.int32),
(np.int64, np.int64),
(pd.Int8Dtype(), Optional[np.int8]),
(pd.Int16Dtype(), Optional[np.int16]),
(pd.Int32Dtype(), Optional[np.int32]),
(pd.Int64Dtype(), Optional[np.int64]),
(np.float32, Optional[np.float32]),
(np.float64, Optional[np.float64]),
(object, Any),
(pd.StringDtype(), Optional[str]),
(pd.BooleanDtype(), Optional[bool]),
]
PANDAS_TO_BEAM = {
pd.Series([], dtype=dtype).dtype: fieldtype
for dtype,
fieldtype in _BIDIRECTIONAL
}
BEAM_TO_PANDAS = {fieldtype: dtype for dtype, fieldtype in _BIDIRECTIONAL}
# Shunt non-nullable Beam types to the same pandas types as their non-nullable
# equivalents for FLOATs, DOUBLEs, and STRINGs. pandas has no non-nullable dtype
# for these.
OPTIONAL_SHUNTS = [np.float32, np.float64, str]
for typehint in OPTIONAL_SHUNTS:
BEAM_TO_PANDAS[typehint] = BEAM_TO_PANDAS[Optional[typehint]]
# int, float -> int64, np.float64
BEAM_TO_PANDAS[int] = BEAM_TO_PANDAS[np.int64]
BEAM_TO_PANDAS[Optional[int]] = BEAM_TO_PANDAS[Optional[np.int64]]
BEAM_TO_PANDAS[float] = BEAM_TO_PANDAS[np.float64]
BEAM_TO_PANDAS[Optional[float]] = BEAM_TO_PANDAS[Optional[np.float64]]
BEAM_TO_PANDAS[bytes] = 'bytes'
[docs]@typehints.with_input_types(T)
@typehints.with_output_types(pd.DataFrame)
class BatchRowsAsDataFrame(beam.PTransform):
"""A transform that batches schema-aware PCollection elements into DataFrames
Batching parameters are inherited from
:class:`~apache_beam.transforms.util.BatchElements`.
"""
def __init__(self, *args, proxy=None, **kwargs):
self._batch_elements_transform = BatchElements(*args, **kwargs)
self._proxy = proxy
[docs] def expand(self, pcoll):
proxy = generate_proxy(
pcoll.element_type) if self._proxy is None else self._proxy
if isinstance(proxy, pd.DataFrame):
columns = proxy.columns
construct = lambda batch: pd.DataFrame.from_records(
batch, columns=columns)
elif isinstance(proxy, pd.Series):
dtype = proxy.dtype
construct = lambda batch: pd.Series(batch, dtype=dtype)
else:
raise NotImplementedError("Unknown proxy type: %s" % proxy)
return pcoll | self._batch_elements_transform | beam.Map(construct)
[docs]def generate_proxy(element_type):
# type: (type) -> pd.DataFrame
"""Generate a proxy pandas object for the given PCollection element_type.
Currently only supports generating a DataFrame proxy from a schema-aware
PCollection or a Series proxy from a primitively typed PCollection.
"""
if element_type != Any and element_type in BEAM_TO_PANDAS:
return pd.Series(dtype=BEAM_TO_PANDAS[element_type])
else:
fields = named_fields_from_element_type(element_type)
proxy = pd.DataFrame(columns=[name for name, _ in fields])
for name, typehint in fields:
# Default to np.object. This is lossy, we won't be able to recover
# the type at the output.
dtype = BEAM_TO_PANDAS.get(typehint, object)
proxy[name] = proxy[name].astype(dtype)
return proxy
[docs]def element_type_from_dataframe(proxy, include_indexes=False):
# type: (pd.DataFrame, bool) -> type
"""Generate an element_type for an element-wise PCollection from a proxy
pandas object. Currently only supports converting the element_type for
a schema-aware PCollection to a proxy DataFrame.
Currently only supports generating a DataFrame proxy from a schema-aware
PCollection.
"""
output_columns = []
if include_indexes:
remaining_index_names = list(proxy.index.names)
i = 0
while len(remaining_index_names):
index_name = remaining_index_names.pop(0)
if index_name is None:
raise ValueError(
"Encountered an unnamed index. Cannot convert to a "
"schema-aware PCollection with include_indexes=True. "
"Please name all indexes or consider not including "
"indexes.")
elif index_name in remaining_index_names:
raise ValueError(
"Encountered multiple indexes with the name '%s'. "
"Cannot convert to a schema-aware PCollection with "
"include_indexes=True. Please ensure all indexes have "
"unique names or consider not including indexes." % index_name)
elif index_name in proxy.columns:
raise ValueError(
"Encountered an index that has the same name as one "
"of the columns, '%s'. Cannot convert to a "
"schema-aware PCollection with include_indexes=True. "
"Please ensure all indexes have unique names or "
"consider not including indexes." % index_name)
else:
# its ok!
output_columns.append(
(index_name, proxy.index.get_level_values(i).dtype))
i += 1
output_columns.extend(zip(proxy.columns, proxy.dtypes))
return named_tuple_from_schema(
named_fields_to_schema([(column, _dtype_to_fieldtype(dtype))
for (column, dtype) in output_columns]))
class _BaseDataframeUnbatchDoFn(beam.DoFn):
def __init__(self, namedtuple_ctor):
self._namedtuple_ctor = namedtuple_ctor
def _get_series(self, df):
raise NotImplementedError()
def process(self, df):
# TODO: Only do null checks for nullable types
def make_null_checking_generator(series):
nulls = pd.isnull(series)
return (None if isnull else value for isnull, value in zip(nulls, series))
all_series = self._get_series(df)
iterators = [
make_null_checking_generator(series) for series,
typehint in zip(all_series, self._namedtuple_ctor.__annotations__)
]
# TODO: Avoid materializing the rows. Produce an object that references the
# underlying dataframe
for values in zip(*iterators):
yield self._namedtuple_ctor(*values)
def infer_output_type(self, input_type):
return self._namedtuple_ctor
@classmethod
def _from_serialized_schema(cls, schema_str):
return cls(
named_tuple_from_schema(
proto_utils.parse_Bytes(schema_str, schema_pb2.Schema)))
def __reduce__(self):
# when pickling, use bytes representation of the schema.
return (
self._from_serialized_schema,
(named_tuple_to_schema(self._namedtuple_ctor).SerializeToString(), ))
class _UnbatchNoIndex(_BaseDataframeUnbatchDoFn):
def _get_series(self, df):
return [df[column] for column in df.columns]
class _UnbatchWithIndex(_BaseDataframeUnbatchDoFn):
def _get_series(self, df):
return [df.index.get_level_values(i) for i in range(len(df.index.names))
] + [df[column] for column in df.columns]
def _unbatch_transform(proxy, include_indexes):
if isinstance(proxy, pd.DataFrame):
ctor = element_type_from_dataframe(proxy, include_indexes=include_indexes)
return beam.ParDo(
_UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor)
).with_output_types(ctor)
elif isinstance(proxy, pd.Series):
# Raise a TypeError if proxy has an unknown type
output_type = _dtype_to_fieldtype(proxy.dtype)
# TODO: Should the index ever be included for a Series?
if _match_is_optional(output_type):
def unbatch(series):
for isnull, value in zip(pd.isnull(series), series):
yield None if isnull else value
else:
def unbatch(series):
yield from series
return beam.FlatMap(unbatch).with_output_types(output_type)
# TODO: What about scalar inputs?
else:
raise TypeError(
"Proxy '%s' has unsupported type '%s'" % (proxy, type(proxy)))
def _dtype_to_fieldtype(dtype):
fieldtype = PANDAS_TO_BEAM.get(dtype)
if fieldtype is not None:
return fieldtype
elif dtype.kind == 'S':
return bytes
else:
return Any
[docs]@typehints.with_input_types(Union[pd.DataFrame, pd.Series])
class UnbatchPandas(beam.PTransform):
"""A transform that explodes a PCollection of DataFrame or Series. DataFrame
is converterd to a schema-aware PCollection, while Series is converted to its
underlying type.
Args:
include_indexes: (optional, default: False) When unbatching a DataFrame
if include_indexes=True, attempt to include index columns in the output
schema for expanded DataFrames. Raises an error if any of the index
levels are unnamed (name=None), or if any of the names are not unique
among all column and index names.
"""
def __init__(self, proxy, include_indexes=False):
self._proxy = proxy
self._include_indexes = include_indexes
[docs] def expand(self, pcoll):
return pcoll | _unbatch_transform(self._proxy, self._include_indexes)