Source code for apache_beam.typehints.decorators

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

"""Type hinting decorators allowing static or runtime type-checking for the SDK.

This module defines decorators which utilize the type-hints defined in
'' to allow annotation of the types of function arguments and
return values.

Type-hints for functions are annotated using two separate decorators. One is for
type-hinting the types of function arguments, the other for type-hinting the
function return value. Type-hints can either be specified in the form of
positional arguments::

  @with_input_types(int, int)
  def add(a, b):
    return a + b

Keyword arguments::

  @with_input_types(a=int, b=int)
  def add(a, b):
    return a + b

Or even a mix of both::

  @with_input_types(int, b=int)
  def add(a, b):
    return a + b

Example usage for type-hinting arguments only::

  def to_lower(a):
    return a.lower()

Example usage for type-hinting return values only::

  @with_output_types(Tuple[int, bool])
  def compress_point(ec_point):
    return ec_point.x, ec_point.y < 0

Example usage for type-hinting both arguments and return values::

  def int_to_str(a):
    return str(a)

Type-hinting a function with arguments that unpack tuples are also supported
(in Python 2 only). As an example, such a function would be defined as::

  def foo((a, b)):

The valid type-hint for such as function looks like the following::

  @with_input_types(a=int, b=int)
  def foo((a, b)):

Notice that we hint the type of each unpacked argument independently, rather
than hinting the type of the tuple as a whole (Tuple[int, int]).

Optionally, type-hints can be type-checked at runtime. To toggle this behavior
this module defines two functions: 'enable_run_time_type_checking' and
'disable_run_time_type_checking'. NOTE: for this toggle behavior to work
properly it must appear at the top of the module where all functions are
defined, or before importing a module containing type-hinted functions.

from __future__ import absolute_import

import inspect
import logging
import sys
import types
from builtins import next
from builtins import object
from builtins import zip

from apache_beam.typehints import native_type_compatibility
from apache_beam.typehints import typehints
from apache_beam.typehints.typehints import CompositeTypeHintError
from apache_beam.typehints.typehints import SimpleTypeHintError
from apache_beam.typehints.typehints import check_constraint
from apache_beam.typehints.typehints import validate_composite_type_param

  import funcsigs  # Python 2 only.
except ImportError:
  funcsigs = None

__all__ = [

# This is missing in the builtin types module.  str.upper is arbitrary, any
# method on a C-implemented type will do.
# pylint: disable=invalid-name
_MethodDescriptorType = type(str.upper)
# pylint: enable=invalid-name

_ANY_VAR_POSITIONAL = typehints.Tuple[typehints.Any, ...]
_ANY_VAR_KEYWORD = typehints.Dict[typehints.Any, typehints.Any]
# TODO(BEAM-8280): Remove this when from_callable is ready to be enabled.
_enable_from_callable = False

  _original_getfullargspec = inspect.getfullargspec
  _use_full_argspec = True
except AttributeError:  # Python 2
  _original_getfullargspec = inspect.getargspec
  _use_full_argspec = False

def getfullargspec(func):
  # Python 3: Use get_signature instead.
  assert sys.version_info < (3,), 'This method should not be used in Python 3'
    return _original_getfullargspec(func)
  except TypeError:
    if isinstance(func, type):
      argspec = getfullargspec(func.__init__)
      del argspec.args[0]
      return argspec
    elif callable(func):
        return _original_getfullargspec(func.__call__)
      except TypeError:
        # Return an ArgSpec with at least one positional argument,
        # and any number of other (positional or keyword) arguments
        # whose name won't match any real argument.
        # Arguments with the %unknown% prefix will be ignored in the type
        # checking code.
        if _use_full_argspec:
          return inspect.FullArgSpec(
              ['_'], '__unknown__varargs', '__unknown__keywords', (),
              [], {}, {})
        else:  # Python 2
          return inspect.ArgSpec(
              ['_'], '__unknown__varargs', '__unknown__keywords', ())

def get_signature(func):
  """Like inspect.signature(), but supports Py2 as well.

  This module uses inspect.signature instead of getfullargspec since in the
  latter: 'the "self" parameter is always reported, even for bound methods'
  # Fall back on funcsigs if inspect module doesn't have 'signature'; prefer
  # inspect.signature over funcsigs.signature if both are available.
  if hasattr(inspect, 'signature'):
    inspect_ = inspect
    inspect_ = funcsigs

    signature = inspect_.signature(func)
  except ValueError:
    # Fall back on a catch-all signature.
    params = [
        inspect_.Parameter('_', inspect_.Parameter.POSITIONAL_OR_KEYWORD),

    signature = inspect_.Signature(params)

  # This is a specialization to hint the first argument of certain builtins,
  # such as str.strip.
  if isinstance(func, _MethodDescriptorType):
    params = list(signature.parameters.values())
    if params[0].annotation == params[0].empty:
      params[0] = params[0].replace(annotation=func.__objclass__)
      signature = signature.replace(parameters=params)

  # This is a specialization to hint the return value of type callables.
  if (signature.return_annotation == signature.empty and
      isinstance(func, type)):
    signature = signature.replace(return_annotation=typehints.normalize(func))

  return signature

class IOTypeHints(object):
  """Encapsulates all type hint information about a Dataflow construct.

  This should primarily be used via the WithTypeHints mixin class, though
  may also be attached to other objects (such as Python functions).

    input_types: (tuple, dict) List of typing types, and an optional dictionary.
      May be None. The list and dict correspond to args and kwargs.
    output_types: (tuple, dict) List of typing types, and an optional dictionary
      (unused). Only the first element of the list is used. May be None.
  __slots__ = ('input_types', 'output_types')

  def __init__(self, input_types=None, output_types=None):
    self.input_types = input_types
    self.output_types = output_types

  def from_callable(fn):
    """Construct an IOTypeHints object from a callable's signature.

    Supports Python 3 annotations. For partial annotations, sets unknown types

      A new IOTypeHints or None if no annotations found.
    if not _enable_from_callable:
      return None
    signature = get_signature(fn)
    if (all(param.annotation == param.empty
            for param in signature.parameters.values())
        and signature.return_annotation == signature.empty):
      return None
    input_args = []
    input_kwargs = {}
    for param in signature.parameters.values():
      if param.annotation == param.empty:
        if param.kind == param.VAR_POSITIONAL:
        elif param.kind == param.VAR_KEYWORD:
          input_kwargs[] = _ANY_VAR_KEYWORD
        elif param.kind == param.KEYWORD_ONLY:
          input_kwargs[] = typehints.Any
        if param.kind in [param.KEYWORD_ONLY, param.VAR_KEYWORD]:
          input_kwargs[] = param.annotation
          assert param.kind in [param.POSITIONAL_ONLY,
                                param.VAR_POSITIONAL], \
              'Unsupported Parameter kind: %s' % param.kind
    output_args = []
    if signature.return_annotation != signature.empty:

    return IOTypeHints(input_types=(tuple(input_args), input_kwargs),
                       output_types=(tuple(output_args), {}))

  def set_input_types(self, *args, **kwargs):
    self.input_types = args, kwargs

  def set_output_types(self, *args, **kwargs):
    self.output_types = args, kwargs

  def simple_output_type(self, context):
    if self.output_types:
      args, kwargs = self.output_types
      if len(args) != 1 or kwargs:
        raise TypeError(
            'Expected single output type hint for %s but got: %s' % (
                context, self.output_types))
      return args[0]

  def has_simple_output_type(self):
    """Whether there's a single positional output type."""
    return (self.output_types and len(self.output_types[0]) == 1 and
            not self.output_types[1])

  def strip_iterable(self):
    """Removes outer Iterable (or equivalent) from output type.

    Only affects instances with simple output types, otherwise is a no-op.

    Example: Generator[Tuple(int, int)] becomes Tuple(int, int)

      ValueError if output type is simple and not iterable.
    if not self.has_simple_output_type():
    yielded_type = typehints.get_yielded_type(self.output_types[0][0])
    self.output_types = ((yielded_type,), {})

  def copy(self):
    return IOTypeHints(self.input_types, self.output_types)

  def with_defaults(self, hints):
    if not hints:
      return self
    if self._has_input_types():
      input_types = self.input_types
      input_types = hints.input_types
    if self._has_output_types():
      output_types = self.output_types
      output_types = hints.output_types
    return IOTypeHints(input_types, output_types)

  def _has_input_types(self):
    return self.input_types is not None and any(self.input_types)

  def _has_output_types(self):
    return self.output_types is not None and any(self.output_types)

  def __bool__(self):
    return self._has_input_types() or self._has_output_types()

  def __repr__(self):
    return 'IOTypeHints[inputs=%s, outputs=%s]' % (
        self.input_types, self.output_types)

[docs]class WithTypeHints(object): """A mixin class that provides the ability to set and retrieve type hints. """ def __init__(self, *unused_args, **unused_kwargs): self._type_hints = IOTypeHints() def _get_or_create_type_hints(self): # __init__ may have not been called try: return self._type_hints except AttributeError: self._type_hints = IOTypeHints() return self._type_hints
[docs] def get_type_hints(self): """Gets and/or initializes type hints for this object. If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints. """ return (self._get_or_create_type_hints() .with_defaults(self.default_type_hints()) .with_defaults(get_type_hints(self.__class__)))
[docs] def default_type_hints(self): return None
[docs] def with_input_types(self, *arg_hints, **kwarg_hints): arg_hints = native_type_compatibility.convert_to_beam_types(arg_hints) kwarg_hints = native_type_compatibility.convert_to_beam_types(kwarg_hints) self._get_or_create_type_hints().set_input_types(*arg_hints, **kwarg_hints) return self
[docs] def with_output_types(self, *arg_hints, **kwarg_hints): arg_hints = native_type_compatibility.convert_to_beam_types(arg_hints) kwarg_hints = native_type_compatibility.convert_to_beam_types(kwarg_hints) self._get_or_create_type_hints().set_output_types(*arg_hints, **kwarg_hints) return self
[docs]class TypeCheckError(Exception): pass
def _positional_arg_hints(arg, hints): """Returns the type of a (possibly tuple-packed) positional argument. E.g. for lambda ((a, b), c): None the single positional argument is (as returned by inspect) [[a, b], c] which should have type Tuple[Tuple[Int, Any], float] when applied to the type hints {a: int, b: Any, c: float}. """ if isinstance(arg, list): return typehints.Tuple[[_positional_arg_hints(a, hints) for a in arg]] return hints.get(arg, typehints.Any) def _unpack_positional_arg_hints(arg, hint): """Unpacks the given hint according to the nested structure of arg. For example, if arg is [[a, b], c] and hint is Tuple[Any, int], then this function would return ((Any, Any), int) so it can be used in conjunction with inspect.getcallargs. """ if isinstance(arg, list): tuple_constraint = typehints.Tuple[[typehints.Any] * len(arg)] if not typehints.is_consistent_with(hint, tuple_constraint): raise TypeCheckError('Bad tuple arguments for %s: expected %s, got %s' % (arg, tuple_constraint, hint)) if isinstance(hint, typehints.TupleConstraint): return tuple(_unpack_positional_arg_hints(a, t) for a, t in zip(arg, hint.tuple_types)) return (typehints.Any,) * len(arg) return hint def getcallargs_forhints(func, *typeargs, **typekwargs): """Like inspect.getcallargs, with support for declaring default args as Any. In Python 2, understands that Tuple[] and an Any unpack. Returns: (Dict[str, Any]) A dictionary from arguments names to values. """ if sys.version_info < (3,): return getcallargs_forhints_impl_py2(func, typeargs, typekwargs) else: return getcallargs_forhints_impl_py3(func, typeargs, typekwargs) def getcallargs_forhints_impl_py2(func, typeargs, typekwargs): argspec = getfullargspec(func) # Turn Tuple[x, y] into (x, y) so getcallargs can do the proper unpacking. packed_typeargs = [_unpack_positional_arg_hints(arg, hint) for (arg, hint) in zip(argspec.args, typeargs)] packed_typeargs += list(typeargs[len(packed_typeargs):]) # Monkeypatch inspect.getfullargspec to allow passing non-function objects. # getfullargspec (getargspec on Python 2) are used by inspect.getcallargs. # TODO(BEAM-5490): Reimplement getcallargs and stop relying on monkeypatch. inspect.getargspec = getfullargspec try: callargs = inspect.getcallargs(func, *packed_typeargs, **typekwargs) except TypeError as e: raise TypeCheckError(e) finally: # Revert monkey-patch. inspect.getargspec = _original_getfullargspec if argspec.defaults: # Declare any default arguments to be Any. for k, var in enumerate(reversed(argspec.args)): if k >= len(argspec.defaults): break if callargs.get(var, None) is argspec.defaults[-k-1]: callargs[var] = typehints.Any # Patch up varargs and keywords if argspec.varargs: # TODO(BEAM-8122): This will always assign _ANY_VAR_POSITIONAL. Should be # "callargs.get(...) or _ANY_VAR_POSITIONAL". callargs[argspec.varargs] = typekwargs.get( argspec.varargs, _ANY_VAR_POSITIONAL) varkw = argspec.keywords if varkw: # TODO(robertwb): Consider taking the union of key and value types. callargs[varkw] = typekwargs.get(varkw, _ANY_VAR_KEYWORD) # TODO(BEAM-5878) Support kwonlyargs. return callargs def _normalize_var_positional_hint(hint): """Converts a var_positional hint into Tuple[Union[<types>], ...] form. Args: hint: (tuple) Should be either a tuple of one or more types, or a single Tuple[<type>, ...]. Raises: TypeCheckError if hint does not have the right form. """ if not hint or type(hint) != tuple: raise TypeCheckError('Unexpected VAR_POSITIONAL value: %s' % hint) if len(hint) == 1 and isinstance(hint[0], typehints.TupleSequenceConstraint): # Example: tuple(Tuple[Any, ...]) -> Tuple[Any, ...] return hint[0] else: # Example: tuple(int, str) -> Tuple[Union[int, str], ...] return typehints.Tuple[typehints.Union[hint], ...] def _normalize_var_keyword_hint(hint, arg_name): """Converts a var_keyword hint into Dict[<key type>, <value type>] form. Args: hint: (dict) Should either contain a pair (arg_name, Dict[<key type>, <value type>]), or one or more possible types for the value. arg_name: (str) The keyword receiving this hint. Raises: TypeCheckError if hint does not have the right form. """ if not hint or type(hint) != dict: raise TypeCheckError('Unexpected VAR_KEYWORD value: %s' % hint) keys = list(hint.keys()) values = list(hint.values()) if (len(values) == 1 and keys[0] == arg_name and isinstance(values[0], typehints.DictConstraint)): # Example: dict(kwargs=Dict[str, Any]) -> Dict[str, Any] return values[0] else: # Example: dict(k1=str, k2=int) -> Dict[str, Union[str,int]] return typehints.Dict[str, typehints.Union[values]] def getcallargs_forhints_impl_py3(func, type_args, type_kwargs): """Bind type_args and type_kwargs to func. Works like inspect.getcallargs, with some modifications to support type hint checks. For unbound args, will use annotations and fall back to Any (or variants of Any). Returns: A mapping from parameter name to argument. """ try: signature = get_signature(func) except ValueError as e: logging.warning('Could not get signature for function: %s: %s', func, e) return {} try: bindings = signature.bind(*type_args, **type_kwargs) except TypeError as e: # Might be raised due to too few or too many arguments. raise TypeCheckError(e) bound_args = bindings.arguments for param in signature.parameters.values(): if in bound_args: # Bound: unpack/convert variadic arguments. if param.kind == param.VAR_POSITIONAL: bound_args[] = _normalize_var_positional_hint( bound_args[]) elif param.kind == param.VAR_KEYWORD: bound_args[] = _normalize_var_keyword_hint( bound_args[], else: # Unbound: must have a default or be variadic. if param.annotation != param.empty: bound_args[] = param.annotation elif param.kind == param.VAR_POSITIONAL: bound_args[] = _ANY_VAR_POSITIONAL elif param.kind == param.VAR_KEYWORD: bound_args[] = _ANY_VAR_KEYWORD elif param.default is not param.empty: # Declare unbound parameters with defaults to be Any. bound_args[] = typehints.Any else: # This case should be caught by signature.bind() above. raise ValueError('Unexpected unbound parameter: %s' % return dict(bound_args) def get_type_hints(fn): """Gets the type hint associated with an arbitrary object fn. Always returns a valid IOTypeHints object, creating one if necessary. """ # pylint: disable=protected-access if not hasattr(fn, '_type_hints'): try: fn._type_hints = IOTypeHints() except (AttributeError, TypeError): # Can't add arbitrary attributes to this object, # but might have some restrictions anyways... hints = IOTypeHints() # Python 3.7 introduces annotations for _MethodDescriptorTypes. if isinstance(fn, _MethodDescriptorType) and sys.version_info < (3, 7): hints.set_input_types(fn.__objclass__) return hints return fn._type_hints # pylint: enable=protected-access
[docs]def with_input_types(*positional_hints, **keyword_hints): """A decorator that type-checks defined type-hints with passed func arguments. All type-hinted arguments can be specified using positional arguments, keyword arguments, or a mix of both. Additionaly, all function arguments must be type-hinted in totality if even one parameter is type-hinted. Once fully decorated, if the arguments passed to the resulting function violate the type-hint constraints defined, a :class:`TypeCheckError` detailing the error will be raised. To be used as: .. testcode:: from apache_beam.typehints import with_input_types @with_input_types(str) def upper(s): return s.upper() Or: .. testcode:: from apache_beam.typehints import with_input_types from apache_beam.typehints import List from apache_beam.typehints import Tuple @with_input_types(ls=List[Tuple[int, int]]) def increment(ls): [(i + 1, j + 1) for (i,j) in ls] Args: *positional_hints: Positional type-hints having identical order as the function's formal arguments. Values for this argument must either be a built-in Python type or an instance of a :class:`~apache_beam.typehints.typehints.TypeConstraint` created by 'indexing' a :class:`~apache_beam.typehints.typehints.CompositeTypeHint` instance with a type parameter. **keyword_hints: Keyword arguments mirroring the names of the parameters to the decorated functions. The value of each keyword argument must either be one of the allowed built-in Python types, a custom class, or an instance of a :class:`~apache_beam.typehints.typehints.TypeConstraint` created by 'indexing' a :class:`~apache_beam.typehints.typehints.CompositeTypeHint` instance with a type parameter. Raises: :class:`~exceptions.ValueError`: If not all function arguments have corresponding type-hints specified. Or if the inner wrapper function isn't passed a function object. :class:`TypeCheckError`: If the any of the passed type-hint constraints are not a type or :class:`~apache_beam.typehints.typehints.TypeConstraint` instance. Returns: The original function decorated such that it enforces type-hint constraints for all received function arguments. """ converted_positional_hints = ( native_type_compatibility.convert_to_beam_types(positional_hints)) converted_keyword_hints = ( native_type_compatibility.convert_to_beam_types(keyword_hints)) del positional_hints del keyword_hints def annotate(f): if isinstance(f, types.FunctionType): for t in (list(converted_positional_hints) + list(converted_keyword_hints.values())): validate_composite_type_param( t, error_msg_prefix='All type hint arguments') get_type_hints(f).set_input_types(*converted_positional_hints, **converted_keyword_hints) return f return annotate
[docs]def with_output_types(*return_type_hint, **kwargs): """A decorator that type-checks defined type-hints for return values(s). This decorator will type-check the return value(s) of the decorated function. Only a single type-hint is accepted to specify the return type of the return value. If the function to be decorated has multiple return values, then one should use: ``Tuple[type_1, type_2]`` to annotate the types of the return values. If the ultimate return value for the function violates the specified type-hint a :class:`TypeCheckError` will be raised detailing the type-constraint violation. This decorator is intended to be used like: .. testcode:: from apache_beam.typehints import with_output_types from apache_beam.typehints import Set class Coordinate(object): def __init__(self, x, y): self.x = x self.y = y @with_output_types(Set[Coordinate]) def parse_ints(ints): return {Coordinate(i, i) for i in ints} Or with a simple type-hint: .. testcode:: from apache_beam.typehints import with_output_types @with_output_types(bool) def negate(p): return not p if p else p Args: *return_type_hint: A type-hint specifying the proper return type of the function. This argument should either be a built-in Python type or an instance of a :class:`~apache_beam.typehints.typehints.TypeConstraint` created by 'indexing' a :class:`~apache_beam.typehints.typehints.CompositeTypeHint`. **kwargs: Not used. Raises: :class:`~exceptions.ValueError`: If any kwarg parameters are passed in, or the length of **return_type_hint** is greater than ``1``. Or if the inner wrapper function isn't passed a function object. :class:`TypeCheckError`: If the **return_type_hint** object is in invalid type-hint. Returns: The original function decorated such that it enforces type-hint constraints for all return values. """ if kwargs: raise ValueError("All arguments for the 'returns' decorator must be " "positional arguments.") if len(return_type_hint) != 1: raise ValueError("'returns' accepts only a single positional argument. In " "order to specify multiple return types, use the 'Tuple' " "type-hint.") return_type_hint = native_type_compatibility.convert_to_beam_type( return_type_hint[0]) validate_composite_type_param( return_type_hint, error_msg_prefix='All type hint arguments' ) def annotate(f): get_type_hints(f).set_output_types(return_type_hint) return f return annotate
def _check_instance_type( type_constraint, instance, var_name=None, verbose=False): """A helper function to report type-hint constraint violations. Args: type_constraint: An instance of a 'TypeConstraint' or a built-in Python type. instance: The candidate object which will be checked by to satisfy 'type_constraint'. var_name: If 'instance' is an argument, then the actual name for the parameter in the original function definition. Raises: TypeCheckError: If 'instance' fails to meet the type-constraint of 'type_constraint'. """ hint_type = ( "argument: '%s'" % var_name if var_name is not None else 'return type') try: check_constraint(type_constraint, instance) except SimpleTypeHintError: if verbose: verbose_instance = '%s, ' % instance else: verbose_instance = '' raise TypeCheckError('Type-hint for %s violated. Expected an ' 'instance of %s, instead found %san instance of %s.' % (hint_type, type_constraint, verbose_instance, type(instance))) except CompositeTypeHintError as e: raise TypeCheckError('Type-hint for %s violated: %s' % (hint_type, e)) def _interleave_type_check(type_constraint, var_name=None): """Lazily type-check the type-hint for a lazily generated sequence type. This function can be applied as a decorator or called manually in a curried manner: * @_interleave_type_check(List[int]) def gen(): yield 5 or * gen = _interleave_type_check(Tuple[int, int], 'coord_gen')(gen) As a result, all type-checking for the passed generator will occur at 'yield' time. This way, we avoid having to depleat the generator in order to type-check it. Args: type_constraint: An instance of a TypeConstraint. The output yielded of 'gen' will be type-checked according to this type constraint. var_name: The variable name binded to 'gen' if type-checking a function argument. Used solely for templating in error message generation. Returns: A function which takes a generator as an argument and returns a wrapped version of the generator that interleaves type-checking at 'yield' iteration. If the generator received is already wrapped, then it is simply returned to avoid nested wrapping. """ def wrapper(gen): if isinstance(gen, GeneratorWrapper): return gen return GeneratorWrapper( gen, lambda x: _check_instance_type(type_constraint, x, var_name) ) return wrapper class GeneratorWrapper(object): """A wrapper around a generator, allows execution of a callback per yield. Additionally, wrapping a generator with this class allows one to assign arbitary attributes to a generator object just as with a function object. Attributes: internal_gen: A instance of a generator object. As part of 'step' of the generator, the yielded object will be passed to 'interleave_func'. interleave_func: A callback accepting a single argument. This function will be called with the result of each yielded 'step' in the internal generator. """ def __init__(self, gen, interleave_func): self.internal_gen = gen self.interleave_func = interleave_func def __getattr__(self, attr): # TODO(laolu): May also want to intercept 'send' in the future if we move to # a GeneratorHint with 3 type-params: # * Generator[send_type, return_type, yield_type] if attr == '__next__': return self.__next__() elif attr == '__iter__': return self.__iter__() return getattr(self.internal_gen, attr) def __next__(self): next_val = next(self.internal_gen) self.interleave_func(next_val) return next_val next = __next__ def __iter__(self): for x in self.internal_gen: self.interleave_func(x) yield x