#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A module that provides utilities to turn a class into a Specifiable subclass.
"""
from __future__ import annotations
import collections
import dataclasses
import inspect
import logging
import os
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Protocol
from typing import Type
from typing import TypeVar
from typing import Union
from typing import overload
from typing import runtime_checkable
from typing_extensions import Self
# Names exported by a star-import of this module.
__all__ = ["Spec", "Specifiable", "specifiable"]

# Catch-all subspace: `_class_to_subspace` returns it when no class in the MRO
# carries one of the names listed in `_ACCEPTED_SUBSPACES`.
_FALLBACK_SUBSPACE = "*"

# Subspace names known to the registry. `_class_to_subspace` matches class
# names against this list, and `_spec_type_to_subspace` scans the subspaces in
# exactly this order, so the fallback entry is consulted last.
_ACCEPTED_SUBSPACES = [
    "EnsembleAnomalyDetector",
    "AnomalyDetector",
    "BaseTracker",
    "ThresholdFn",
    "AggregationFn",
    _FALLBACK_SUBSPACE,
]

#: A nested dictionary for efficient lookup of Specifiable subclasses.
#: Structure: `_KNOWN_SPECIFIABLE[subspace][spec_type]`, where `subspace` is one
#: of the accepted subspaces that the class belongs to and `spec_type` is the
#: class name by default. Users can also specify a different value for
#: `spec_type` when applying the `specifiable` decorator to an existing class.
# NOTE: this is a defaultdict, so merely indexing it with a new subspace name
# creates an empty inner dict for that name.
_KNOWN_SPECIFIABLE = collections.defaultdict(dict)

# Type variable used in the signature of `specifiable`; bound to `type` because
# the decorator receives a class and returns that same class.
T = TypeVar('T', bound=type)

# Value types that the spec helpers pass through without logging a warning.
# Other non-list, non-Spec values are also passed through, but with a warning.
BUILTIN_TYPES_IN_SPEC = (int, float, complex, str, bytes, bytearray)
def _class_to_subspace(cls: Type) -> str:
  """Return the subspace that `cls` should be registered under.

  The method resolution order of `cls` is walked from the class itself towards
  its ancestors, and the name of the first class found in
  `_ACCEPTED_SUBSPACES` is returned. Objects that expose no `mro` attribute
  (plain functions, for example) and classes without a matching ancestor are
  assigned to `_FALLBACK_SUBSPACE`.
  """
  if not hasattr(cls, "mro"):
    # Nothing to walk, e.g. a function was passed in.
    return _FALLBACK_SUBSPACE

  ancestor_names = (ancestor.__name__ for ancestor in cls.mro())
  return next(
      (name for name in ancestor_names if name in _ACCEPTED_SUBSPACES),
      _FALLBACK_SUBSPACE)
def _spec_type_to_subspace(spec_type: str) -> str:
  """Find the subspace in which `spec_type` has been registered.

  The subspaces are probed in the order given by `_ACCEPTED_SUBSPACES` and the
  first one whose registry contains `spec_type` is returned.

  Raises:
    ValueError: If no subspace contains `spec_type`.
  """
  # The generator is lazy, so probing stops at the first hit.
  owner = next(
      (
          candidate for candidate in _ACCEPTED_SUBSPACES
          if spec_type in _KNOWN_SPECIFIABLE[candidate]),
      None)
  if owner is None:
    raise ValueError(f"subspace for {spec_type} not found.")
  return owner
@dataclasses.dataclass(frozen=True)
class Spec:
  """Immutable description of a specifiable object.

  A spec names a registered type and, optionally, the keyword arguments
  needed to construct an instance of it.
  """

  #: Name under which the concrete `Specifiable` class is registered.
  type: str

  #: Keyword arguments for the `__init__` method of the class; defaults to a
  #: fresh empty dict. When this is None, materializing the Spec returns the
  #: registered class (or function) itself instead of instantiating an object
  #: from it.
  config: Optional[Dict[str, Any]] = dataclasses.field(default_factory=dict)
def _specifiable_from_spec_helper(v, _run_init):
  """Convert one config value taken from a `Spec` back into a runtime value.

  Args:
    v: The value to convert.
    _run_init: Forwarded unchanged to `Specifiable.from_spec` for every nested
      `Spec` that is encountered.

  Returns:
    The result of `Specifiable.from_spec` when `v` is a `Spec`; a new list
    with every element converted recursively when `v` is a list; otherwise
    `v` itself, unchanged. A warning is logged for pass-through values whose
    type is not listed in `BUILTIN_TYPES_IN_SPEC`.
  """
  if isinstance(v, Spec):
    return Specifiable.from_spec(v, _run_init)

  if isinstance(v, list):
    return [_specifiable_from_spec_helper(e, _run_init) for e in v]

  # TODO: support spec treatment for more types
  if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
    # The two literals are concatenated by the compiler, so the first one must
    # end with a space to keep "the specification" as two words.
    logging.warning(
        "Type %s is not a recognized supported type for the "
        "specification. It will be included without conversion.",
        str(type(v)))
  return v
def _specifiable_to_spec_helper(v):
  """Convert one constructor argument into its representation inside a `Spec`.

  Args:
    v: The value to convert.

  Returns:
    * `v.to_spec()` when `v` satisfies the `Specifiable` protocol;
    * a new list with every element converted recursively when `v` is a list;
    * `Spec(type=<default spec type>, config=None)` when `v` is a function or
      a class, i.e. a reference to the callable rather than to an instance;
    * `v` itself otherwise. A warning is logged for pass-through values whose
      type is not listed in `BUILTIN_TYPES_IN_SPEC`.
  """
  if isinstance(v, Specifiable):
    return v.to_spec()

  if isinstance(v, list):
    return [_specifiable_to_spec_helper(e) for e in v]

  if inspect.isfunction(v) or inspect.isclass(v):
    # Functions and classes are referenced by name only. Anything that does
    # not expose `spec_type` is registered here, without injecting the marker
    # attribute, so that the name can be resolved again by `from_spec`.
    if not hasattr(v, "spec_type"):
      _register(v, inject_spec_type=False)
    return Spec(type=_get_default_spec_type(v), config=None)

  # TODO: support spec treatment for more types
  if not isinstance(v, BUILTIN_TYPES_IN_SPEC):
    # The two literals are concatenated by the compiler, so the first one must
    # end with a space to keep "the specification" as two words.
    logging.warning(
        "Type %s is not a recognized supported type for the "
        "specification. It will be included without conversion.",
        str(type(v)))
  return v
@runtime_checkable
class Specifiable(Protocol):
  """Structural interface that every specifiable class provides."""
  @classmethod
  def spec_type(cls) -> str:
    """Placeholder in the protocol; the body does nothing."""

  @classmethod
  def from_spec(cls,
                spec: Spec,
                _run_init: bool = True) -> Union[Self, type[Self]]:
    """Materialize the object described by `spec`.

    Args:
      spec: The specification to materialize.
      _run_init: When true, `_run_init=True` is added to the keyword arguments
        passed to the registered class. The flag is also forwarded to the
        conversion of nested config values.

    Returns:
      The registered class (or function) itself when `spec.config` is None;
      otherwise the result of calling the registered class with the converted
      config as keyword arguments.

    Raises:
      ValueError: If `spec.type` is None or cannot be found in the registry.
    """
    if spec.type is None:
      raise ValueError(f"Spec type not found in {spec}")

    registry = _KNOWN_SPECIFIABLE[_spec_type_to_subspace(spec.type)]
    subclass: Type[Self] = registry.get(spec.type, None)
    if subclass is None:
      raise ValueError(f"Unknown spec type '{spec.type}' in {spec}")

    if spec.config is None:
      # A spec without config refers to the function or class itself, so no
      # instance is created.
      return subclass

    kwargs = {}
    for key, value in spec.config.items():
      kwargs[key] = _specifiable_from_spec_helper(value, _run_init)
    if _run_init:
      kwargs["_run_init"] = True
    return subclass(**kwargs)

  def to_spec(self) -> Spec:
    """Describe this instance as a `Spec`.

    Returns:
      A `Spec` whose type is `self.spec_type()` and whose config holds the
      converted entries of `self.init_kwargs`.

    Raises:
      ValueError: If the class of this instance has no `spec_type` attribute.
    """
    owner = type(self)
    if getattr(owner, 'spec_type', None) is None:
      raise ValueError(
          f"'{owner.__name__}' not registered as Specifiable. "
          f"Decorate ({owner.__name__}) with @specifiable")

    config = {}
    for key, value in self.init_kwargs.items():
      config[key] = _specifiable_to_spec_helper(value)
    return Spec(type=self.spec_type(), config=config)

  def run_original_init(self) -> None:
    """Invoke the original __init__ method with original keyword arguments.

    Placeholder in the protocol; the body does nothing.
    """

  @classmethod
  def unspecifiable(cls) -> None:
    """Restore the class structure it had before being made specifiable.

    Placeholder in the protocol; the body does nothing.
    """
def _get_default_spec_type(cls):
  """Return the spec type used when the caller does not supply one.

  This is the `__name__` of the given class or function. Every lambda is named
  "<lambda>", so for lambdas the base name of the defining file and the first
  line number of the code object are included to tell them apart.
  """
  name = cls.__name__
  if not (inspect.isfunction(cls) and name == "<lambda>"):
    return name

  code = cls.__code__
  return '<lambda at %s:%s>' % (
      os.path.basename(code.co_filename), code.co_firstlineno)
def _register(cls: type, spec_type=None, inject_spec_type=True) -> None:
  """Record `cls` in `_KNOWN_SPECIFIABLE` under its subspace and spec type.

  Args:
    cls: The class (or function) to register.
    spec_type: The registry key. When None, the default spec type of `cls` is
      used.
    inject_spec_type: When true, the resolved spec type is also stored on
      `cls` in an attribute named `<cls.__name__>__spec_type`.

  Raises:
    ValueError: If the spec type is already registered in the same subspace
      for a different object. Registering the same object again is allowed.
  """
  assert spec_type is None or inject_spec_type, \
      "need to inject spec_type to class if spec_type is not None"

  # Fall back to the default spec type if users do not specify one.
  resolved = _get_default_spec_type(cls) if spec_type is None else spec_type
  registry = _KNOWN_SPECIFIABLE[_class_to_subspace(cls)]

  if resolved not in registry:
    registry[resolved] = cls
  elif registry[resolved] is not cls:
    # Only a clash between two different objects is an error.
    raise ValueError(
        f"{resolved} is already registered for "
        f"specifiable class {registry[resolved]}. "
        "Please specify a different spec_type by @specifiable(spec_type=...)."
    )

  if inject_spec_type:
    # The attribute name embeds the class name, so the marker found on a class
    # can be told apart from one inherited from a differently named parent.
    setattr(cls, cls.__name__ + '__spec_type', resolved)
def _get_init_kwargs(inst, init_method, *args, **kwargs):
  """Express the arguments of an `__init__` call as keyword arguments.

  The result is the copy of the arguments that is kept when an object is
  created, so that `init_method` can be invoked again later on.

  Args:
    inst: The instance being initialized. Currently unused.
    init_method: The unbound `__init__` function whose signature supplies the
      parameter names.
    *args: Positional arguments of the call, excluding the instance.
    **kwargs: Keyword arguments of the call.

  Returns:
    A dict that maps each positional argument to the name of the parameter at
    the same position, updated with `kwargs`.
  """
  # The first parameter receives the instance. It is skipped by position
  # rather than by looking for the name 'self', so that an `__init__` whose
  # receiver has another name does not raise a KeyError.
  param_names = list(inspect.signature(init_method).parameters)[1:]
  params = dict(zip(param_names, args))
  params.update(kwargs)
  return params
@overload
def specifiable(
    my_cls: None = None,
    /,
    *,
    spec_type: Optional[str] = None,
    on_demand_init: bool = True,
    just_in_time_init: bool = True) -> Callable[[T], T]:
  """Typing overload: no class given, so a class decorator is returned."""
  ...


@overload
def specifiable(
    my_cls: T,
    /,
    *,
    spec_type: Optional[str] = None,
    on_demand_init: bool = True,
    just_in_time_init: bool = True) -> T:
  """Typing overload: a class is given, so that class is returned."""
  ...
def specifiable(
    my_cls: Optional[T] = None,
    /,
    *,
    spec_type: Optional[str] = None,
    on_demand_init: bool = True,
    just_in_time_init: bool = True) -> Union[T, Callable[[T], T]]:
  """Class decorator that makes a class implement the `Specifiable` protocol.

  The bare form decorates the class directly::

    @specifiable
    class Foo():
      ...

  The parameterized form accepts keyword options::

    @specifiable(spec_type="My Class", on_demand_init=False)
    class Bar():
      ...

  Args:
    my_cls: The class to decorate. It is None when the decorator is used with
      arguments, in which case a decorator function is returned.
    spec_type: The value of the `type` field in the Spec of the decorated
      class. If not provided, the class name is used. Supplying distinct
      values resolves conflicts between classes that share a name.
    on_demand_init: If True, the original `__init__` is skipped when an object
      is constructed, unless `_run_init=True` is passed to the constructor.
    just_in_time_init: If True, a `__getattr__` hook is installed that runs
      the original `__init__` the first time a missing attribute is looked up
      on an instance that has not been initialized. When `on_demand_init` is
      False, this option also makes the constructor skip the original
      `__init__`.

  Returns:
    The decorated class, or a decorator function when `my_cls` is None.
  """
  def _wrapper(cls: T) -> T:
    marker_attr = cls.__name__ + "__spec_type"
    if hasattr(cls, marker_attr):
      # The class has been registered before; leave it untouched.
      return cls

    # Register first: a spec type conflict raises before the class is patched.
    _register(cls, spec_type)
    class_name = cls.__name__
    original_init = cls.__init__  # type: ignore[misc]

    def new_init(self, *args, **kwargs):
      self._initialized = False
      self._in_init = False

      # `_run_init` is a control flag, not an argument of the original
      # __init__, so it is removed from the keyword arguments.
      run_init_request = kwargs.pop("_run_init", False)

      if 'init_kwargs' not in self.__dict__:
        # init_kwargs has not been set yet, so this is the __init__ of the
        # child specifiable: record the arguments and decide from
        # on_demand_init, just_in_time_init and _run_init whether the original
        # __init__ is skipped. When init_kwargs is already set, this is the
        # __init__ of an ancestor specifiable and the original __init__ is
        # always called.
        self.init_kwargs = _get_init_kwargs(
            self, original_init, *args, **kwargs)
        logging.debug("Record init params in %s.new_init", class_name)

        if on_demand_init:
          skip_init = not run_init_request
        else:
          skip_init = just_in_time_init
        if skip_init:
          logging.debug("Skip original %s.__init__", class_name)
          return

      logging.debug("Call original %s.__init__ in new_init", class_name)
      original_init(self, *args, **kwargs)
      self._initialized = True

    def run_original_init(self) -> None:
      """Execute the original `__init__` method with its saved arguments.

      Initialization of a specifiable instance may have been deferred. This
      method forces the original `__init__` to run, using the arguments that
      were recorded when the object was created.
      """
      self._in_init = True
      original_init(self, **self.init_kwargs)
      self._in_init = False
      self._initialized = True

    # __getattr__ is only consulted after the regular attribute lookup fails.
    def new_getattr(self, name):
      logging.debug(
          "Trying to access %s.%s, but it is not found.", class_name, name)
      no_such_attr = f"'{type(self).__name__}' object has no attribute '{name}'"

      # Avoid the infinite loop that occurs when pickling a Specifiable: these
      # names may be looked up before the bookkeeping flags exist.
      if name in ("_in_init", "__getstate__") and name not in self.__dict__:
        raise AttributeError(no_such_attr)

      # During or after initialization, a failed lookup means the attribute
      # really is missing.
      if self._in_init or self._initialized:
        raise AttributeError(no_such_attr)

      # The object has not been initialized yet: run the original __init__
      # and then repeat the lookup.
      logging.debug("Call original %s.__init__ in new_getattr", class_name)
      run_original_init(self)

      # If the attribute is still missing, __getattribute__ ends up in this
      # hook again, which now raises because the object is initialized, so
      # there is no infinite loop.
      logging.debug(
          "Call original %s.__getattribute__(%s) in new_getattr",
          class_name,
          name)
      return self.__getattribute__(name)

    def spec_type_func(cls):
      return getattr(cls, marker_attr)

    def unspecifiable(cls):
      # Undo the patches applied below. The entry in the registry is kept.
      delattr(cls, marker_attr)
      cls.__init__ = original_init
      if just_in_time_init:
        delattr(cls, '__getattr__')
      for injected in ('spec_type',
                       'run_original_init',
                       'to_spec',
                       'from_spec',
                       'unspecifiable'):
        delattr(cls, injected)

    cls.__init__ = new_init  # type: ignore[misc]
    if just_in_time_init:
      cls.__getattr__ = new_getattr
    cls.spec_type = classmethod(spec_type_func)
    cls.run_original_init = run_original_init
    cls.to_spec = Specifiable.to_spec
    cls.from_spec = Specifiable.from_spec
    cls.unspecifiable = classmethod(unspecifiable)
    return cls

  # With arguments, i.e. "@specifiable(arg1=..., arg2=...)", no class has been
  # received yet, so the decorator itself is returned and then applied to the
  # class. Without arguments, i.e. "@specifiable", the class is augmented
  # right away.
  return _wrapper if my_cls is None else _wrapper(my_cls)