Source code for apache_beam.dataframe.schemas

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

r"""Utilities for relating schema-aware PCollections and dataframe transforms.

Imposes a mapping between native Python typings (specifically those compatible
with :mod:`apache_beam.typehints.schemas`), and common pandas dtypes::

  pandas dtype                    Python typing
  np.int{8,16,32,64}      <-----> np.int{8,16,32,64}*
  pd.Int{8,16,32,64}Dtype <-----> Optional[np.int{8,16,32,64}]*
  np.float{32,64}         <-----> Optional[np.float{32,64}]
                             \--- np.float{32,64}
  Not supported           <------ Optional[bytes]
  np.bool                 <-----> np.bool
  np.dtype('S')           <-----> bytes
  pd.BooleanDtype()       <-----> Optional[bool]
  pd.StringDtype()        <-----> Optional[str]
                             \--- str
  np.object               <-----> Any

  * int, float, bool are treated the same as np.int64, np.float64, np.bool

Note that when converting to pandas dtypes, any types not specified here are
shunted to ``np.object``.

Similarly when converting from pandas to Python types, types that aren't
otherwise specified here are shunted to ``Any``. Notably, this includes
``np.datetime64``.

Pandas does not support hierarchical data natively. Currently, all structured
types (``Sequence``, ``Mapping``, nested ``NamedTuple`` types), are
shunted to ``np.object`` like all other unknown types. In the future these
types may be given special consideration.
"""

# pytype: skip-file

from typing import Any
from typing import NamedTuple
from typing import Optional
from typing import TypeVar
from typing import Union

import numpy as np
import pandas as pd

import apache_beam as beam
from apache_beam import typehints
from apache_beam.portability.api import schema_pb2
from apache_beam.transforms.util import BatchElements
from apache_beam.typehints.native_type_compatibility import _match_is_optional
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import named_tuple_to_schema
from apache_beam.utils import proto_utils

# Public API of this module: the two transforms and the two proxy/type
# conversion helpers. Everything else is an implementation detail.
__all__ = (
    'BatchRowsAsDataFrame',
    'generate_proxy',
    'UnbatchPandas',
    'element_type_from_dataframe',
)

T = TypeVar('T', bound=NamedTuple)

# Generate type map (presented visually in the docstring)
# Each entry is a (pandas dtype, Python typing) pair that converts losslessly
# in both directions.
_BIDIRECTIONAL = [
    (bool, bool),
    (np.int8, np.int8),
    (np.int16, np.int16),
    (np.int32, np.int32),
    (np.int64, np.int64),
    (pd.Int8Dtype(), Optional[np.int8]),
    (pd.Int16Dtype(), Optional[np.int16]),
    (pd.Int32Dtype(), Optional[np.int32]),
    (pd.Int64Dtype(), Optional[np.int64]),
    (np.float32, Optional[np.float32]),
    (np.float64, Optional[np.float64]),
    (object, Any),
    (pd.StringDtype(), Optional[str]),
    (pd.BooleanDtype(), Optional[bool]),
]

# Key on the dtype that pandas actually reports for a Series created with the
# given dtype, so that lookups with ``series.dtype`` / ``df.dtypes`` hit.
PANDAS_TO_BEAM = {
    pd.Series([], dtype=dtype).dtype: fieldtype
    for dtype,
    fieldtype in _BIDIRECTIONAL
}
BEAM_TO_PANDAS = {fieldtype: dtype for dtype, fieldtype in _BIDIRECTIONAL}

# Shunt non-nullable Beam types to the same pandas types as their nullable
# equivalents for FLOATs, DOUBLEs, and STRINGs. pandas has no non-nullable dtype
# for these.
OPTIONAL_SHUNTS = [np.float32, np.float64, str]

for typehint in OPTIONAL_SHUNTS:
  BEAM_TO_PANDAS[typehint] = BEAM_TO_PANDAS[Optional[typehint]]

# int, float -> int64, np.float64
BEAM_TO_PANDAS[int] = BEAM_TO_PANDAS[np.int64]
BEAM_TO_PANDAS[Optional[int]] = BEAM_TO_PANDAS[Optional[np.int64]]
BEAM_TO_PANDAS[float] = BEAM_TO_PANDAS[np.float64]
BEAM_TO_PANDAS[Optional[float]] = BEAM_TO_PANDAS[Optional[np.float64]]

# One-directional: bytes dtypes are mapped back to ``bytes`` by their dtype
# kind rather than through PANDAS_TO_BEAM.
BEAM_TO_PANDAS[bytes] = 'bytes'

