Source code for apache_beam.typehints.row_type

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

from __future__ import annotations

from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple

from apache_beam.typehints import typehints
from apache_beam.typehints.native_type_compatibility import match_is_named_tuple

# Name of the attribute added to user types (existing and generated) to store
# the corresponding schema ID
_BEAM_SCHEMA_ID = "_beam_schema_id"


class RowTypeConstraint(typehints.TypeConstraint):
  """Type constraint for schema'd rows.

  A row is an ordered collection of named, typed fields, optionally
  associated with a user type such as a ``NamedTuple`` class. Build instances
  with :meth:`from_user_type` or :meth:`from_fields` rather than calling the
  constructor directly.
  """
  def __init__(
      self,
      fields: List[Tuple[str, type]],
      user_type=None,
      schema_options: Optional[List[Tuple[str, Any]]] = None,
      field_options: Optional[Dict[str, List[Tuple[str, Any]]]] = None):
    """For internal use only, no backwards compatibility guarantees. See
    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
    for guidance on creating PCollections with inferred schemas.

    Note RowTypeConstraint does not currently store arbitrary functions for
    converting to/from the user type. Instead, we only support ``NamedTuple``
    user types and make the following assumptions:

    - The user type can be constructed with field values as arguments in order
      (i.e. ``constructor(*field_values)``).
    - Field values can be accessed from instances of the user type by
      attribute (i.e. with ``getattr(obj, field_name)``).

    In the future we will add support for dataclasses
    ([#22085](https://github.com/apache/beam/issues/22085)) which also satisfy
    these assumptions.

    The RowTypeConstraint constructor should not be called directly (even
    internally to Beam). Prefer static methods ``from_user_type`` or
    ``from_fields``.

    Parameters:
      fields: a list of (name, type) tuples, representing the schema inferred
        from user_type.
      user_type: constructor for a user type (e.g. NamedTuple class) that is
        used to represent this schema in user code.
      schema_options: A list of (key, value) tuples representing schema-level
        options.
      field_options: A dictionary representing field-level options. Dictionary
        keys are field names, and dictionary values are lists of (key, value)
        tuples representing field-level options for that field.
    """
    # Recursively wrap row types in a RowTypeConstraint: a field whose type is
    # accepted by from_user_type is replaced by its own RowTypeConstraint
    # (which in turn wraps its fields); all other field types are kept as-is.
    self._fields = tuple(
        (name, RowTypeConstraint.from_user_type(typ) or typ)
        for name, typ in fields)

    self._user_type = user_type

    # Note schema ID can be None if the schema is not registered yet.
    # Currently registration happens when converting to schema protos, in
    # apache_beam.typehints.schemas
    if self._user_type is not None:
      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID, None)
    else:
      self._schema_id = None

    self._schema_options = schema_options or []
    self._field_options = field_options or {}

  @staticmethod
  def from_user_type(
      user_type: type,
      schema_options: Optional[List[Tuple[str, Any]]] = None,
      field_options: Optional[Dict[str, List[Tuple[str, Any]]]] = None
  ) -> Optional[RowTypeConstraint]:
    """Builds a RowTypeConstraint describing ``user_type``.

    Args:
      user_type: the candidate user type.
      schema_options: optional schema-level (key, value) options.
      field_options: optional mapping of field name to (key, value) options.

    Returns:
      A RowTypeConstraint whose fields follow ``user_type._fields`` order with
      types taken from ``user_type.__annotations__``, or None if
      ``match_is_named_tuple(user_type)`` is false.
    """
    if match_is_named_tuple(user_type):
      fields = [(name, user_type.__annotations__[name])
                for name in user_type._fields]

      # TODO(https://github.com/apache/beam/issues/22125): Add user API for
      # specifying schema/field options
      return RowTypeConstraint(
          fields=fields,
          user_type=user_type,
          schema_options=schema_options,
          field_options=field_options)

    return None

  @staticmethod
  def from_fields(fields: Sequence[Tuple[str, type]]) -> RowTypeConstraint:
    """Builds a RowTypeConstraint from (name, type) pairs, with no user type."""
    return RowTypeConstraint(fields=fields, user_type=None)

  @property
  def user_type(self):
    """The user type backing this row type, or None if there is none."""
    return self._user_type

  def set_schema_id(self, schema_id):
    """Records ``schema_id`` on this constraint and, if a user type is set,
    also stores it on the user type under the ``_BEAM_SCHEMA_ID`` attribute.
    """
    self._schema_id = schema_id
    if self._user_type is not None:
      setattr(self._user_type, _BEAM_SCHEMA_ID, self._schema_id)

  @property
  def schema_id(self):
    """The registered schema ID, or None if not yet registered."""
    return self._schema_id

  @property
  def schema_options(self):
    """Schema-level options as a list of (key, value) tuples."""
    return self._schema_options

  def field_options(self, field_name):
    """Returns the (key, value) options for ``field_name``.

    Returns an empty list when no options were given for that field. The name
    is not validated against this row's fields.
    """
    # Raise if field_name is not one of the fields?
    return self._field_options.get(field_name, [])

  def _consistent_with_check_(self, sub):
    # Only an equal row type (same fields, see __eq__) is considered
    # consistent.
    return self == sub

  def type_check(self, instance):
    """Returns True if ``instance`` is a ``beam.Row`` or an instance of the
    user type, and False otherwise.

    When no user type is set (e.g. constraints built with ``from_fields``),
    only ``beam.Row`` instances are accepted.
    """
    # Imported here rather than at module level; presumably to avoid an
    # import cycle with the top-level apache_beam package — TODO confirm.
    from apache_beam import Row

    # isinstance() raises TypeError when it reaches a None entry in its type
    # tuple, so the user type is only included when one is actually set.
    if self._user_type is None:
      accepted_types = (Row, )
    else:
      accepted_types = (Row, self._user_type)

    # NOTE(review): the TypeConstraint base class may expect type_check to
    # raise on a mismatch rather than return a bool; a False result here may
    # be ignored by callers — confirm the intended contract.
    return isinstance(instance, accepted_types)

  def _inner_types(self):
    """Returns the types of this row's fields, in field order."""
    return [field[1] for field in self._fields]

  def __eq__(self, other):
    # Equality (and hashing, below) considers only the (name, type) fields;
    # the user type, schema ID and options are ignored.
    return type(self) == type(other) and self._fields == other._fields

  def __hash__(self):
    return hash(self._fields)

  def __repr__(self):
    return 'Row(%s)' % ', '.join(
        '%s=%s' % (name, typehints._unified_repr(t)) for name, t in self._fields)

  def get_type_for(self, name):
    """Returns the type of the field called ``name``.

    Raises:
      KeyError: if this row has no field called ``name``.
    """
    return dict(self._fields)[name]