#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Trackers for calculating quantiles in windowed fashion.
This module defines different types of quantile trackers that operate on
windows of data. It includes:
  * `SimpleSlidingQuantileTracker`: Calculates quantile using numpy in a sliding
    window.
  * `BufferedLandmarkQuantileTracker`: Sortedlist based quantile tracker in
    landmark window mode.
  * `BufferedSlidingQuantileTracker`: Sortedlist based quantile tracker in
    sliding window mode.
"""
import math
import typing
import warnings
import numpy as np
from sortedcontainers import SortedList
from apache_beam.ml.anomaly.specifiable import specifiable
from apache_beam.ml.anomaly.univariate.base import BaseTracker
from apache_beam.ml.anomaly.univariate.base import WindowedTracker
from apache_beam.ml.anomaly.univariate.base import WindowMode
class QuantileTracker(BaseTracker):
  """Base class shared by all quantile trackers.

  It anchors the quantile-tracker type hierarchy. Its constructor checks the
  requested quantile and keeps it on the instance as `_q` for subclasses to
  use.

  Args:
    q: The quantile to track; must satisfy `0 <= q <= 1`.
  """
  def __init__(self, q):
    # Reject out-of-range quantiles before anything is stored.
    assert 0 <= q <= 1, (
        "quantile argument should be between 0 and 1")
    self._q = q
@specifiable
class SimpleSlidingQuantileTracker(WindowedTracker, QuantileTracker):
  """Sliding window quantile tracker using NumPy.

  This tracker uses NumPy's `nanquantile` function to calculate the specified
  quantile of the values currently in the sliding window. It's a simple,
  non-incremental approach: the quantile is recomputed from the window
  contents on every call to `get`.

  Args:
    window_size: The size of the sliding window.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, window_size, q):
    # The two bases take different arguments, so each is initialized
    # explicitly: the windowed base through `super()`, the quantile base by
    # a direct call.
    super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)
    QuantileTracker.__init__(self, q)

  def get(self):
    """Calculates and returns the specified quantile of the current sliding
    window.

    NaN entries in the window are skipped by `np.nanquantile`.

    Returns:
      float: The specified quantile of the values in the current sliding window.
             Returns NaN if the window is empty.
    """
    # NumPy may emit a warning when there is nothing to aggregate (e.g. an
    # all-NaN window); the NaN result is the intended outcome, so warnings
    # are suppressed for the duration of the call.
    with warnings.catch_warnings(record=False):
      warnings.simplefilter("ignore")
      # NOTE(review): `self._queue` is presumably the window buffer kept by
      # `WindowedTracker` -- it is not defined in this file.
      return np.nanquantile(self._queue, self._q)
 
