#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Module to convert Python's native typing types to Beam types."""
# pytype: skip-file
import collections
import logging
import sys
import types
import typing
from apache_beam.typehints import typehints
_LOGGER = logging.getLogger(__name__)
# Describes an entry in the type map in convert_to_beam_type.
# match is a function that takes a user type and returns whether the conversion
# should trigger.
# arity is the expected arity of the user type. -1 means it's variadic.
# beam_type is the Beam type the user type should map to.
_TypeMapEntry = collections.namedtuple(
'_TypeMapEntry', ['match', 'arity', 'beam_type'])
_BUILTINS_TO_TYPING = {
dict: typing.Dict,
list: typing.List,
tuple: typing.Tuple,
set: typing.Set,
frozenset: typing.FrozenSet,
}
_CONVERTED_COLLECTIONS = [
collections.abc.Set,
collections.abc.MutableSet,
collections.abc.Collection,
]
def _get_args(typ):
"""Returns a list of arguments to the given type.
Args:
typ: A typing module typing type.
Returns:
A tuple of args.
"""
try:
if typ.__args__ is None:
return ()
return typ.__args__
except AttributeError:
if isinstance(typ, typing.TypeVar):
return (typ.__name__, )
return ()
def _safe_issubclass(derived, parent):
"""Like issubclass, but swallows TypeErrors.
This is useful for when either parameter might not actually be a class,
e.g. typing.Union isn't actually a class.
Args:
derived: As in issubclass.
parent: As in issubclass.
Returns:
issubclass(derived, parent), or False if a TypeError was raised.
"""
try:
return issubclass(derived, parent)
except (TypeError, AttributeError):
if hasattr(derived, '__origin__'):
try:
return issubclass(derived.__origin__, parent)
except TypeError:
pass
return False
def _match_issubclass(match_against):
return lambda user_type: _safe_issubclass(user_type, match_against)
def _match_is_exactly_mapping(user_type):
# Avoid unintentionally catching all subtypes (e.g. strings and mappings).
if sys.version_info < (3, 7):
expected_origin = typing.Mapping
else:
expected_origin = collections.abc.Mapping
return getattr(user_type, '__origin__', None) is expected_origin
def _match_is_exactly_iterable(user_type):
if user_type is typing.Iterable:
return True
# Avoid unintentionally catching all subtypes (e.g. strings and mappings).
if sys.version_info < (3, 7):
expected_origin = typing.Iterable
else:
expected_origin = collections.abc.Iterable
return getattr(user_type, '__origin__', None) is expected_origin
def _match_is_exactly_collection(user_type):
return getattr(user_type, '__origin__', None) is collections.abc.Collection
[docs]def match_is_named_tuple(user_type):
return (
_safe_issubclass(user_type, typing.Tuple) and
hasattr(user_type, '__annotations__'))
def _match_is_optional(user_type):
return _match_is_union(user_type) and sum(
tp is type(None) for tp in _get_args(user_type)) == 1
def _match_is_union(user_type):
# For non-subscripted unions (Python 2.7.14+ with typing 3.64)
if user_type is typing.Union:
return True
try: # Python 3.5.4+, or Python 2.7.14+ with typing 3.64
return user_type.__origin__ is typing.Union
except AttributeError:
pass
return False
[docs]def match_is_set(user_type):
if _safe_issubclass(user_type, typing.Set):
return True
elif getattr(user_type, '__origin__', None) is not None:
return _safe_issubclass(user_type.__origin__, collections.abc.Set)
else:
return False
[docs]def is_any(typ):
return typ is typing.Any
[docs]def is_new_type(typ):
return hasattr(typ, '__supertype__')
_ForwardRef = typing.ForwardRef # Python 3.7+
[docs]def is_forward_ref(typ):
return isinstance(typ, _ForwardRef)
# Mapping from typing.TypeVar/typehints.TypeVariable ids to an object of the
# other type. Bidirectional mapping preserves typing.TypeVar instances.
_type_var_cache = {} # type: typing.Dict[int, typehints.TypeVariable]
[docs]def convert_builtin_to_typing(typ):
"""Convert recursively a given builtin to a typing object.
Args:
typ (`builtins`): builtin object that exist in _BUILTINS_TO_TYPING.
Returns:
type: The given builtins converted to a type.
"""
if getattr(typ, '__origin__', None) in _BUILTINS_TO_TYPING:
args = map(convert_builtin_to_typing, typ.__args__)
typ = _BUILTINS_TO_TYPING[typ.__origin__].copy_with(tuple(args))
return typ
[docs]def convert_collections_to_typing(typ):
"""Converts a given collections.abc type to a typing object.
Args:
typ: an object inheriting from a collections.abc object
Returns:
type: The corresponding typing object.
"""
if hasattr(typ, '__iter__'):
if hasattr(typ, '__next__'):
typ = typing.Iterator[typ.__args__]
elif hasattr(typ, 'send') and hasattr(typ, 'throw'):
typ = typing.Generator[typ.__args__]
elif _match_is_exactly_iterable(typ):
typ = typing.Iterable[typ.__args__]
return typ
[docs]def convert_to_beam_type(typ):
"""Convert a given typing type to a Beam type.
Args:
typ (`typing.Union[type, str]`): typing type or string literal representing
a type.
Returns:
type: The given type converted to a Beam type as far as we can do the
conversion.
Raises:
ValueError: The type was malformed.
"""
# Convert `int | float` to typing.Union[int, float]
# pipe operator as Union and types.UnionType are introduced
# in Python 3.10.
# GH issue: https://github.com/apache/beam/issues/21972
if (sys.version_info.major == 3 and
sys.version_info.minor >= 10) and (isinstance(typ, types.UnionType)):
typ = typing.Union[typ]
if sys.version_info >= (3, 9) and isinstance(typ, types.GenericAlias):
typ = convert_builtin_to_typing(typ)
if sys.version_info >= (3, 9) and getattr(typ, '__module__',
None) == 'collections.abc':
typ = convert_collections_to_typing(typ)
typ_module = getattr(typ, '__module__', None)
if isinstance(typ, typing.TypeVar):
# This is a special case, as it's not parameterized by types.
# Also, identity must be preserved through conversion (i.e. the same
# TypeVar instance must get converted into the same TypeVariable instance).
# A global cache should be OK as the number of distinct type variables
# is generally small.
if id(typ) not in _type_var_cache:
new_type_variable = typehints.TypeVariable(typ.__name__)
_type_var_cache[id(typ)] = new_type_variable
_type_var_cache[id(new_type_variable)] = typ
return _type_var_cache[id(typ)]
elif isinstance(typ, str):
# Special case for forward references.
# TODO(https://github.com/apache/beam/issues/19954): Currently unhandled.
_LOGGER.info('Converting string literal type hint to Any: "%s"', typ)
return typehints.Any
elif sys.version_info >= (3, 10) and isinstance(typ, typing.NewType): # pylint: disable=isinstance-second-argument-not-valid-type
# Special case for NewType, where, since Python 3.10, NewType is now a class
# rather than a function.
# TODO(https://github.com/apache/beam/issues/20076): Currently unhandled.
_LOGGER.info('Converting NewType type hint to Any: "%s"', typ)
return typehints.Any
elif (typ_module != 'typing') and (typ_module != 'collections.abc'):
# Only translate types from the typing and collections.abc modules.
return typ
if (typ_module == 'collections.abc' and
typ.__origin__ not in _CONVERTED_COLLECTIONS):
# TODO(https://github.com/apache/beam/issues/29135):
# Support more collections types
return typ
type_map = [
# TODO(https://github.com/apache/beam/issues/20076): Currently
# unsupported.
_TypeMapEntry(match=is_new_type, arity=0, beam_type=typehints.Any),
# TODO(https://github.com/apache/beam/issues/19954): Currently
# unsupported.
_TypeMapEntry(match=is_forward_ref, arity=0, beam_type=typehints.Any),
_TypeMapEntry(match=is_any, arity=0, beam_type=typehints.Any),
_TypeMapEntry(
match=_match_issubclass(typing.Dict),
arity=2,
beam_type=typehints.Dict),
_TypeMapEntry(
match=_match_is_exactly_iterable,
arity=1,
beam_type=typehints.Iterable),
_TypeMapEntry(
match=_match_issubclass(typing.List),
arity=1,
beam_type=typehints.List),
# FrozenSets are a specific instance of a set, so we check this first.
_TypeMapEntry(
match=_match_issubclass(typing.FrozenSet),
arity=1,
beam_type=typehints.FrozenSet),
_TypeMapEntry(match=match_is_set, arity=1, beam_type=typehints.Set),
# NamedTuple is a subclass of Tuple, but it needs special handling.
# We just convert it to Any for now.
# This MUST appear before the entry for the normal Tuple.
_TypeMapEntry(
match=match_is_named_tuple, arity=0, beam_type=typehints.Any),
_TypeMapEntry(
match=_match_issubclass(typing.Tuple),
arity=-1,
beam_type=typehints.Tuple),
_TypeMapEntry(match=_match_is_union, arity=-1, beam_type=typehints.Union),
_TypeMapEntry(
match=_match_issubclass(typing.Generator),
arity=3,
beam_type=typehints.Generator),
_TypeMapEntry(
match=_match_issubclass(typing.Iterator),
arity=1,
beam_type=typehints.Iterator),
_TypeMapEntry(
match=_match_is_exactly_collection,
arity=1,
beam_type=typehints.Collection),
]
# Find the first matching entry.
matched_entry = next((entry for entry in type_map if entry.match(typ)), None)
if not matched_entry:
# Please add missing type support if you see this message.
_LOGGER.info('Using Any for unsupported type: %s', typ)
return typehints.Any
args = _get_args(typ)
len_args = len(args)
if len_args == 0 and len_args != matched_entry.arity:
arity = matched_entry.arity
# Handle unsubscripted types.
if _match_issubclass(typing.Tuple)(typ):
args = (typehints.TypeVariable('T'), Ellipsis)
elif _match_is_union(typ):
raise ValueError('Unsupported Union with no arguments.')
elif _match_issubclass(typing.Generator)(typ):
# Assume a simple generator.
args = (typehints.TypeVariable('T_co'), type(None), type(None))
elif _match_issubclass(typing.Dict)(typ):
args = (typehints.TypeVariable('KT'), typehints.TypeVariable('VT'))
elif (_match_issubclass(typing.Iterator)(typ) or
_match_is_exactly_iterable(typ)):
args = (typehints.TypeVariable('T_co'), )
else:
args = (typehints.TypeVariable('T'), ) * arity
elif matched_entry.arity == -1:
arity = len_args
else:
arity = matched_entry.arity
if len_args != arity:
raise ValueError(
'expecting type %s to have arity %d, had arity %d '
'instead' % (str(typ), arity, len_args))
typs = convert_to_beam_types(args)
if arity == 0:
# Nullary types (e.g. Any) don't accept empty tuples as arguments.
return matched_entry.beam_type
elif arity == 1:
# Unary types (e.g. Set) don't accept 1-tuples as arguments
return matched_entry.beam_type[typs[0]]
else:
return matched_entry.beam_type[tuple(typs)]
[docs]def convert_to_beam_types(args):
"""Convert the given list or dictionary of args to Beam types.
Args:
args: Either an iterable of types, or a dictionary where the values are
types.
Returns:
If given an iterable, a list of converted types. If given a dictionary,
a dictionary with the same keys, and values which have been converted.
"""
if isinstance(args, dict):
return {k: convert_to_beam_type(v) for k, v in args.items()}
else:
return [convert_to_beam_type(v) for v in args]
[docs]def convert_to_typing_type(typ):
"""Converts a given Beam type to a typing type.
This is the reverse of convert_to_beam_type.
Args:
typ: If a typehints.TypeConstraint, the type to convert. Otherwise, typ
will be unchanged.
Returns:
Converted version of typ, or unchanged.
Raises:
ValueError: The type was malformed or could not be converted.
"""
if isinstance(typ, typehints.TypeVariable):
# This is a special case, as it's not parameterized by types.
# Also, identity must be preserved through conversion (i.e. the same
# TypeVariable instance must get converted into the same TypeVar instance).
# A global cache should be OK as the number of distinct type variables
# is generally small.
if id(typ) not in _type_var_cache:
new_type_variable = typing.TypeVar(typ.name)
_type_var_cache[id(typ)] = new_type_variable
_type_var_cache[id(new_type_variable)] = typ
return _type_var_cache[id(typ)]
elif not getattr(typ, '__module__', None).endswith('typehints'):
# Only translate types from the typehints module.
return typ
if isinstance(typ, typehints.AnyTypeConstraint):
return typing.Any
if isinstance(typ, typehints.DictConstraint):
return typing.Dict[convert_to_typing_type(typ.key_type),
convert_to_typing_type(typ.value_type)]
if isinstance(typ, typehints.ListConstraint):
return typing.List[convert_to_typing_type(typ.inner_type)]
if isinstance(typ, typehints.IterableTypeConstraint):
return typing.Iterable[convert_to_typing_type(typ.inner_type)]
if isinstance(typ, typehints.UnionConstraint):
if not typ.union_types:
# Gracefully handle the empty union type.
return typing.Any
return typing.Union[tuple(convert_to_typing_types(typ.union_types))]
if isinstance(typ, typehints.SetTypeConstraint):
return typing.Set[convert_to_typing_type(typ.inner_type)]
if isinstance(typ, typehints.FrozenSetTypeConstraint):
return typing.FrozenSet[convert_to_typing_type(typ.inner_type)]
if isinstance(typ, typehints.TupleConstraint):
return typing.Tuple[tuple(convert_to_typing_types(typ.tuple_types))]
if isinstance(typ, typehints.TupleSequenceConstraint):
return typing.Tuple[convert_to_typing_type(typ.inner_type), ...]
if isinstance(typ, typehints.IteratorTypeConstraint):
return typing.Iterator[convert_to_typing_type(typ.yielded_type)]
raise ValueError('Failed to convert Beam type: %s' % typ)
[docs]def convert_to_typing_types(args):
"""Convert the given list or dictionary of args to typing types.
Args:
args: Either an iterable of types, or a dictionary where the values are
types.
Returns:
If given an iterable, a list of converted types. If given a dictionary,
a dictionary with the same keys, and values which have been converted.
"""
if isinstance(args, dict):
return {k: convert_to_typing_type(v) for k, v in args.items()}
else:
return [convert_to_typing_type(v) for v in args]