Source code for apache_beam.ml.anomaly.aggregations

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import collections
import math
import statistics
from typing import Any
from typing import Callable
from typing import Iterable
from typing import Optional

from apache_beam.ml.anomaly.base import DEFAULT_MISSING_LABEL
from apache_beam.ml.anomaly.base import DEFAULT_NORMAL_LABEL
from apache_beam.ml.anomaly.base import DEFAULT_OUTLIER_LABEL
from apache_beam.ml.anomaly.base import AggregationFn
from apache_beam.ml.anomaly.base import AnomalyPrediction
from apache_beam.ml.anomaly.specifiable import specifiable


class _AggModelIdMixin:
  def __init__(self, agg_model_id: Optional[str] = None):
    self._agg_model_id = agg_model_id

  def _set_agg_model_id_if_unset(self, agg_model_id: str) -> None:
    if self._agg_model_id is None:
      self._agg_model_id = agg_model_id

  def add_model_id(self, result_dict):
    result_dict["model_id"] = self._agg_model_id


class _SourcePredictionMixin:
  def __init__(self, include_source_predictions):
    self._include_source_predictions = include_source_predictions

  def add_source_predictions(self, result_dict, source_predictions):
    if self._include_source_predictions:
      result_dict["source_predictions"] = list(source_predictions)


