#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Trackers for calculating quantiles in a windowed fashion.

This module defines different types of quantile trackers that operate on
windows of data. It includes:

* `SimpleSlidingQuantileTracker`: Calculates a quantile using NumPy in a
  sliding window.
* `BufferedLandmarkQuantileTracker`: `SortedList`-based quantile tracker in
  landmark window mode.
* `BufferedSlidingQuantileTracker`: `SortedList`-based quantile tracker in
  sliding window mode.
* `SecondaryBufferedQuantileTracker`: Read-only tracker that computes its own
  quantile from the sorted data of an existing buffered tracker.
"""
import math
import typing
import warnings
import numpy as np
from sortedcontainers import SortedList
from apache_beam.ml.anomaly.specifiable import specifiable
from apache_beam.ml.anomaly.univariate.base import BaseTracker
from apache_beam.ml.anomaly.univariate.base import WindowedTracker
from apache_beam.ml.anomaly.univariate.base import WindowMode
class QuantileTracker(BaseTracker):
  """Abstract base class for quantile trackers.

  It adds no tracking logic of its own. It only validates and stores the
  requested quantile `q`, giving every quantile tracker a common type in the
  tracker hierarchy.

  Args:
    q: The quantile to track, a number between 0 and 1 (inclusive).
  """
  def __init__(self, q):
    assert 0 <= q <= 1, (
        "quantile argument should be between 0 and 1")
    self._q = q
@specifiable
class SimpleSlidingQuantileTracker(WindowedTracker, QuantileTracker):
  """Sliding-window quantile tracker that recomputes with NumPy on each query.

  Every `get()` call runs `numpy.nanquantile` over all values currently in the
  sliding window. Nothing is maintained incrementally, and NaN values in the
  window are ignored by the computation.

  Args:
    window_size: The size of the sliding window.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, window_size, q):
    super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)
    QuantileTracker.__init__(self, q)

  def get(self):
    """Returns quantile `q` of the values in the current sliding window.

    Returns:
      float: The requested quantile of the window's values, or NaN if the
      window is empty.
    """
    # nanquantile can emit a RuntimeWarning for an empty (or all-NaN) window;
    # the NaN result already conveys that, so all warnings are silenced here.
    with warnings.catch_warnings():
      warnings.simplefilter("ignore")
      quantile = np.nanquantile(self._queue, self._q)
    return quantile
