Source code for apache_beam.ml.anomaly.specifiable

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A module that provides utilities to turn a class into a Specifiable subclass.
"""

from __future__ import annotations

import collections
import dataclasses
import inspect
import logging
import os
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Protocol
from typing import Type
from typing import TypeVar
from typing import Union
from typing import overload
from typing import runtime_checkable

from typing_extensions import Self

__all__ = ["Spec", "Specifiable", "specifiable"]

_FALLBACK_SUBSPACE = "*"

_ACCEPTED_SUBSPACES = [
    "EnsembleAnomalyDetector",
    "AnomalyDetector",
    "BaseTracker",
    "ThresholdFn",
    "AggregationFn",
    _FALLBACK_SUBSPACE,
]

#: A nested dictionary for efficient lookup of Specifiable subclasses.
#: Structure: `_KNOWN_SPECIFIABLE[subspace][spec_type]`, where `subspace` is one
#: of the accepted subspaces that the class belongs to and `spec_type` is the
#: class name by default. Users can also specify a different value for
#: `spec_type` when applying the `specifiable` decorator to an existing class.
_KNOWN_SPECIFIABLE = collections.defaultdict(dict)

T = TypeVar('T', bound=type)
BUILTIN_TYPES_IN_SPEC = (int, float, complex, str, bytes, bytearray)


def _class_to_subspace(cls: Type) -> str:
  """
  Search the class hierarchy to find the subspace: the closest ancestor class in
  the class's method resolution order (MRO) whose name is found in the accepted
  subspace list. This is usually called when registering a new specifiable
  class.
  """
  if hasattr(cls, "mro"):
    # some classes do not have "mro", such as functions.
    for c in cls.mro():
      if c.__name__ in _ACCEPTED_SUBSPACES:
        return c.__name__

  return _FALLBACK_SUBSPACE


def _spec_type_to_subspace(spec_type: str) -> str:
  """
  Look for the subspace for a spec type. This is usually called to retrieve
  the subspace of a registered specifiable class.
  """
  for subspace in _ACCEPTED_SUBSPACES:
    if spec_type in _KNOWN_SPECIFIABLE[subspace]:
      return subspace

  raise ValueError(f"subspace for {spec_type} not found.")


[docs] @dataclasses.dataclass(frozen=True) class Spec(): """ Dataclass for storing specifications of specifiable objects. Objects can be initialized using the data in their corresponding spec. """ #: A string indicating the concrete `Specifiable` class type: str #: An optional dictionary of keyword arguments for the `__init__` method of #: the class. If None, when we materialize this Spec, we only return the #: class without instantiate any objects from it. config: Optional[Dict[str, Any]] = dataclasses.field(default_factory=dict)
def _specifiable_from_spec_helper(v, _run_init): if isinstance(v, Spec): return Specifiable.from_spec(v, _run_init) if isinstance(v, List): return [_specifiable_from_spec_helper(e, _run_init) for e in v] # TODO: support spec treatment for more types if not isinstance(v, BUILTIN_TYPES_IN_SPEC): logging.warning( "Type %s is not a recognized supported type for the" "specification. It will be included without conversion.", str(type(v))) return v def _specifiable_to_spec_helper(v): if isinstance(v, Specifiable): return v.to_spec() if isinstance(v, List): return [_specifiable_to_spec_helper(e) for e in v] if inspect.isfunction(v): if not hasattr(v, "spec_type"): _register(v, inject_spec_type=False) return Spec(type=_get_default_spec_type(v), config=None) if inspect.isclass(v): if not hasattr(v, "spec_type"): _register(v, inject_spec_type=False) return Spec(type=_get_default_spec_type(v), config=None) # TODO: support spec treatment for more types if not isinstance(v, BUILTIN_TYPES_IN_SPEC): logging.warning( "Type %s is not a recognized supported type for the" "specification. It will be included without conversion.", str(type(v))) return v
[docs] @runtime_checkable class Specifiable(Protocol): """Protocol that a specifiable class needs to implement."""
[docs] @classmethod def spec_type(cls) -> str: pass
[docs] @classmethod def from_spec(cls, spec: Spec, _run_init: bool = True) -> Union[Self, type[Self]]: """Generate a `Specifiable` subclass object based on a spec. Args: spec: the specification of a `Specifiable` subclass object _run_init: whether to call `__init__` or not for the initial instantiation Returns: Self: the `Specifiable` subclass object """ if spec.type is None: raise ValueError(f"Spec type not found in {spec}") subspace = _spec_type_to_subspace(spec.type) subclass: Type[Self] = _KNOWN_SPECIFIABLE[subspace].get(spec.type, None) if subclass is None: raise ValueError(f"Unknown spec type '{spec.type}' in {spec}") if spec.config is None: # when functions or classes are used as arguments, we won't try to # create an instance. return subclass kwargs = { k: _specifiable_from_spec_helper(v, _run_init) for k, v in spec.config.items() } if _run_init: kwargs["_run_init"] = True return subclass(**kwargs)
[docs] def to_spec(self) -> Spec: """Generate a spec from a `Specifiable` subclass object. Returns: Spec: The specification of the instance. """ if getattr(type(self), 'spec_type', None) is None: raise ValueError( f"'{type(self).__name__}' not registered as Specifiable. " f"Decorate ({type(self).__name__}) with @specifiable") args = { k: _specifiable_to_spec_helper(v) for k, v in self.init_kwargs.items() } return Spec(type=self.spec_type(), config=args)
[docs] def run_original_init(self) -> None: """Invoke the original __init__ method with original keyword arguments""" pass
[docs] @classmethod def unspecifiable(cls) -> None: """Resume the class structure prior to specifiable""" pass
def _get_default_spec_type(cls): spec_type = cls.__name__ if inspect.isfunction(cls) and cls.__name__ == "<lambda>": # for lambda functions, we need to include more information to distinguish # among them spec_type = '<lambda at %s:%s>' % ( os.path.basename(cls.__code__.co_filename), cls.__code__.co_firstlineno) return spec_type # Register a `Specifiable` subclass in `KNOWN_SPECIFIABLE` def _register(cls: type, spec_type=None, inject_spec_type=True) -> None: assert spec_type is None or inject_spec_type, \ "need to inject spec_type to class if spec_type is not None" if spec_type is None: # Use default spec_type for a class if users do not specify one. spec_type = _get_default_spec_type(cls) subspace = _class_to_subspace(cls) if spec_type in _KNOWN_SPECIFIABLE[subspace]: if cls is not _KNOWN_SPECIFIABLE[subspace][spec_type]: # only raise exception if we register the same spec type with a different # class raise ValueError( f"{spec_type} is already registered for " f"specifiable class {_KNOWN_SPECIFIABLE[subspace][spec_type]}. " "Please specify a different spec_type by @specifiable(spec_type=...)." ) else: _KNOWN_SPECIFIABLE[subspace][spec_type] = cls if inject_spec_type: setattr(cls, cls.__name__ + '__spec_type', spec_type) # cls.__spec_type = spec_type # Keep a copy of arguments that are used to call the `__init__` method when the # object is initialized. def _get_init_kwargs(inst, init_method, *args, **kwargs): params = dict( zip(inspect.signature(init_method).parameters.keys(), (None, ) + args)) del params['self'] params.update(**kwargs) return params @overload def specifiable( my_cls: None = None, /, *, spec_type: Optional[str] = None, on_demand_init: bool = True, just_in_time_init: bool = True) -> Callable[[T], T]: pass @overload def specifiable( my_cls: T, /, *, spec_type: Optional[str] = None, on_demand_init: bool = True, just_in_time_init: bool = True) -> T: pass
[docs] def specifiable( my_cls: Optional[T] = None, /, *, spec_type: Optional[str] = None, on_demand_init: bool = True, just_in_time_init: bool = True) -> Union[T, Callable[[T], T]]: """A decorator that turns a class into a `Specifiable` subclass by implementing the `Specifiable` protocol. To use the decorator, simply place `@specifiable` before the class definition:: @specifiable class Foo(): ... For finer control, the decorator can accept arguments:: @specifiable(spec_type="My Class", on_demand_init=False) class Bar(): ... Args: spec_type: The value of the `type` field in the Spec of a `Specifiable` subclass. If not provided, the class name is used. This argument is useful when registering multiple classes with the same base name; in such cases, one can specify `spec_type` to different values to resolve conflict. on_demand_init: If True, allow on-demand object initialization. The original `__init__` method will be called when `_run_init=True` is passed to the object's initialization function. just_in_time_init: If True, allow just-in-time object initialization. The original `__init__` method will be called when the first time an attribute is accessed. """ def _wrapper(cls: T) -> T: def new_init(self, *args, **kwargs): self._initialized = False self._in_init = False run_init_request = False if "_run_init" in kwargs: run_init_request = kwargs["_run_init"] del kwargs["_run_init"] if 'init_kwargs' not in self.__dict__: # If it is a child specifiable (i.e.g init_kwargs not set), we determine # whether to skip the original __init__ call based on options: # on_demand_init, just_in_time_init and _run_init. # Otherwise (i.e. init_kwargs is set), we always call the original # __init__ method for ancestor specifiable. self.init_kwargs = _get_init_kwargs( self, original_init, *args, **kwargs) logging.debug("Record init params in %s.new_init", class_name) if (on_demand_init and not run_init_request) or \ (not on_demand_init and just_in_time_init): logging.debug("Skip original %s.__init__", class_name) return logging.debug("Call original %s.__init__ in new_init", class_name) original_init(self, *args, **kwargs) self._initialized = True def run_original_init(self) -> None: """Execute the original `__init__` method with its saved arguments. For instances of the `Specifiable` class, initialization is deferred (lazy initialization). This function forces the execution of the original `__init__` method using the arguments captured during the object's initial instantiation. """ self._in_init = True original_init(self, **self.init_kwargs) self._in_init = False self._initialized = True # __getattr__ is only called when an attribute is not found in the object def new_getattr(self, name): logging.debug( "Trying to access %s.%s, but it is not found.", class_name, name) # Fix the infinite loop issue when pickling a Specifiable if name in ["_in_init", "__getstate__"] and name not in self.__dict__: raise AttributeError( f"'{type(self).__name__}' object has no attribute '{name}'") # If the attribute is not found during or after initialization, then # it is a missing attribute. if self._in_init or self._initialized: raise AttributeError( f"'{type(self).__name__}' object has no attribute '{name}'") # Here, we know the object is not initialized, then we will call original # init method. logging.debug("Call original %s.__init__ in new_getattr", class_name) run_original_init(self) # __getattribute__ is call for every attribute regardless whether it is # present in the object. In this case, we don't cause an infinite loop # if the attribute does not exist. logging.debug( "Call original %s.__getattribute__(%s) in new_getattr", class_name, name) return self.__getattribute__(name) def spec_type_func(cls): return getattr(cls, spec_type_attr_name) def unspecifiable(cls): delattr(cls, spec_type_attr_name) cls.__init__ = original_init if just_in_time_init: delattr(cls, '__getattr__') delattr(cls, 'spec_type') delattr(cls, 'run_original_init') delattr(cls, 'to_spec') delattr(cls, 'from_spec') delattr(cls, 'unspecifiable') spec_type_attr_name = cls.__name__ + "__spec_type" # the class is registered if hasattr(cls, spec_type_attr_name): return cls # start of the function body of _wrapper _register(cls, spec_type) class_name = cls.__name__ original_init = cls.__init__ # type: ignore[misc] cls.__init__ = new_init # type: ignore[misc] if just_in_time_init: cls.__getattr__ = new_getattr cls.spec_type = classmethod(spec_type_func) cls.run_original_init = run_original_init cls.to_spec = Specifiable.to_spec cls.from_spec = Specifiable.from_spec cls.unspecifiable = classmethod(unspecifiable) return cls # end of the function body of _wrapper # When this decorator is called with arguments, i.e.. # "@specifiable(arg1=...,arg2=...)", it is equivalent to assigning # specifiable(arg1=..., arg2=...) to a variable, say decor_func, and then # calling "@decor_func". if my_cls is None: return _wrapper # When this decorator is called without an argument, i.e. "@specifiable", # we return the augmented class. return _wrapper(my_cls)