Source code for apache_beam.typehints.typehints

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Syntax & semantics for type-hinting custom-functions/PTransforms in the SDK.

This module defines type-hinting objects and the corresponding syntax for
type-hinting function arguments, function return types, or PTransform object
themselves. TypeHint's defined in the module can be used to implement either
static or run-time type-checking in regular Python code.

Type-hints are defined by 'indexing' a type-parameter into a defined
CompositeTypeHint instance:

  * 'List[int]'.

Valid type-hints are partitioned into two categories: simple, and composite.

Simple type hints are type hints based on a subset of Python primitive types:
int, bool, float, str, object, None, and bytes. No other primitive types are
allowed.

Composite type-hints are reserved for hinting the types of container-like
Python objects such as 'list'. Composite type-hints can be parameterized by an
inner simple or composite type-hint, using the 'indexing' syntax. In order to
avoid conflicting with the namespace of the built-in container types, when
specifying this category of type-hints, the first letter should be capitalized.
The following composite type-hints are permitted. NOTE: 'T' can be any of the
type-hints listed or a simple Python type:

  * Any
  * Union[T, T, T]
  * Optional[T]
  * Tuple[T, T]
  * Tuple[T, ...]
  * List[T]
  * KV[T, T]
  * Dict[T, T]
  * Set[T]
  * Iterable[T]
  * Iterator[T]
  * Generator[T]

Type-hints can be nested, allowing one to define type-hints for complex types:

  * 'List[Tuple[int, int, str]]

In addition, type-hints can be used to implement run-time type-checking via the
'type_check' method on each TypeConstraint.