@typehints.with_input_types(T)
@typehints.with_output_types(pd.DataFrame)
class BatchRowsAsDataFrame(beam.PTransform):
  """A transform that batches schema-aware PCollection elements into DataFrames

  Batching parameters are inherited from
  :class:`~apache_beam.transforms.util.BatchElements`.

  Args:
    *args: Positional arguments forwarded unchanged to ``BatchElements``.
    proxy: (optional, keyword-only) A ``pd.DataFrame`` or ``pd.Series`` that
      describes the batches to build. When omitted, a proxy is generated from
      the input PCollection's ``element_type`` at expansion time.
    **kwargs: Keyword arguments forwarded unchanged to ``BatchElements``.
  """
  def __init__(self, *args, proxy=None, **kwargs):
    self._batch_elements_transform = BatchElements(*args, **kwargs)
    self._proxy = proxy

  def expand(self, pcoll):
    """Batch ``pcoll`` and turn every batch into a pandas object.

    Raises:
      NotImplementedError: If the proxy is neither a DataFrame nor a Series.
    """
    # Fall back to a proxy derived from the input's element type when the
    # caller did not supply one.
    proxy = (
        generate_proxy(pcoll.element_type)
        if self._proxy is None else self._proxy)
    if isinstance(proxy, pd.DataFrame):
      columns = proxy.columns
      construct = lambda batch: pd.DataFrame.from_records(
          batch, columns=columns)
    elif isinstance(proxy, pd.Series):
      dtype = proxy.dtype
      construct = lambda batch: pd.Series(batch, dtype=dtype)
    else:
      # Wrap in a 1-tuple so that a tuple-valued proxy is formatted as a single
      # value instead of being unpacked as the format arguments (which would
      # raise a formatting TypeError and mask the intended error).
      raise NotImplementedError("Unknown proxy type: %s" % (proxy, ))
    return pcoll | self._batch_elements_transform | beam.Map(construct)
def generate_proxy(element_type):
  # type: (type) -> Union[pd.DataFrame, pd.Series]

  """Build an empty pandas object that mirrors a PCollection element_type.

  An element_type (other than ``Any``) that has an entry in ``BEAM_TO_PANDAS``
  yields an empty Series of the mapped dtype. Every other element_type is
  handed to ``named_fields_from_element_type`` and yields an empty DataFrame
  with one column per field.
  """
  is_primitive = element_type != Any and element_type in BEAM_TO_PANDAS
  if is_primitive:
    return pd.Series(dtype=BEAM_TO_PANDAS[element_type])

  schema_fields = named_fields_from_element_type(element_type)
  frame = pd.DataFrame(
      columns=[field_name for field_name, _ in schema_fields])
  for field_name, field_type in schema_fields:
    # Field types without a mapping fall back to object. That is lossy: the
    # original type cannot be recovered at the output.
    column_dtype = BEAM_TO_PANDAS.get(field_type, object)
    frame[field_name] = frame[field_name].astype(column_dtype)
  return frame
def element_type_from_dataframe(proxy, include_indexes=False):
  # type: (pd.DataFrame, bool) -> type

  """Derive the element_type of an element-wise PCollection from a proxy.

  Every column of ``proxy`` becomes one field of the result. With
  ``include_indexes=True`` each index level is emitted as a field as well,
  ahead of the columns.

  Args:
    proxy: The proxy DataFrame whose columns (and optionally index levels)
      define the fields.
    include_indexes: Whether index levels are included as fields.

  Returns:
    The type produced by ``named_tuple_from_schema`` for the collected
    (name, field type) pairs.

  Raises:
    ValueError: With ``include_indexes=True``, if an index level is unnamed,
      shares its name with a later index level, or shares its name with a
      column.
  """
  named_dtypes = []
  if include_indexes:
    index_names = list(proxy.index.names)
    for level, index_name in enumerate(index_names):
      # Only the levels that follow need checking for duplicates: a clash
      # with an earlier level was already reported when that level was
      # visited.
      later_names = index_names[level + 1:]
      if index_name is None:
        raise ValueError(
            "Encountered an unnamed index. Cannot convert to a "
            "schema-aware PCollection with include_indexes=True. "
            "Please name all indexes or consider not including "
            "indexes.")
      if index_name in later_names:
        raise ValueError(
            "Encountered multiple indexes with the name '%s'. "
            "Cannot convert to a schema-aware PCollection with "
            "include_indexes=True. Please ensure all indexes have "
            "unique names or consider not including indexes." % index_name)
      if index_name in proxy.columns:
        raise ValueError(
            "Encountered an index that has the same name as one "
            "of the columns, '%s'. Cannot convert to a "
            "schema-aware PCollection with include_indexes=True. "
            "Please ensure all indexes have unique names or "
            "consider not including indexes." % index_name)
      level_dtype = proxy.index.get_level_values(level).dtype
      named_dtypes.append((index_name, level_dtype))

  named_dtypes.extend(zip(proxy.columns, proxy.dtypes))

  schema = named_fields_to_schema(
      [(column, _dtype_to_fieldtype(dtype)) for column, dtype in named_dtypes])
  return named_tuple_from_schema(schema)
