# Source code for apache_beam.ml.anomaly.detectors.zscore

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import math
from typing import Optional

import apache_beam as beam
from apache_beam.ml.anomaly.base import AnomalyDetector
from apache_beam.ml.anomaly.specifiable import specifiable
from apache_beam.ml.anomaly.thresholds import FixedThreshold
from apache_beam.ml.anomaly.univariate.base import EPSILON
from apache_beam.ml.anomaly.univariate.mean import IncSlidingMeanTracker
from apache_beam.ml.anomaly.univariate.mean import MeanTracker
from apache_beam.ml.anomaly.univariate.stdev import IncSlidingStdevTracker
from apache_beam.ml.anomaly.univariate.stdev import StdevTracker

# Window size handed to the sliding mean / standard-deviation trackers that
# ZScore builds for itself when the caller does not supply its own trackers.
DEFAULT_WINDOW_SIZE = 1_000


# pylint: disable=line-too-long
[docs] @specifiable class ZScore(AnomalyDetector): """Z-Score anomaly detector. This class implements an anomaly detection algorithm based on Z-Score (also known as Standard Score [#]_ ), which measures how many standard deviations a data point is from the mean. The score is calculated as: `| (value - mean) / stdev |` Important: In the streaming setting, we use the online version of mean and standard deviation in the calculation. This implementation is adapted from the implementations within PySAD [#]_ and River [#]_: * https://github.com/selimfirat/pysad/blob/master/pysad/models/standard_absolute_deviation.py * https://github.com/online-ml/river/blob/main/river/anomaly/sad.py Args: sub_stat_tracker: Optional `MeanTracker` instance. If None, an `IncSlidingMeanTracker` with a default window size 1000 is created. stdev_tracker: Optional `StdevTracker` instance. If None, an `IncSlidingStdevTracker` with a default window size 1000 is created. threshold_criterion: threshold_criterion: Optional `ThresholdFn` to apply on the score. Defaults to `FixedThreshold(3)` due to the commonly used 3-sigma rule. **kwargs: Additional keyword arguments. .. [#] https://en.wikipedia.org/wiki/Standard_score .. [#] Yilmaz, Selim & Kozat, Suleyman. (2020). PySAD: A Streaming Anomaly Detection Framework in Python. 10.48550/arXiv.2009.02572. .. [#] Jacob Montiel, Max Halford, Saulo Martiello Mastelini, Geoffrey Bolmier, Raphaƫl Sourty, et al.. (2021). River: machine learning for streaming data in Python. Journal of Machine Learning Research, 2021, 22, pp.1-8. 
""" # pylint: enable=line-too-long def __init__( self, sub_stat_tracker: Optional[MeanTracker] = None, stdev_tracker: Optional[StdevTracker] = None, **kwargs): if "threshold_criterion" not in kwargs: kwargs["threshold_criterion"] = FixedThreshold(3) super().__init__(**kwargs) self._sub_stat_tracker = sub_stat_tracker or IncSlidingMeanTracker( DEFAULT_WINDOW_SIZE) self._stdev_tracker = stdev_tracker or IncSlidingStdevTracker( DEFAULT_WINDOW_SIZE)
[docs] def learn_one(self, x: beam.Row) -> None: """Updates the mean and standard deviation trackers with a new data point. Args: x: A `beam.Row` containing a single numerical value. """ if len(x.__dict__) != 1: raise ValueError( "ZScore.learn_one expected univariate input, but got %s", str(x)) v = next(iter(x)) self._stdev_tracker.push(v) self._sub_stat_tracker.push(v)
[docs] def score_one(self, x: beam.Row) -> Optional[float]: """Scores a data point using the Z-Score. Args: x: A `beam.Row` containing a single numerical value. Returns: float | None: The Z-Score. """ if len(x.__dict__) != 1: raise ValueError( "ZScore.score_one expected univariate input, but got %s", str(x)) v = next(iter(x)) if v is None or math.isnan(v): return None sub_stat = self._sub_stat_tracker.get() stdev = self._stdev_tracker.get() # not enough data points to compute sub_stat or standard deviation if math.isnan(stdev) or math.isnan(sub_stat): return float('NaN') if abs(stdev) < EPSILON: return 0.0 return abs((v - sub_stat) / stdev)