class BufferedQuantileTracker(WindowedTracker, QuantileTracker):
  """Abstract base class for buffered quantile trackers.

  Warning:
    Buffered quantile trackers are NOT truly incremental in the sense that they
    don't update the quantile in constant time per new data point. They maintain
    a sorted list of all values in the window.

  Args:
    window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK`
      or `SLIDING`.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
    **kwargs: Keyword arguments passed to the parent class constructor.
  """
  def __init__(self, window_mode, q, **kwargs):
    super().__init__(window_mode, **kwargs)
    QuantileTracker.__init__(self, q)
    # Sorted view of the non-NaN values; `get` reads quantiles from it.
    self._sorted_items = SortedList()

  def push(self, x):
    """Pushes a new value, maintains the sorted list, and manages the window.

    NaN values are never inserted into the sorted list. In `SLIDING` mode the
    value is also forwarded to the parent `push`, after first popping one
    value when the queue has already reached `window_size`. In any other mode
    only the sorted list is updated.

    Args:
      x: The new value to be pushed.
    """
    if not math.isnan(x):
      self._sorted_items.add(x)

    if self._window_mode == WindowMode.SLIDING:
      # The window is full: pop one value (presumably the oldest -- `pop` is
      # defined outside this file) and mirror the removal in the sorted list.
      # A popped NaN was never added there, so it needs no removal.
      if (len(self._queue) >= self._window_size and
          not math.isnan(old_x := self.pop())):
        self._sorted_items.discard(old_x)

      super().push(x)

  @staticmethod
  def _get_helper(sorted_items, q):
    """Computes quantile `q` of an ascending, indexable sequence.

    The fractional rank is `q * (n - 1)`; the result is linearly interpolated
    between the two elements surrounding that rank.

    Args:
      sorted_items: Values in ascending order, indexable by integer position.
      q: The quantile to calculate, a float between 0 and 1 (inclusive).

    Returns:
      float: The interpolated quantile, or NaN if `sorted_items` is empty.
    """
    n = len(sorted_items)
    if n < 1:
      return float("nan")

    pos = q * (n - 1)
    lo = math.floor(pos)
    lo_value = typing.cast(float, sorted_items[lo])

    # Use linear interpolation to yield the requested quantile
    # Clamp so that a rank on the last element does not index past the end.
    hi = min(lo + 1, n - 1)
    hi_value: float = typing.cast(float, sorted_items[hi])
    return lo_value + (hi_value - lo_value) * (pos - lo)

  def get(self):
    """Returns the current quantile value using the sorted list.

    Calculates the quantile using linear interpolation on the sorted values.

    Returns:
      float: The calculated quantile value. Returns NaN if the window is empty.
    """
    return self._get_helper(self._sorted_items, self._q)
 
@specifiable
class SecondaryBufferedQuantileTracker(WindowedTracker, QuantileTracker):
  """A secondary quantile tracker that shares its data with a master tracker.

  This tracker acts as a read-only view of the master tracker's data, providing
  quantile calculations without maintaining its own independent buffer. It
  relies on the master's sorted items for quantile estimations.

  Args:
    master: The BufferedQuantileTracker instance to share data with.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, master: QuantileTracker, q):
    assert isinstance(master, BufferedQuantileTracker), \
        "Cannot create secondary tracker from non-BufferedQuantileTracker"
    self._master = master
    # Adopt the master's window mode; no buffer of its own is configured.
    super().__init__(self._master._window_mode)
    QuantileTracker.__init__(self, q)
    # Alias (not a copy) of the master's sorted list, so values pushed to the
    # master are visible here.
    self._sorted_items = self._master._sorted_items

  def push(self, x):
    """Does nothing, as this is a secondary tracker.

    Data only enters through the master tracker.

    Args:
      x: Ignored.
    """
    pass

  def get(self):
    """Returns the quantile `q` computed from the master tracker's buffer.

    Returns:
      float: The calculated quantile value, or NaN if the master's buffer is
        empty.
    """
    return self._master._get_helper(self._master._sorted_items, self._q)
 
@specifiable
class BufferedLandmarkQuantileTracker(BufferedQuantileTracker):
  """Sorted-list quantile tracker operating in landmark window mode.

  Warning:
    Every pushed value is retained in the sorted list, so memory use is
    unbounded. Avoid using this tracker in production for long-running
    streams.

  Args:
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, q):
    # Warn on every construction, before the base classes are initialized.
    unbounded_memory_notice = (
        "Quantile trackers should not be used in production due to "
        "the unbounded memory consumption.")
    warnings.warn(unbounded_memory_notice)
    super().__init__(q=q, window_mode=WindowMode.LANDMARK)
@specifiable
class BufferedSlidingQuantileTracker(BufferedQuantileTracker):
  """Sorted-list quantile tracker operating in sliding window mode.

  Warning:
    A sorted list of the values inside the sliding window is kept in order to
    calculate the specified quantile. Memory consumption is bounded by the
    window size but can still be significant for large windows.

  Args:
    window_size: The size of the sliding window.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, window_size, q):
    # Everything is delegated to the buffered base, pinned to SLIDING mode.
    super().__init__(
        q=q, window_size=window_size, window_mode=WindowMode.SLIDING)