Source code for apache_beam.ml.anomaly.univariate.base

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc
from collections import deque
from enum import Enum

EPSILON = 1e-9


[docs] class BaseTracker(abc.ABC): """Abstract base class for all univariate trackers."""
[docs] @abc.abstractmethod def push(self, x): """Push a new value to the tracker. Args: x: The value to be pushed. """ raise NotImplementedError()
[docs] @abc.abstractmethod def get(self): """Get the current tracking value. Returns: The current tracked value, the type of which depends on the specific tracker implementation. """ raise NotImplementedError()
[docs] class WindowMode(Enum): """Enum representing the window mode for windowed trackers.""" #: operating on all data points from the beginning. LANDMARK = 1 #: operating on a fixed-size sliding window of recent data points. SLIDING = 2
[docs] class WindowedTracker(BaseTracker): """Abstract base class for trackers that operate on a data window. This class provides a foundation for trackers that maintain a window of data, either as a landmark window or a sliding window. It provides basic push and pop operations. Args: window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` or `SLIDING`. **kwargs: Keyword arguments. For `SLIDING` window mode, `window_size` can be specified to set the maximum size of the sliding window. Defaults to 100. """ def __init__(self, window_mode, **kwargs): if window_mode == WindowMode.SLIDING: self._window_size = kwargs.get("window_size", 100) self._queue = deque(maxlen=self._window_size) self._n = 0 self._window_mode = window_mode
[docs] def push(self, x): """Adds a new value to the data window. Args: x: The value to be added to the window. """ self._queue.append(x)
[docs] def pop(self): """Removes and returns the oldest value from the data window (FIFO). Returns: The oldest value from the window. """ return self._queue.popleft()