[docs] class LabelAggregation(AggregationFn, _AggModelIdMixin, _SourcePredictionMixin): """Aggregates anomaly predictions based on their labels. This is an abstract base class for `AggregationFn`s that combine multiple `AnomalyPrediction` objects into a single `AnomalyPrediction` based on the labels of the input predictions. Args: agg_func (Callable[[Iterable[int]], int]): A function that aggregates a collection of anomaly labels (integers) into a single label. agg_model_id (Optional[str]): The model id used in aggregated predictions. Defaults to None. include_source_predictions (bool): If True, include the input predictions in the `source_predictions` of the output. Defaults to False. """ def __init__( self, agg_func: Callable[[Iterable[int]], int], agg_model_id: Optional[str] = None, include_source_predictions: bool = False, normal_label: int = DEFAULT_NORMAL_LABEL, outlier_label: int = DEFAULT_OUTLIER_LABEL, missing_label: int = DEFAULT_MISSING_LABEL, ): self._agg = agg_func self._normal_label = normal_label self._outlier_label = outlier_label self._missing_label = missing_label _AggModelIdMixin.__init__(self, agg_model_id) _SourcePredictionMixin.__init__(self, include_source_predictions)
[docs] def apply( self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction: """Applies the label aggregation function to a list of predictions. Args: predictions (Iterable[AnomalyPrediction]): A collection of `AnomalyPrediction` objects to be aggregated. Returns: AnomalyPrediction: A single `AnomalyPrediction` object with the aggregated label. The aggregated label is determined as follows: - If there are any non-missing and non-error labels, the `agg_func` is applied to aggregate them. - If all labels are error labels (`None`), the aggregated label is also `None`. - If there are a mix of missing and error labels, the aggregated label is the `missing_label`. """ result_dict: dict[str, Any] = {} _AggModelIdMixin.add_model_id(self, result_dict) _SourcePredictionMixin.add_source_predictions( self, result_dict, predictions) labels = [ prediction.label for prediction in predictions if prediction.label is not None and prediction.label != self._missing_label ] if len(labels) > 0: # apply aggregation_fn if there is any non-None and non-missing label result_dict["label"] = self._agg(labels) elif all(map(lambda x: x.label is None, predictions)): # all are error labels (None) -- all scores are error result_dict["label"] = None else: # some missing labels with some error labels (None) result_dict["label"] = self._missing_label return AnomalyPrediction(**result_dict)
[docs] class ScoreAggregation(AggregationFn, _AggModelIdMixin, _SourcePredictionMixin): """Aggregates anomaly predictions based on their scores. This is an abstract base class for `AggregationFn`s that combine multiple `AnomalyPrediction` objects into a single `AnomalyPrediction` based on the scores of the input predictions. Args: agg_func (Callable[[Iterable[float]], float]): A function that aggregates a collection of anomaly scores (floats) into a single score. agg_model_id (Optional[str]): The model id used in aggregated predictions. Defaults to None. include_source_predictions (bool): If True, include the input predictions in the `source_predictions` of the output. Defaults to False. """ def __init__( self, agg_func: Callable[[Iterable[float]], float], agg_model_id: Optional[str] = None, include_source_predictions: bool = False): self._agg = agg_func _AggModelIdMixin.__init__(self, agg_model_id) _SourcePredictionMixin.__init__(self, include_source_predictions)
[docs] def apply( self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction: """Applies the score aggregation function to a list of predictions. Args: predictions (Iterable[AnomalyPrediction]): A collection of `AnomalyPrediction` objects to be aggregated. Returns: AnomalyPrediction: A single `AnomalyPrediction` object with the aggregated score. The aggregated score is determined as follows: - If there are any non-missing and non-error scores, the `agg_func` is applied to aggregate them. - If all scores are error scores (`None`), the aggregated score is also `None`. - If there are a mix of missing (`NaN`) and error scores (`None`), the aggregated score is `NaN`. """ result_dict: dict[str, Any] = {} _AggModelIdMixin.add_model_id(self, result_dict) _SourcePredictionMixin.add_source_predictions( self, result_dict, predictions) scores = [ prediction.score for prediction in predictions if prediction.score is not None and not math.isnan(prediction.score) ] if len(scores) > 0: # apply aggregation_fn if there is any non-None and non-NaN score result_dict["score"] = self._agg(scores) elif all(map(lambda x: x.score is None, predictions)): # all are error scores (None) result_dict["score"] = None else: # some missing scores (NaN) with some error scores (None) result_dict["score"] = float("NaN") return AnomalyPrediction(**result_dict)
[docs] @specifiable class MajorityVote(LabelAggregation): """Aggregates anomaly labels using majority voting. This `AggregationFn` implements a majority voting strategy to combine anomaly labels from multiple `AnomalyPrediction` objects. It counts the occurrences of normal and outlier labels and selects the label with the higher count as the aggregated label. In case of a tie, a tie-breaker label is used. Example: If input labels are [normal, outlier, outlier, normal, outlier], and normal_label=0, outlier_label=1, then the aggregated label will be outlier (1) because outliers have a majority (3 vs 2). Args: normal_label (int): The integer label for normal predictions. Defaults to 0. outlier_label (int): The integer label for outlier predictions. Defaults to 1. tie_breaker (int): The label to return if there is a tie in votes. Defaults to 0 (normal_label). **kwargs: Additional keyword arguments to pass to the base `LabelAggregation` class. """ def __init__(self, tie_breaker=DEFAULT_NORMAL_LABEL, **kwargs): self._tie_breaker = tie_breaker def inner(predictions: Iterable[int]) -> int: counters = collections.Counter(predictions) if counters[self._normal_label] < counters[self._outlier_label]: vote = self._outlier_label elif counters[self._normal_label] > counters[self._outlier_label]: vote = self._normal_label else: vote = self._tie_breaker return vote super().__init__(agg_func=inner, **kwargs)
# And scheme
[docs] @specifiable class AllVote(LabelAggregation): """Aggregates anomaly labels using an "all vote" (AND) scheme. This `AggregationFn` implements an "all vote" strategy. It aggregates anomaly labels such that the result is considered an outlier only if all input `AnomalyPrediction` objects are labeled as outliers. Example: If input labels are [outlier, outlier, outlier], and outlier_label=1, then the aggregated label will be outlier (1). If input labels are [outlier, normal, outlier], and outlier_label=1, then the aggregated label will be normal (0). Args: normal_label (int): The integer label for normal predictions. Defaults to 0. outlier_label (int): The integer label for outlier predictions. Defaults to 1. **kwargs: Additional keyword arguments to pass to the base `LabelAggregation` class. """ def __init__(self, **kwargs): def inner(predictions: Iterable[int]) -> int: return self._outlier_label if all( map(lambda p: p == self._outlier_label, predictions)) else self._normal_label super().__init__(agg_func=inner, **kwargs)
# Or scheme
[docs] @specifiable class AnyVote(LabelAggregation): """Aggregates anomaly labels using an "any vote" (OR) scheme. This `AggregationFn` implements an "any vote" strategy. It aggregates anomaly labels such that the result is considered an outlier if at least one of the input `AnomalyPrediction` objects is labeled as an outlier. Example: If input labels are [normal, normal, outlier], and outlier_label=1, then the aggregated label will be outlier (1). If input labels are [normal, normal, normal], and outlier_label=1, then the aggregated label will be normal (0). Args: normal_label (int): The integer label for normal predictions. Defaults to 0. outlier_label (int): The integer label for outlier predictions. Defaults to 1. **kwargs: Additional keyword arguments to pass to the base `LabelAggregation` class. """ def __init__(self, **kwargs): def inner(predictions: Iterable[int]) -> int: return self._outlier_label if any( map(lambda p: p == self._outlier_label, predictions)) else self._normal_label super().__init__(agg_func=inner, **kwargs)
[docs] @specifiable class AverageScore(ScoreAggregation): """Aggregates anomaly scores by calculating their average. This `AggregationFn` computes the average of the anomaly scores from a collection of `AnomalyPrediction` objects. Args: **kwargs: Additional keyword arguments to pass to the base `ScoreAggregation` class. """ def __init__(self, **kwargs): super().__init__(agg_func=statistics.mean, **kwargs)
[docs] @specifiable class MaxScore(ScoreAggregation): """Aggregates anomaly scores by selecting the maximum score. This `AggregationFn` selects the highest anomaly score from a collection of `AnomalyPrediction` objects as the aggregated score. Args: **kwargs: Additional keyword arguments to pass to the base `ScoreAggregation` class. """ def __init__(self, **kwargs): super().__init__(agg_func=max, **kwargs)