class _BaseDataframeUnbatchDoFn(beam.DoFn):
  """Base DoFn that explodes a DataFrame into one named tuple per row.

  Subclasses choose which Series take part by implementing ``_get_series``.
  """
  def __init__(self, namedtuple_ctor):
    self._namedtuple_ctor = namedtuple_ctor

  def _get_series(self, df):
    """Return the Series of ``df`` to emit, in output field order."""
    raise NotImplementedError()

  def process(self, df):
    # TODO: Only do null checks for nullable types
    def nulls_as_none(series):
      # Yield the values of the series, substituting None wherever pandas
      # reports a null.
      null_mask = pd.isnull(series)
      return (
          None if is_missing else item
          for is_missing, item in zip(null_mask, series))

    columns = self._get_series(df)
    field_names = self._namedtuple_ctor.__annotations__
    # Pairing with the annotations caps the number of series at the number of
    # fields declared on the constructor.
    row_sources = [
        nulls_as_none(column) for column, _ in zip(columns, field_names)
    ]

    # TODO: Avoid materializing the rows. Produce an object that references the
    # underlying dataframe
    for row in zip(*row_sources):
      yield self._namedtuple_ctor(*row)

  def infer_output_type(self, input_type):
    return self._namedtuple_ctor

  @classmethod
  def _from_serialized_schema(cls, schema_str):
    """Rebuild the DoFn from the serialized schema emitted by __reduce__."""
    schema = proto_utils.parse_Bytes(schema_str, schema_pb2.Schema)
    return cls(named_tuple_from_schema(schema))

  def __reduce__(self):
    # when pickling, use bytes representation of the schema.
    serialized_schema = named_tuple_to_schema(
        self._namedtuple_ctor).SerializeToString()
    return self._from_serialized_schema, (serialized_schema, )


class _UnbatchNoIndex(_BaseDataframeUnbatchDoFn):
  """Unbatches only the columns of a DataFrame."""
  def _get_series(self, df):
    return [df[column] for column in df.columns]


class _UnbatchWithIndex(_BaseDataframeUnbatchDoFn):
  """Unbatches the index levels of a DataFrame followed by its columns."""
  def _get_series(self, df):
    index_levels = [
        df.index.get_level_values(level)
        for level in range(len(df.index.names))
    ]
    data_columns = [df[column] for column in df.columns]
    return index_levels + data_columns


def _unbatch_transform(proxy, include_indexes):
  """Build the transform that explodes batches shaped like ``proxy``.

  A DataFrame proxy yields a ParDo emitting one named tuple per row; a Series
  proxy yields a FlatMap emitting the individual values.

  Raises:
    TypeError: If ``proxy`` is neither a DataFrame nor a Series.
  """
  if isinstance(proxy, pd.DataFrame):
    ctor = element_type_from_dataframe(proxy, include_indexes=include_indexes)
    dofn_cls = _UnbatchWithIndex if include_indexes else _UnbatchNoIndex
    return beam.ParDo(dofn_cls(ctor)).with_output_types(ctor)

  if isinstance(proxy, pd.Series):
    # dtypes without a known mapping come back as bytes or Any.
    output_type = _dtype_to_fieldtype(proxy.dtype)
    # TODO: Should the index ever be included for a Series?
    if _match_is_optional(output_type):
      # Nullable output: surface pandas nulls as None.

      def unbatch(series):
        for is_missing, item in zip(pd.isnull(series), series):
          yield None if is_missing else item
    else:

      def unbatch(series):
        yield from series

    return beam.FlatMap(unbatch).with_output_types(output_type)

  # TODO: What about scalar inputs?
  raise TypeError(
      "Proxy '%s' has unsupported type '%s'" % (proxy, type(proxy)))


def _dtype_to_fieldtype(dtype):
  """Map a pandas dtype to a field type.

  Uses ``PANDAS_TO_BEAM`` when it has an entry; otherwise dtypes of kind 'S'
  map to ``bytes`` and everything else to ``Any``.
  """
  mapped = PANDAS_TO_BEAM.get(dtype)
  if mapped is not None:
    return mapped
  return bytes if dtype.kind == 'S' else Any
@typehints.with_input_types(Union[pd.DataFrame, pd.Series])
class UnbatchPandas(beam.PTransform):
  """A transform that explodes a PCollection of DataFrame or Series.

  A DataFrame is converted to a schema-aware PCollection, while a Series is
  converted to its underlying type.

  Args:
    proxy: A ``pd.DataFrame`` or ``pd.Series`` describing the batches in the
      input PCollection. Any other type raises a ``TypeError`` at expansion.
    include_indexes: (optional, default: False) When unbatching a DataFrame
      if include_indexes=True, attempt to include index columns in the output
      schema for expanded DataFrames. Raises an error if any of the index
      levels are unnamed (name=None), or if any of the names are not unique
      among all column and index names.
  """
  def __init__(self, proxy, include_indexes=False):
    self._include_indexes = include_indexes
    self._proxy = proxy

  def expand(self, pcoll):
    unbatcher = _unbatch_transform(self._proxy, self._include_indexes)
    return pcoll | unbatcher