Source code for apache_beam.dataframe.pandas_top_level_functions

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module providing various functionality from the top-level pandas namespace.
"""

import re
from typing import Mapping

import pandas as pd

from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import partitionings


def _call_on_first_arg(name):
  def wrapper(target, *args, **kwargs):
    if isinstance(target, frame_base.DeferredBase):
      return getattr(target, name)(*args, **kwargs)
    else:
      return getattr(pd, name)(target, *args, **kwargs)

  return staticmethod(wrapper)


def _maybe_wrap_constant_expr(res):
  """Wrap ``res`` as a deferred constant when its exact type is a known one.

  "Known" means ``type(res)`` is a key of
  ``frame_base.DeferredBase._pandas_type_map``; the check is on the exact type,
  so subclasses of a registered type are not matched. Any other value is
  returned unchanged.
  """
  if type(res) not in frame_base.DeferredBase._pandas_type_map:
    return res

  # The empty slice ``res[0:0]`` is handed to ConstantExpression alongside the
  # value itself -- presumably as its proxy; confirm against
  # expressions.ConstantExpression.
  constant = expressions.ConstantExpression(res, res[0:0])
  return frame_base.DeferredBase.wrap(constant)


def _defer_to_pandas(name):
  func = getattr(pd, name)

  def wrapper(*args, **kwargs):
    res = func(*args, **kwargs)
    return _maybe_wrap_constant_expr(res)

  return staticmethod(wrapper)


def _defer_to_pandas_maybe_elementwise(name):
  """ Same as _defer_to_pandas, except it handles DeferredBase args, assuming
  the function can be processed elementwise. """
  func = getattr(pd, name)

  def wrapper(*args, **kwargs):
    if any(isinstance(arg, frame_base.DeferredBase)
           for arg in args + tuple(kwargs.values())):
      return frame_base._elementwise_function(func, name)(*args, **kwargs)

    res = func(*args, **kwargs)
    return _maybe_wrap_constant_expr(res)

  return staticmethod(wrapper)


def _is_top_level_function(o):
  return (
      callable(o) and not isinstance(o, type) and hasattr(o, '__name__') and
      re.match('[a-z].*', o.__name__))


class DeferredPandasModule(object):
  """Stand-in for the top-level ``pandas`` namespace.

  Each class attribute below mirrors a ``pd.<name>`` function and is built by
  one of the module's factory helpers (or marked as "won't implement" through
  ``frame_base``). Names that are not defined here are resolved lazily by
  ``__getattr__``.
  """
  array = _defer_to_pandas('array')
  bdate_range = _defer_to_pandas('bdate_range')

  @staticmethod
  @frame_base.args_to_kwargs(pd)
  @frame_base.populate_defaults(pd)
  def concat(
      objs,
      axis,
      join,
      ignore_index,
      keys,
      levels,
      names,
      verify_integrity,
      sort,
      copy):
    """Deferred counterpart of ``pd.concat``.

    Parameters carry the same names as ``pd.concat``. They have no defaults
    here; presumably the ``frame_base`` decorators bind them from pandas'
    signature -- confirm against ``frame_base.populate_defaults``.

    Args:
      objs: An iterable of deferred objects, or a ``Mapping`` of key to
        deferred object. ``None`` entries are tolerated.
      axis, join, keys, names, verify_integrity: Forwarded to ``pd.concat``.
      ignore_index, levels: Must be falsy; see Raises.
      sort, copy: Accepted but currently not forwarded to ``pd.concat``.

    Returns:
      The result of ``frame_base.DeferredBase.wrap`` applied to a
      ``ComputedExpression`` named ``'concat'``.

    Raises:
      NotImplementedError: If ``ignore_index`` or ``levels`` is truthy.
      ValueError: If ``keys`` is given and every object is ``None``.
    """
    if ignore_index:
      raise NotImplementedError('concat(ignore_index)')
    if levels:
      raise NotImplementedError('concat(levels)')

    if isinstance(objs, Mapping):
      # For a mapping, the keys default to the mapping's own keys, and the
      # objects are taken in key order.
      if keys is None:
        keys = list(objs.keys())
      objs = [objs[k] for k in keys]
    else:
      objs = list(objs)

    if keys is None:
      preserves_partitioning = partitionings.Arbitrary()
    else:
      # Index 0 will be a new index for keys, only partitioning by the original
      # indexes (1 to N) will be preserved.
      # None entries are skipped here, consistent with how they are handled
      # when building `exprs` below; previously they raised AttributeError.
      present = [o for o in objs if o is not None]
      if not present:
        raise ValueError('All objects passed were None')
      nlevels = min(o._expr.proxy().index.nlevels for o in present)
      preserves_partitioning = partitionings.Index(list(range(1, nlevels + 1)))

    deferred_none = expressions.ConstantExpression(None)
    exprs = [deferred_none if o is None else o._expr for o in objs]

    # Column-wise concatenation and integrity verification both require
    # Index partitioning of the inputs; otherwise any partitioning is accepted.
    if axis in (1, 'columns') or verify_integrity:
      required_partitioning = partitionings.Index()
    else:
      required_partitioning = partitionings.Arbitrary()

    # NOTE(review): `sort` and `copy` are not passed through to pd.concat, so
    # user-supplied values for them have no effect -- confirm this is intended.
    return frame_base.DeferredBase.wrap(
        expressions.ComputedExpression(
            'concat',
            lambda *frames: pd.concat(
                frames,
                axis=axis,
                join=join,
                ignore_index=ignore_index,
                keys=keys,
                levels=levels,
                names=names,
                verify_integrity=verify_integrity),  # yapf break
            exprs,
            requires_partition_by=required_partitioning,
            preserves_partition_by=preserves_partitioning))

  date_range = _defer_to_pandas('date_range')
  describe_option = _defer_to_pandas('describe_option')
  factorize = _call_on_first_arg('factorize')
  get_option = _defer_to_pandas('get_option')
  interval_range = _defer_to_pandas('interval_range')
  isna = _call_on_first_arg('isna')
  isnull = _call_on_first_arg('isnull')
  json_normalize = _defer_to_pandas('json_normalize')
  melt = _call_on_first_arg('melt')
  merge = _call_on_first_arg('merge')
  merge_ordered = frame_base.wont_implement_method(
      pd, 'merge_ordered', reason='order-sensitive')
  notna = _call_on_first_arg('notna')
  notnull = _call_on_first_arg('notnull')
  option_context = _defer_to_pandas('option_context')
  period_range = _defer_to_pandas('period_range')
  pivot = _call_on_first_arg('pivot')
  pivot_table = _call_on_first_arg('pivot_table')
  show_versions = _defer_to_pandas('show_versions')
  test = frame_base.wont_implement_method(
      pd,
      'test',
      explanation="because it is an internal pandas testing utility.")
  timedelta_range = _defer_to_pandas('timedelta_range')
  to_pickle = frame_base.wont_implement_method(
      pd, 'to_pickle', reason='order-sensitive')
  to_datetime = _defer_to_pandas_maybe_elementwise('to_datetime')

  def __getattr__(self, name):
    """Resolve names that are not defined explicitly on the class.

    * ``read_*`` names yield a function that raises
      ``frame_base.WontImplementError`` pointing at
      ``apache_beam.dataframe.io``.
    * Other names are looked up on ``pd`` (raising ``AttributeError`` if
      absent). If ``_is_top_level_function`` accepts the attribute, the result
      of ``frame_base.not_implemented_method`` is returned in its place;
      anything else (classes, constants, submodules, ...) is returned as-is.
    """
    if name.startswith('read_'):

      def func(*args, **kwargs):
        raise frame_base.WontImplementError(
            'Use p | apache_beam.dataframe.io.%s' % name)

      return func

    res = getattr(pd, name)
    if _is_top_level_function(res):
      return frame_base.not_implemented_method(name, base_type=pd)
    else:
      return res
# Shared module-level instance of DeferredPandasModule.
pd_wrapper = DeferredPandasModule()