class BufferedQuantileTracker(WindowedTracker, QuantileTracker):
  """Abstract base class for buffered quantile trackers.

  Warning:
    Buffered quantile trackers are NOT truly incremental in the sense that
    they don't update the quantile in constant time per new data point. They
    maintain a sorted list of all (non-NaN) values in the window.

  Args:
    window_mode: A `WindowMode` enum specifying whether the window is
      `LANDMARK` or `SLIDING`.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
    **kwargs: Keyword arguments passed to the parent class constructor.
  """
  def __init__(self, window_mode, q, **kwargs):
    super().__init__(window_mode, **kwargs)
    QuantileTracker.__init__(self, q)
    self._sorted_items = SortedList()

  def push(self, x):
    """Pushes a new value, maintains the sorted list, and manages the window.

    NaN values are never inserted into the sorted list, so they do not affect
    the quantile. In `SLIDING` mode the value is also pushed onto the window
    queue. If the queue already holds `window_size` items, the oldest one is
    popped first and, unless it is NaN, removed from the sorted list.

    Args:
      x: The new value to be pushed.
    """
    if not math.isnan(x):
      self._sorted_items.add(x)

    # The window queue is only used in SLIDING mode; in LANDMARK mode every
    # value stays in the sorted list and nothing is ever evicted.
    if self._window_mode == WindowMode.SLIDING:
      # Evict the oldest value before pushing so it can be removed from the
      # sorted list too. NaNs were never inserted there, so skip them.
      if (len(self._queue) >= self._window_size and
          not math.isnan(old_x := self.pop())):
        self._sorted_items.discard(old_x)

      super().push(x)

  @staticmethod
  def _get_helper(sorted_items, q):
    """Computes quantile `q` of values that are already in ascending order.

    Uses linear interpolation between the two closest ranks at position
    `q * (n - 1)`, the same formula as NumPy's default `method="linear"`.

    Args:
      sorted_items: An indexable sequence of values in ascending order.
      q: The quantile to compute, between 0 and 1 (inclusive).

    Returns:
      The interpolated quantile, or NaN if `sorted_items` is empty.
    """
    n = len(sorted_items)
    if n < 1:
      return float("nan")

    pos = q * (n - 1)
    lo = math.floor(pos)
    frac = pos - lo
    lo_value = typing.cast(float, sorted_items[lo])
    hi_value = typing.cast(float, sorted_items[min(lo + 1, n - 1)])

    if frac == 0 or lo_value == hi_value:
      # Nothing to interpolate. Returning early avoids `(hi - lo) * frac`,
      # which is NaN when a neighbour is infinite (inf * 0 or inf - inf).
      # Adding `0 * frac` keeps the result type identical to the
      # interpolating branch below (e.g. float even for int items).
      return lo_value + 0 * frac

    # Use linear interpolation to yield the requested quantile.
    return lo_value + (hi_value - lo_value) * frac

  def get(self):
    """Returns the current quantile value using the sorted list.

    Calculates the quantile using linear interpolation on the sorted values.

    Returns:
      float: The calculated quantile value. Returns NaN if the window is empty.
    """
    return self._get_helper(self._sorted_items, self._q)
@specifiable
class SecondaryBufferedQuantileTracker(WindowedTracker, QuantileTracker):
  """A secondary quantile tracker that shares its data with a master tracker.

  This tracker acts as a read-only view of the master tracker's data. It
  computes its own quantile `q` without maintaining an independent buffer,
  reading the master's sorted items on every `get()` call.

  Args:
    master: The `BufferedQuantileTracker` instance to share data with.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, master: QuantileTracker, q):
    assert isinstance(master, BufferedQuantileTracker), \
        "Cannot create secondary tracker from non-BufferedQuantileTracker"
    self._master = master
    # Only the window mode is forwarded: push() is a no-op, so this tracker's
    # own window is never filled.
    super().__init__(self._master._window_mode)
    QuantileTracker.__init__(self, q)
    self._sorted_items = self._master._sorted_items

  def push(self, x):
    """Does nothing, as this is a secondary tracker.

    Values must be pushed to the master tracker instead.

    Args:
      x: Ignored.
    """

  def get(self):
    """Returns quantile `q` computed from the master tracker's sorted items.

    Returns:
      float: The calculated quantile, or NaN if the master holds no values.
    """
    return self._master._get_helper(self._master._sorted_items, self._q)
@specifiable
class BufferedLandmarkQuantileTracker(BufferedQuantileTracker):
  """Quantile tracker over a landmark window, backed by a sorted list.

  Warning:
    Every pushed non-NaN value stays in the sorted list for the lifetime of
    the tracker, so memory use grows without bound. Avoid using this tracker
    in production for long-running streams.

  Args:
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, q):
    memory_warning = (
        "Quantile trackers should not be used in production due to "
        "the unbounded memory consumption.")
    warnings.warn(memory_warning)
    super().__init__(window_mode=WindowMode.LANDMARK, q=q)
@specifiable
class BufferedSlidingQuantileTracker(BufferedQuantileTracker):
  """Quantile tracker over a sliding window, backed by a sorted list.

  Warning:
    Only the values inside the sliding window are kept in the sorted list.
    Memory use is therefore bounded by `window_size`, but it can still be
    significant for large windows.

  Args:
    window_size: The size of the sliding window.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, window_size, q):
    super().__init__(
        window_size=window_size,
        window_mode=WindowMode.SLIDING,
        q=q,
    )