"""

# pytype: skip-file

import copy
import logging
import sys
import types
import typing
from collections import abc

__all__ = [
    'Any',
    'Union',
    'Optional',
    'Tuple',
    'List',
    'KV',
    'Dict',
    'Set',
    'FrozenSet',
    'Collection',
    'Iterable',
    'Iterator',
    'Generator',
    'WindowedValue',
    'TypeVariable',
]

# A set of the built-in Python types we don't support, guiding the users
# to templated (upper-case) versions instead.
DISALLOWED_PRIMITIVE_TYPES = (list, set, frozenset, tuple, dict)

_LOGGER = logging.getLogger(__name__)


class SimpleTypeHintError(TypeError):
  pass


class CompositeTypeHintError(TypeError):
  pass


class GetitemConstructor(type):
  """A metaclass that makes Cls[arg] an alias for Cls(arg)."""
  def __getitem__(cls, arg):
    return cls(arg)


class TypeConstraint(object):
  """The base-class for all created type-constraints defined below.

  A :class:`TypeConstraint` is the result of parameterizing a
  :class:`CompositeTypeHint` with with one of the allowed Python types or
  another :class:`CompositeTypeHint`. It binds and enforces a specific
  version of a generalized TypeHint.
  """
  def _consistent_with_check_(self, sub):
    """Returns whether sub is consistent with self.

    Has the same relationship to is_consistent_with() as
    __subclasscheck__ does for issubclass().

    Not meant to be called directly; call is_consistent_with(sub, self)
    instead.

    Implementation may assume that maybe_sub_type is not Any
    and has been normalized.
    """
    raise NotImplementedError

  def type_check(self, instance):
    """Determines if the type of 'instance' satisfies this type constraint.

    Args:
      instance: An instance of a Python object.

    Raises:
      :class:`TypeError`: The passed **instance** doesn't satisfy
        this :class:`TypeConstraint`. Subclasses of
        :class:`TypeConstraint` are free to raise any of the subclasses of
        :class:`TypeError` defined above, depending on
        the manner of the type hint error.

    All :class:`TypeConstraint` sub-classes must define this method in other
    for the class object to be created.
    """
    raise NotImplementedError

  def match_type_variables(self, unused_concrete_type):
    return {}

  def bind_type_variables(self, unused_bindings):
    return self

  def _inner_types(self):
    """Iterates over the inner types of the composite type."""
    return []

  def visit(self, visitor, visitor_arg):
    """Visitor method to visit all inner types of a composite type.

    Args:
      visitor: A callable invoked for all nodes in the type tree comprising
        a composite type. The visitor will be called with the node visited
        and the visitor argument specified here.
      visitor_arg: Visitor callback second argument.
    """
    visitor(self, visitor_arg)
    for t in self._inner_types():
      if isinstance(t, TypeConstraint):
        t.visit(visitor, visitor_arg)
      else:
        visitor(t, visitor_arg)


def visit_inner_types(type_constraint, visitor, visitor_arg):
  """Visitor pattern to visit all inner types of a type constraint.

  Args:
    type_constraint: A type constraint or a type.
    visitor: A callable invoked for all nodes in the type tree comprising a
      composite type. The visitor will be called with the node visited and the
      visitor argument specified here.
    visitor_arg: Visitor callback second argument.

  Note:
    Raise and capture a StopIteration to terminate the visit, e.g.

    ```
    def visitor(type_constraint, visitor_arg):
      if ...:
        raise StopIteration

    try:
      visit_inner_types(type_constraint, visitor, visitor_arg)
    except StopIteration:
      pass
    ```
  """
  if isinstance(type_constraint, TypeConstraint):
    return type_constraint.visit(visitor, visitor_arg)
  return visitor(type_constraint, visitor_arg)


def match_type_variables(type_constraint, concrete_type):
  if isinstance(type_constraint, TypeConstraint):
    return type_constraint.match_type_variables(concrete_type)
  return {}


def bind_type_variables(type_constraint, bindings):
  if isinstance(type_constraint, TypeConstraint):
    return type_constraint.bind_type_variables(bindings)
  return type_constraint


class IndexableTypeConstraint(TypeConstraint):
  """An internal common base-class for all type constraints with indexing.
  E.G. SequenceTypeConstraint + Tuple's of fixed size.
  """
  def _constraint_for_index(self, idx):
    """Returns the type at the given index. This is used to allow type inference
    to determine the correct type for a specific index. On lists this will also
    be the same, however for tuples the value will depend on the position. This
    was added as part of the futurize changes since more of the expressions now
    index into tuples."""
    raise NotImplementedError


class SequenceTypeConstraint(IndexableTypeConstraint):
  """A common base-class for all sequence related type-constraint classes.

  A sequence is defined as an arbitrary length homogeneous container type. Type
  hints which fall under this category include: List[T], Set[T], Iterable[T],
  and Tuple[T, ...].

  Sub-classes may need to override '_consistent_with_check_' if a particular
  sequence requires special handling with respect to type compatibility.

  Attributes:
    inner_type: The type which every element in the sequence should be an
      instance of.
  """
  def __init__(self, inner_type, sequence_type):
    self.inner_type = normalize(inner_type)
    self._sequence_type = sequence_type

  def __eq__(self, other):
    return (
        isinstance(other, SequenceTypeConstraint) and
        type(self) == type(other) and self.inner_type == other.inner_type)

  def __hash__(self):
    return hash(self.inner_type) ^ 13 * hash(type(self))

  def _inner_types(self):
    yield self.inner_type

  def _constraint_for_index(self, idx):
    """Returns the type at the given index."""
    return self.inner_type

  def _consistent_with_check_(self, sub):
    return (
        isinstance(sub, self.__class__) and
        is_consistent_with(sub.inner_type, self.inner_type))

  def type_check(self, sequence_instance):
    if not isinstance(sequence_instance, self._sequence_type):
      raise CompositeTypeHintError(
          "%s type-constraint violated. Valid object instance "
          "must be of type '%s'. Instead, an instance of '%s' "
          "was received." % (
              self._sequence_type.__name__.title(),
              self._sequence_type.__name__.lower(),
              sequence_instance.__class__.__name__))

    for index, elem in enumerate(sequence_instance):
      try:
        check_constraint(self.inner_type, elem)
      except SimpleTypeHintError:
        raise CompositeTypeHintError(
            '%s hint type-constraint violated. The type of element #%s in '
            'the passed %s is incorrect. Expected an instance of type %s, '
            'instead received an instance of type %s.' % (
                repr(self),
                index,
                repr(self._sequence_type),
                repr(self.inner_type),
                elem.__class__.__name__))
      except CompositeTypeHintError as e:
        raise CompositeTypeHintError(
            '%s hint type-constraint violated. The type of element #%s in '
            'the passed %s is incorrect: %s' %
            (repr(self), index, self._sequence_type.__name__, e))

  def match_type_variables(self, concrete_type):
    if isinstance(concrete_type, SequenceTypeConstraint):
      return match_type_variables(self.inner_type, concrete_type.inner_type)
    return {}

  def bind_type_variables(self, bindings):
    bound_inner_type = bind_type_variables(self.inner_type, bindings)
    if bound_inner_type == self.inner_type:
      return self
    bound_self = copy.copy(self)
    bound_self.inner_type = bound_inner_type
    return bound_self


class CompositeTypeHint(object):
  """The base-class for all created type-hint classes defined below.

  CompositeTypeHint's serve primarily as TypeConstraint factories. They are
  only required to define a single method: '__getitem__' which should return a
  parameterized TypeConstraint, that can be used to enforce static or run-time
  type-checking.

  '__getitem__' is used as a factory function in order to provide a familiar
  API for defining type-hints. The ultimate result is that one will be able to
  use: CompositeTypeHint[type_parameter] to create a type-hint object that
  behaves like any other Python object. This allows one to create
  'type-aliases' by assigning the returned type-hints to a variable.

    * Example: 'Coordinates = List[Tuple[int, int]]'
  """
  def __getitem___(self, py_type):
    """Given a type creates a TypeConstraint instance parameterized by the type.

    This function serves as a factory function which creates TypeConstraint
    instances. Additionally, implementations by sub-classes should perform any
    sanity checking of the passed types in this method in order to rule-out
    disallowed behavior. Such as, attempting to create a TypeConstraint whose
    parameterized type is actually an object instance.

    Args:
      py_type: An instance of a Python type or TypeConstraint.

    Returns: An instance of a custom TypeConstraint for this CompositeTypeHint.

    Raises:
      TypeError: If the passed type violates any contraints for this particular
        TypeHint.
    """
    raise NotImplementedError


def is_typing_generic(type_param):
  """Determines if an object is a subscripted typing.Generic type, such as
  PCollection[int].

  Such objects are considered valid type parameters.

  For Python versions 3.9 and above, also permits types.GenericAlias.
  """
  if hasattr(types, "GenericAlias") and isinstance(type_param,
                                                   types.GenericAlias):
    return True
  return isinstance(type_param, typing._GenericAlias)


def validate_composite_type_param(type_param, error_msg_prefix):
  """Determines if an object is a valid type parameter to a
  :class:`CompositeTypeHint`.

  Implements sanity checking to disallow things like::

    List[1, 2, 3] or Dict[5].

  Args:
    type_param: An object instance.
    error_msg_prefix (:class:`str`): A string prefix used to format an error
      message in the case of an exception.

  Raises:
    TypeError: If the passed **type_param** is not a valid type
      parameter for a :class:`CompositeTypeHint`.
  """
  # Must either be a TypeConstraint instance or a basic Python type.
  possible_classes = [type, TypeConstraint]
  is_not_type_constraint = (
      not is_typing_generic(type_param) and
      not isinstance(type_param, tuple(possible_classes)) and
      type_param is not None and
      getattr(type_param, '__module__', None) != 'typing')
  if sys.version_info.major == 3 and sys.version_info.minor >= 10:
    if isinstance(type_param, types.UnionType):
      is_not_type_constraint = False
  # Pre-Python 3.9 compositve type-hinting with built-in types was not
  # supported, the typing module equivalents should be used instead.
  if sys.version_info.major == 3 and sys.version_info.minor < 9:
    is_not_type_constraint = is_not_type_constraint or (
        isinstance(type_param, type) and
        type_param in DISALLOWED_PRIMITIVE_TYPES)

  if is_not_type_constraint:
    raise TypeError(
        '%s must be a non-sequence, a type, or a TypeConstraint. %s'
        ' is an instance of %s.' %
        (error_msg_prefix, type_param, type_param.__class__.__name__))


def check_constraint(type_constraint, object_instance):
  """Determine if the passed type instance satisfies the TypeConstraint.

  When examining a candidate type for constraint satisfaction in
  'type_check', all CompositeTypeHint's eventually call this function. This
  function may end up being called recursively if the hinted type of a
  CompositeTypeHint is another CompositeTypeHint.

  Args:
    type_constraint: An instance of a TypeConstraint or a built-in Python type.
    object_instance: An object instance.

  Raises:
    SimpleTypeHintError: If 'type_constraint' is a one of the allowed primitive
      Python types and 'object_instance' isn't an instance of this type.
    CompositeTypeHintError: If 'type_constraint' is a TypeConstraint object and
      'object_instance' does not satisfy its constraint.
  """
  if type_constraint is None and object_instance is None:
    return
  elif isinstance(type_constraint, TypeConstraint):
    type_constraint.type_check(object_instance)
  elif type_constraint is None:
    # TODO(robertwb): Fix uses of None for Any.
    pass
  elif not isinstance(type_constraint, type):
    raise RuntimeError("bad type: %s" % (type_constraint, ))
  elif not isinstance(object_instance, type_constraint):
    raise SimpleTypeHintError


class AnyTypeConstraint(TypeConstraint):
  """An Any type-hint.

  Any is intended to be used as a "don't care" when hinting the types of
  function arguments or return types. All other TypeConstraint's are equivalent
  to 'Any', and its 'type_check' method is a no-op.
  """
  def __eq__(self, other):
    return type(self) == type(other)

  def __repr__(self):
    return 'Any'

  def __hash__(self):
    # TODO(https://github.com/apache/beam/issues/18633): Fix
    # typehints.TypeVariable issues with __hash__.
    return hash(id(self))

  def type_check(self, instance):
    pass


[docs]class TypeVariable(AnyTypeConstraint): def __init__(self, name, use_name_in_eq=True): self.name = name self.use_name_in_eq = use_name_in_eq def __eq__(self, other): # The "other" may be an Ellipsis object # so we have to check if it has use_name_in_eq first if self.use_name_in_eq and (hasattr(other, 'use_name_in_eq') and other.use_name_in_eq): return type(self) == type(other) and self.name == other.name return type(self) == type(other) def __hash__(self): # TODO(https://github.com/apache/beam/issues/18633): Fix # typehints.TypeVariable issues with __hash__. return hash(id(self)) def __repr__(self): return 'TypeVariable[%s]' % self.name
[docs] def match_type_variables(self, concrete_type): return {self: concrete_type}
[docs] def bind_type_variables(self, bindings): return bindings.get( self, # Star matches all type variables. bindings.get('*', self))
class UnionHint(CompositeTypeHint): """A Union type-hint. Union[X, Y] accepts instances of type X OR type Y. Duplicate type parameters are ignored. Additonally, Nested Union hints will be flattened out. For example: * Union[Union[str, int], bool] -> Union[str, int, bool] A candidate type instance satisfies a UnionConstraint if it is an instance of any of the parameterized 'union_types' for a Union. Union[X] is disallowed, and all type parameters will be sanity checked to ensure compatibility with nested type-hints. When comparing two Union hints, ordering is enforced before comparison. * Union[int, str] == Union[str, int] """ class UnionConstraint(TypeConstraint): def __init__(self, union_types): self.union_types = set(normalize(t) for t in union_types) def __eq__(self, other): return ( isinstance(other, UnionHint.UnionConstraint) and self.union_types == other.union_types) def __hash__(self): return 1 + sum(hash(t) for t in self.union_types) def __repr__(self): # Sorting the type name strings simplifies unit tests. return 'Union[%s]' % ( ', '.join(sorted(repr(t) for t in self.union_types))) def inner_types(self): for t in self.union_types: yield t def contains_type(self, maybe_type): return maybe_type in self.union_types def _consistent_with_check_(self, sub): if isinstance(sub, UnionConstraint): # A union type is compatible if every possible type is compatible. # E.g. Union[A, B, C] > Union[A, B]. return all(is_consistent_with(elem, self) for elem in sub.union_types) # Other must be compatible with at least one of this union's subtypes. # E.g. Union[A, B, C] > T if T > A or T > B or T > C. return any(is_consistent_with(sub, elem) for elem in self.union_types) def type_check(self, instance): error_msg = '' for t in self.union_types: try: check_constraint(t, instance) return except TypeError as e: error_msg = str(e) continue raise CompositeTypeHintError( '%s type-constraint violated. Expected an instance of one of: %s, ' 'received %s instead.%s' % ( repr(self), tuple(sorted(repr(t) for t in self.union_types)), instance.__class__.__name__, error_msg)) def match_type_variables(self, concrete_type): sub_bindings = [ match_type_variables(t, concrete_type) for t in self.union_types if is_consistent_with(concrete_type, t) ] if sub_bindings: return { var: Union[(sub[var] for sub in sub_bindings)] for var in set.intersection( *[set(sub.keys()) for sub in sub_bindings]) } else: return {} def bind_type_variables(self, bindings): return Union[(bind_type_variables(t, bindings) for t in self.union_types)] def __getitem__(self, type_params): if not isinstance(type_params, (abc.Iterable, set)): raise TypeError('Cannot create Union without a sequence of types.') # Flatten nested Union's and duplicated repeated type hints. params = set() dict_union = None for t in type_params: validate_composite_type_param( t, error_msg_prefix='All parameters to a Union hint') if isinstance(t, self.UnionConstraint): params |= t.union_types elif isinstance(t, DictConstraint): if dict_union is None: dict_union = t else: dict_union.key_type = Union[dict_union.key_type, t.key_type] dict_union.value_type = Union[dict_union.value_type, t.value_type] else: params.add(t) if dict_union is not None: params.add(dict_union) if Any in params: return Any elif len(params) == 1: return next(iter(params)) if len(params) > 1: from apache_beam.typehints import schemas try: return schemas.union_schema_type(params) except (TypeError, KeyError): # Not a union of compatible schema types. pass return self.UnionConstraint(params) UnionConstraint = UnionHint.UnionConstraint class OptionalHint(UnionHint): """An Option type-hint. Optional[X] accepts instances of X or None. The Optional[X] factory function proxies to Union[X, type(None)] """ def __getitem__(self, py_type): # A single type must have been passed. if isinstance(py_type, abc.Sequence): raise TypeError( 'An Option type-hint only accepts a single type ' 'parameter.') return Union[py_type, type(None)] def is_nullable(typehint): return ( isinstance(typehint, UnionConstraint) and typehint.contains_type(type(None)) and len(list(typehint.inner_types())) == 2) def get_concrete_type_from_nullable(typehint): if is_nullable(typehint): for inner_type in typehint.inner_types(): if not type(None) == inner_type: return inner_type else: raise TypeError('Typehint is not of nullable type', typehint) class TupleHint(CompositeTypeHint): """A Tuple type-hint. Tuple can accept 1 or more type-hint parameters. Tuple[X, Y] represents a tuple of *exactly* two elements, with the first being of type 'X' and the second an instance of type 'Y'. * (1, 2) satisfies Tuple[int, int] Additionally, one is able to type-hint an arbitary length, homogeneous tuple by passing the Ellipsis (...) object as the second parameter. As an example, Tuple[str, ...] indicates a tuple of any length with each element being an instance of 'str'. """ class TupleSequenceConstraint(SequenceTypeConstraint): def __init__(self, type_param): super().__init__(type_param, tuple) def __repr__(self): return 'Tuple[%s, ...]' % repr(self.inner_type) def _consistent_with_check_(self, sub): if isinstance(sub, TupleConstraint): # E.g. Tuple[A, B] < Tuple[C, ...] iff A < C and B < C. return all( is_consistent_with(elem, self.inner_type) for elem in sub.tuple_types) return super()._consistent_with_check_(sub) class TupleConstraint(IndexableTypeConstraint): def __init__(self, type_params): self.tuple_types = tuple(normalize(t) for t in type_params) def __eq__(self, other): return ( isinstance(other, TupleHint.TupleConstraint) and self.tuple_types == other.tuple_types) def __hash__(self): return hash(self.tuple_types) def __repr__(self): return 'Tuple[%s]' % (', '.join(repr(t) for t in self.tuple_types)) def _inner_types(self): for t in self.tuple_types: yield t def _constraint_for_index(self, idx): """Returns the type at the given index.""" return self.tuple_types[idx] def _consistent_with_check_(self, sub): return ( isinstance(sub, self.__class__) and len(sub.tuple_types) == len(self.tuple_types) and all( is_consistent_with(sub_elem, elem) for sub_elem, elem in zip(sub.tuple_types, self.tuple_types))) def type_check(self, tuple_instance): if not isinstance(tuple_instance, tuple): raise CompositeTypeHintError( "Tuple type constraint violated. Valid object instance must be of " "type 'tuple'. Instead, an instance of '%s' was received." % tuple_instance.__class__.__name__) if len(tuple_instance) != len(self.tuple_types): raise CompositeTypeHintError( 'Passed object instance is of the proper type, but differs in ' 'length from the hinted type. Expected a tuple of length %s, ' 'received a tuple of length %s.' % (len(self.tuple_types), len(tuple_instance))) for type_pos, (expected, actual) in enumerate(zip(self.tuple_types, tuple_instance)): try: check_constraint(expected, actual) continue except SimpleTypeHintError: raise CompositeTypeHintError( '%s hint type-constraint violated. The type of element #%s in ' 'the passed tuple is incorrect. Expected an instance of ' 'type %s, instead received an instance of type %s.' % (repr(self), type_pos, repr(expected), actual.__class__.__name__)) except CompositeTypeHintError as e: raise CompositeTypeHintError( '%s hint type-constraint violated. The type of element #%s in ' 'the passed tuple is incorrect. %s' % (repr(self), type_pos, e)) def match_type_variables(self, concrete_type): bindings = {} if isinstance(concrete_type, TupleConstraint): for a, b in zip(self.tuple_types, concrete_type.tuple_types): bindings.update(match_type_variables(a, b)) return bindings def bind_type_variables(self, bindings): bound_tuple_types = tuple( bind_type_variables(t, bindings) for t in self.tuple_types) if bound_tuple_types == self.tuple_types: return self return Tuple[bound_tuple_types] def __getitem__(self, type_params): ellipsis = False if not isinstance(type_params, abc.Iterable): # Special case for hinting tuples with arity-1. type_params = (type_params, ) if type_params and type_params[-1] == Ellipsis: if len(type_params) != 2: raise TypeError( 'Ellipsis can only be used to type-hint an arbitrary ' 'length tuple of containing a single type: ' 'Tuple[A, ...].') # Tuple[A, ...] indicates an arbitary length homogeneous tuple. type_params = type_params[:1] ellipsis = True for t in type_params: validate_composite_type_param( t, error_msg_prefix='All parameters to a Tuple hint') if ellipsis: return self.TupleSequenceConstraint(type_params[0]) return self.TupleConstraint(type_params) TupleConstraint = TupleHint.TupleConstraint TupleSequenceConstraint = TupleHint.TupleSequenceConstraint class ListHint(CompositeTypeHint): """A List type-hint. List[X] represents an instance of a list populated by a single homogeneous type. The parameterized type 'X' can either be a built-in Python type or an instance of another TypeConstraint. * ['1', '2', '3'] satisfies List[str] """ class ListConstraint(SequenceTypeConstraint): def __init__(self, list_type): super().__init__(list_type, list) def __repr__(self): return 'List[%s]' % repr(self.inner_type) def __getitem__(self, t): validate_composite_type_param(t, error_msg_prefix='Parameter to List hint') return self.ListConstraint(t) ListConstraint = ListHint.ListConstraint class KVHint(CompositeTypeHint): """A KV type-hint, represents a Key-Value pair of a particular type. Internally, KV[X, Y] proxies to Tuple[X, Y]. A KV type-hint accepts only accepts exactly two type-parameters. The first represents the required key-type and the second the required value-type. """ def __getitem__(self, type_params): if not isinstance(type_params, tuple): raise TypeError( 'Parameter to KV type-hint must be a tuple of types: ' 'KV[.., ..].') if len(type_params) != 2: raise TypeError( 'Length of parameters to a KV type-hint must be exactly 2. Passed ' 'parameters: %s, have a length of %s.' % (type_params, len(type_params))) return Tuple[type_params] def key_value_types(kv): """Returns the key and value type of a KV type-hint. Args: kv: An instance of a TypeConstraint sub-class. Returns: A tuple: (key_type, value_type) if the passed type-hint is an instance of a KV type-hint, and (Any, Any) otherwise. """ if isinstance(kv, TupleHint.TupleConstraint): return kv.tuple_types return Any, Any class DictHint(CompositeTypeHint): """A Dict type-hint. Dict[K, V] Represents a dictionary where all keys are of a particular type and all values are of another (possible the same) type. """ class DictConstraint(TypeConstraint): def __init__(self, key_type, value_type): self.key_type = normalize(key_type) self.value_type = normalize(value_type) def __repr__(self): return 'Dict[%s, %s]' % (repr(self.key_type), repr(self.value_type)) def __eq__(self, other): return ( type(self) == type(other) and self.key_type == other.key_type and self.value_type == other.value_type) def __hash__(self): return hash((type(self), self.key_type, self.value_type)) def _inner_types(self): yield self.key_type yield self.value_type def _consistent_with_check_(self, sub): return ( isinstance(sub, self.__class__) and is_consistent_with(sub.key_type, self.key_type) and is_consistent_with(sub.value_type, self.value_type)) def _raise_hint_exception_or_inner_exception( self, is_key, incorrect_instance, inner_error_message=''): incorrect_type = 'values' if not is_key else 'keys' hinted_type = self.value_type if not is_key else self.key_type if inner_error_message: raise CompositeTypeHintError( '%s hint %s-type constraint violated. All %s should be of type ' '%s. Instead: %s' % ( repr(self), incorrect_type[:-1], incorrect_type, repr(hinted_type), inner_error_message)) else: raise CompositeTypeHintError( '%s hint %s-type constraint violated. All %s should be of ' 'type %s. Instead, %s is of type %s.' % ( repr(self), incorrect_type[:-1], incorrect_type, repr(hinted_type), incorrect_instance, incorrect_instance.__class__.__name__)) def type_check(self, dict_instance): if not isinstance(dict_instance, dict): raise CompositeTypeHintError( 'Dict type-constraint violated. All passed instances must be of ' 'type dict. %s is of type %s.' % (dict_instance, dict_instance.__class__.__name__)) for key, value in dict_instance.items(): try: check_constraint(self.key_type, key) except CompositeTypeHintError as e: self._raise_hint_exception_or_inner_exception(True, key, str(e)) except SimpleTypeHintError: self._raise_hint_exception_or_inner_exception(True, key) try: check_constraint(self.value_type, value) except CompositeTypeHintError as e: self._raise_hint_exception_or_inner_exception(False, value, str(e)) except SimpleTypeHintError: self._raise_hint_exception_or_inner_exception(False, value) def match_type_variables(self, concrete_type): if isinstance(concrete_type, DictConstraint): bindings = {} bindings.update( match_type_variables(self.key_type, concrete_type.key_type)) bindings.update( match_type_variables(self.value_type, concrete_type.value_type)) return bindings return {} def bind_type_variables(self, bindings): bound_key_type = bind_type_variables(self.key_type, bindings) bound_value_type = bind_type_variables(self.value_type, bindings) if (bound_key_type, self.key_type) == (bound_value_type, self.value_type): return self return Dict[bound_key_type, bound_value_type] def __getitem__(self, type_params): # Type param must be a (k, v) pair. if not isinstance(type_params, tuple): raise TypeError( 'Parameter to Dict type-hint must be a tuple of types: ' 'Dict[.., ..].') if len(type_params) != 2: raise TypeError( 'Length of parameters to a Dict type-hint must be exactly 2. Passed ' 'parameters: %s, have a length of %s.' % (type_params, len(type_params))) key_type, value_type = type_params validate_composite_type_param( key_type, error_msg_prefix='Key-type parameter to a Dict hint') validate_composite_type_param( value_type, error_msg_prefix='Value-type parameter to a Dict hint') return self.DictConstraint(key_type, value_type) DictConstraint = DictHint.DictConstraint class SetHint(CompositeTypeHint): """A Set type-hint. Set[X] defines a type-hint for a set of homogeneous types. 'X' may be either a built-in Python type or a another nested TypeConstraint. """ class SetTypeConstraint(SequenceTypeConstraint): def __init__(self, type_param): super().__init__(type_param, set) def __repr__(self): return 'Set[%s]' % repr(self.inner_type) def __getitem__(self, type_param): validate_composite_type_param( type_param, error_msg_prefix='Parameter to a Set hint') return self.SetTypeConstraint(type_param) SetTypeConstraint = SetHint.SetTypeConstraint class FrozenSetHint(CompositeTypeHint): """A FrozenSet type-hint. FrozenSet[X] defines a type-hint for a set of homogeneous types. 'X' may be either a built-in Python type or a another nested TypeConstraint. This is a mirror copy of SetHint - consider refactoring common functionality. """ class FrozenSetTypeConstraint(SequenceTypeConstraint): def __init__(self, type_param): super(FrozenSetHint.FrozenSetTypeConstraint, self).__init__(type_param, frozenset) def __repr__(self): return 'FrozenSet[%s]' % repr(self.inner_type) def __getitem__(self, type_param): validate_composite_type_param( type_param, error_msg_prefix='Parameter to a FrozenSet hint') return self.FrozenSetTypeConstraint(type_param) FrozenSetTypeConstraint = FrozenSetHint.FrozenSetTypeConstraint class CollectionHint(CompositeTypeHint): """ A Collection type-hint. Collection[X] defines a type-hint for a collection of homogenous types. 'X' may be either a built-in Python type or another nested TypeConstraint. This represents a collections.abc.Collection type, which implements __contains__, __iter__, and __len__. This acts as a parent type for sets but has fewer guarantees for mixins. """ class CollectionTypeConstraint(SequenceTypeConstraint): def __init__(self, type_param): super().__init__(type_param, abc.Collection) def __repr__(self): return 'Collection[%s]' % repr(self.inner_type) @staticmethod def _is_subclass_constraint(sub): return isinstance( sub, ( CollectionTypeConstraint, FrozenSetTypeConstraint, SetTypeConstraint, ListConstraint)) def _consistent_with_check_(self, sub): if self._is_subclass_constraint(sub): return is_consistent_with(sub.inner_type, self.inner_type) elif isinstance(sub, TupleConstraint): if not sub.tuple_types: # The empty tuple is consistent with Iterator[T] for any T. return True # Each element in the hetrogenious tuple must be consistent with # the collection type. # E.g. Tuple[A, B] < Collection[C] if A < C and B < C. return all( is_consistent_with(elem, self.inner_type) for elem in sub.tuple_types) # TODO(https://github.com/apache/beam/issues/29135): allow for # consistency checks with Mapping types elif isinstance(sub, DictConstraint): return True elif not isinstance(sub, TypeConstraint): if getattr(sub, '__origin__', None) is not None and getattr( sub, '__args__', None) is not None: return issubclass(sub, abc.Collection) and is_consistent_with( sub.__args__, self.inner_type) return False def __getitem__(self, type_param): validate_composite_type_param( type_param, error_msg_prefix='Parameter to a Collection hint') return self.CollectionTypeConstraint(type_param) CollectionTypeConstraint = CollectionHint.CollectionTypeConstraint class IterableHint(CompositeTypeHint): """An Iterable type-hint. Iterable[X] defines a type-hint for an object implementing an '__iter__' method which yields objects which are all of the same type. """ class IterableTypeConstraint(SequenceTypeConstraint): def __init__(self, iter_type): super(IterableHint.IterableTypeConstraint, self).__init__(iter_type, abc.Iterable) def __repr__(self): return 'Iterable[%s]' % repr(self.inner_type) def _consistent_with_check_(self, sub): if isinstance(sub, SequenceTypeConstraint): return is_consistent_with(sub.inner_type, self.inner_type) elif isinstance(sub, TupleConstraint): if not sub.tuple_types: # The empty tuple is consistent with Iterator[T] for any T. return True # Each element in the hetrogenious tuple must be consistent with # the iterator type. # E.g. Tuple[A, B] < Iterable[C] if A < C and B < C. return all( is_consistent_with(elem, self.inner_type) for elem in sub.tuple_types) return False def __getitem__(self, type_param): validate_composite_type_param( type_param, error_msg_prefix='Parameter to an Iterable hint') return self.IterableTypeConstraint(type_param) IterableTypeConstraint = IterableHint.IterableTypeConstraint class IteratorHint(CompositeTypeHint): """An Iterator type-hint. Iterator[X] defines a type-hint for an object implementing both '__iter__' and a 'next' method which yields objects which are all of the same type. Type checking a type-hint of this type is deferred in order to avoid depleting the underlying lazily generated sequence. See decorators.interleave_type_check for further information. """ class IteratorTypeConstraint(TypeConstraint): def __init__(self, t): self.yielded_type = normalize(t) def __repr__(self): return 'Iterator[%s]' % repr(self.yielded_type) def __eq__(self, other): return ( type(self) == type(other) and self.yielded_type == other.yielded_type) def __hash__(self): return hash(self.yielded_type) def _inner_types(self): yield self.yielded_type def _consistent_with_check_(self, sub): return ( isinstance(sub, self.__class__) and is_consistent_with(sub.yielded_type, self.yielded_type)) def type_check(self, instance): # Special case for lazy types, we only need to enforce the underlying # type. This avoid having to compute the entirety of the generator/iter. try: check_constraint(self.yielded_type, instance) return except CompositeTypeHintError as e: raise CompositeTypeHintError( '%s hint type-constraint violated: %s' % (repr(self), str(e))) except SimpleTypeHintError: raise CompositeTypeHintError( '%s hint type-constraint violated. Expected a iterator of type %s. ' 'Instead received a iterator of type %s.' % (repr(self), repr(self.yielded_type), instance.__class__.__name__)) def __getitem__(self, type_param): validate_composite_type_param( type_param, error_msg_prefix='Parameter to an Iterator hint') return self.IteratorTypeConstraint(type_param) IteratorTypeConstraint = IteratorHint.IteratorTypeConstraint class WindowedTypeConstraint(TypeConstraint, metaclass=GetitemConstructor): """A type constraint for WindowedValue objects. Mostly for internal use. Attributes: inner_type: The type which the element should be an instance of. """ def __init__(self, inner_type): self.inner_type = normalize(inner_type) def __eq__(self, other): return ( isinstance(other, WindowedTypeConstraint) and self.inner_type == other.inner_type) def __hash__(self): return hash(self.inner_type) ^ 13 * hash(type(self)) def _inner_types(self): yield self.inner_type def _consistent_with_check_(self, sub): return ( isinstance(sub, self.__class__) and is_consistent_with(sub.inner_type, self.inner_type)) def type_check(self, instance): from apache_beam.transforms import window if not isinstance(instance, window.WindowedValue): raise CompositeTypeHintError( "Window type-constraint violated. Valid object instance " "must be of type 'WindowedValue'. Instead, an instance of '%s' " "was received." % (instance.__class__.__name__)) try: check_constraint(self.inner_type, instance.value) except (CompositeTypeHintError, SimpleTypeHintError): raise CompositeTypeHintError( '%s hint type-constraint violated. The type of element in ' 'is incorrect. Expected an instance of type %s, ' 'instead received an instance of type %s.' % ( repr(self), repr(self.inner_type), instance.value.__class__.__name__)) class GeneratorHint(IteratorHint): """A Generator type hint. Subscriptor is in the form [yield_type, send_type, return_type], however only yield_type is supported. The 2 others are expected to be None. """ def __getitem__(self, type_params): if isinstance(type_params, tuple) and len(type_params) == 3: yield_type, send_type, return_type = type_params if send_type is not type(None): _LOGGER.warning('Ignoring send_type hint: %s' % send_type) if return_type is not type(None): _LOGGER.warning('Ignoring return_type hint: %s' % return_type) else: yield_type = type_params return self.IteratorTypeConstraint(yield_type) # Create the actual instances for all defined type-hints above. Any = AnyTypeConstraint() Union = UnionHint() Optional = OptionalHint() Tuple = TupleHint() List = ListHint() KV = KVHint() Dict = DictHint() Set = SetHint() FrozenSet = FrozenSetHint() Collection = CollectionHint() Iterable = IterableHint() Iterator = IteratorHint() Generator = GeneratorHint() WindowedValue = WindowedTypeConstraint # There is a circular dependency between defining this mapping # and using it in normalize(). Initialize it here and populate # it below. _KNOWN_PRIMITIVE_TYPES: typing.Dict[type, typing.Any] = {} def normalize(x, none_as_type=False): # None is inconsistantly used for Any, unknown, or NoneType. # Avoid circular imports from apache_beam.typehints import native_type_compatibility if sys.version_info >= (3, 9) and isinstance(x, types.GenericAlias): x = native_type_compatibility.convert_builtin_to_typing(x) if none_as_type and x is None: return type(None) elif x in _KNOWN_PRIMITIVE_TYPES: return _KNOWN_PRIMITIVE_TYPES[x] elif getattr(x, '__module__', None) == 'typing': beam_type = native_type_compatibility.convert_to_beam_type(x) if beam_type != x: # We were able to do the conversion. return beam_type else: # It might be a compatible type we don't understand. return Any return x _KNOWN_PRIMITIVE_TYPES.update({ dict: Dict[Any, Any], list: List[Any], tuple: Tuple[Any, ...], set: Set[Any], frozenset: FrozenSet[Any], }) def is_consistent_with(sub, base): """Checks whether sub a is consistent with base. This is according to the terminology of PEP 483/484. This relationship is neither symmetric nor transitive, but a good mnemonic to keep in mind is that is_consistent_with(a, b) is roughly equivalent to the issubclass(a, b) relation, but also handles the special Any type as well as type parameterization. """ from apache_beam.pvalue import Row from apache_beam.typehints.row_type import RowTypeConstraint if sub == base: # Common special case. return True if isinstance(sub, AnyTypeConstraint) or isinstance(base, AnyTypeConstraint): return True sub = normalize(sub, none_as_type=True) base = normalize(base, none_as_type=True) if isinstance(sub, UnionConstraint): return all(is_consistent_with(c, base) for c in sub.union_types) elif isinstance(base, TypeConstraint): return base._consistent_with_check_(sub) elif isinstance(sub, RowTypeConstraint): return base == Row elif isinstance(sub, TypeConstraint): # Nothing but object lives above any type constraints. return base == object return issubclass(sub, base) def get_yielded_type(type_hint): """Obtains the type of elements yielded by an iterable. Note that "iterable" here means: can be iterated over in a for loop, excluding strings and dicts. Args: type_hint: (TypeConstraint) The iterable in question. Must be normalize()-d. Returns: Yielded type of the iterable. Raises: ValueError if not iterable. """ if isinstance(type_hint, AnyTypeConstraint): return type_hint if is_consistent_with(type_hint, Iterator[Any]): return type_hint.yielded_type if is_consistent_with(type_hint, Tuple[Any, ...]): if isinstance(type_hint, TupleConstraint): return Union[type_hint.tuple_types] else: # TupleSequenceConstraint return type_hint.inner_type if is_consistent_with(type_hint, Iterable[Any]): return type_hint.inner_type raise ValueError('%s is not iterable' % type_hint) def coerce_to_kv_type(element_type, label=None, side_input_producer=None): """Attempts to coerce element_type to a compatible kv type. Raises an error on failure. """ if side_input_producer: consumer = 'side-input of %r (producer: %r)' % (label, side_input_producer) else: consumer = '%r' % label # If element_type is not specified, then treat it as `Any`. if not element_type: return KV[Any, Any] elif isinstance(element_type, TupleHint.TupleConstraint): if len(element_type.tuple_types) == 2: return element_type else: raise ValueError( "Tuple input to %s must have two components. " "Found %s." % (consumer, element_type)) elif isinstance(element_type, AnyTypeConstraint): # `Any` type needs to be replaced with a KV[Any, Any] to # satisfy the KV form. return KV[Any, Any] elif isinstance(element_type, UnionConstraint): union_types = [coerce_to_kv_type(t) for t in element_type.union_types] return KV[Union[tuple(t.tuple_types[0] for t in union_types)], Union[tuple(t.tuple_types[1] for t in union_types)]] else: # TODO: Possibly handle other valid types. raise ValueError( "Input to %s must be compatible with KV[Any, Any]. " "Found %s." % (consumer, element_type))