apache_beam.ml.inference.model_manager module

Module for managing ML models in Apache Beam pipelines.

This module provides classes and functions for efficiently managing multiple machine learning models within Apache Beam pipelines. It includes functionality for loading, caching, and updating models using multi-process shared memory, ensuring that models are reused across workers to optimize resource usage and performance.

class apache_beam.ml.inference.model_manager.GPUMonitor(fallback_memory_mb: float = 16000.0, poll_interval: float = 0.5, peak_window_seconds: float = 30.0)[source]

Bases: object

Monitors GPU memory usage in a separate thread using nvidia-smi.

This class continuously polls GPU memory statistics to track current usage and peak usage over a sliding time window. It serves as the source of truth for the ModelManager’s resource decisions.

fallback_memory_mb

Default total memory if hardware detection fails.

poll_interval

Seconds between memory checks.

peak_window_seconds

Duration to track peak memory usage.

start()[source]
stop()[source]
reset_peak()[source]
get_stats() → Tuple[float, float, float][source]
refresh()[source]

Forces an immediate poll of the GPU.
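The sliding-window peak tracking described above can be sketched in a few lines. This is a simplified illustration, not the actual `GPUMonitor` implementation: `PeakWindowTracker` and `read_memory_mb` are hypothetical names, and an injectable reader callable stands in for the background thread that polls `nvidia-smi`.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple

class PeakWindowTracker:
    """Sketch of current/peak memory tracking over a sliding time window.

    `read_memory_mb` stands in for an nvidia-smi query; in the real
    monitor a background thread invokes it every `poll_interval` seconds.
    """
    def __init__(self, read_memory_mb: Callable[[], float],
                 peak_window_seconds: float = 30.0):
        self._read = read_memory_mb
        self._window = peak_window_seconds
        self._samples: Deque[Tuple[float, float]] = deque()

    def refresh(self) -> None:
        # Record a new sample, then drop samples older than the window.
        now = time.monotonic()
        self._samples.append((now, self._read()))
        while self._samples and now - self._samples[0][0] > self._window:
            self._samples.popleft()

    def get_stats(self) -> Tuple[float, float]:
        # Returns (current_mb, peak_mb) over the sliding window.
        if not self._samples:
            return 0.0, 0.0
        current = self._samples[-1][1]
        peak = max(mb for _, mb in self._samples)
        return current, peak

    def reset_peak(self) -> None:
        # Keep only the latest sample so the peak restarts from it.
        if self._samples:
            self._samples = deque([self._samples[-1]])
```

`refresh()` mirrors the documented method of the same name: it forces an immediate sample rather than waiting for the next poll.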

class apache_beam.ml.inference.model_manager.ResourceEstimator(smoothing_factor: float = 0.2, min_data_points: int = 5, verbose_logging: bool = False)[source]

Bases: object

Estimates individual model memory usage using statistical observation.

Uses Non-Negative Least Squares (NNLS) to deduce the memory footprint of individual models based on aggregate system memory readings and the configuration of active models at that time.

logging_info(message: str, *args)[source]
is_unknown(model_tag: str) → bool[source]
get_estimate(model_tag: str, default_mb: float = 4000.0) → float[source]
set_initial_estimate(model_tag: str, cost: float)[source]
add_observation(active_snapshot: Dict[str, int], peak_memory: float)[source]
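The NNLS approach described above can be illustrated with a small standalone sketch. This is not the `ResourceEstimator` implementation itself (which also applies exponential smoothing and minimum-data-point thresholds); `estimate_model_footprints` is a hypothetical helper showing how per-model costs fall out of aggregate peak readings via `scipy.optimize.nnls`.

```python
import numpy as np
from scipy.optimize import nnls

def estimate_model_footprints(observations, model_tags):
    """Solve for per-model memory costs from aggregate peak readings.

    Each observation pairs a snapshot of active model counts (as in
    `add_observation`) with the peak memory seen while those models were
    loaded. Stacking the snapshots as rows of A and the peaks as b, NNLS
    finds non-negative per-model costs x minimizing ||Ax - b||.
    """
    A = np.array([[snapshot.get(tag, 0) for tag in model_tags]
                  for snapshot, _ in observations], dtype=float)
    b = np.array([peak for _, peak in observations], dtype=float)
    costs, _residual = nnls(A, b)
    return dict(zip(model_tags, costs))
```

The non-negativity constraint is what makes NNLS a natural fit here: a model cannot have a negative memory footprint, and plain least squares could assign one when observations are noisy or collinear.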
class apache_beam.ml.inference.model_manager.QueueTicket(priority, ticket_num, tag)[source]

Bases: object

class apache_beam.ml.inference.model_manager.ModelManager(monitor: GPUMonitor | None = None, slack_percentage: float = 0.1, poll_interval: float = 0.5, peak_window_seconds: float = 30.0, min_data_points: int = 5, smoothing_factor: float = 0.2, eviction_cooldown_seconds: float = 10.0, min_model_copies: int = 1, wait_timeout_seconds: float = 300.0, lock_timeout_seconds: float = 60.0, verbose_logging: bool = False)[source]

Bases: object

Manages model lifecycles, caching, and resource arbitration.

This class acts as the central controller for acquiring model instances. It is responsible for:

  1. LRU Caching of idle models.

  2. Resource estimation and admission control (preventing OOM).

  3. Dynamic eviction of low-priority models when space is needed, with priority determined by the number of pending requests.

  4. ‘Isolation Mode’ for safely profiling unknown models.

logging_info(message: str, *args)[source]
all_models(tag) → list[Any][source]
try_enter_isolation_mode(tag: str, ticket_num: int) → bool[source]
should_spawn_model(tag: str, ticket_num: int) → bool[source]
acquire_model(tag: str, loader_func: Callable[[], Any]) → Any[source]
release_model(tag: str, instance: Any)[source]
shutdown()[source]
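The LRU caching half of the acquire/release contract can be sketched on its own. This is a deliberately minimal illustration, not the real `ModelManager`: `IdleModelCache` and its `capacity` parameter are hypothetical, and admission control, pending-request eviction priority, and isolation mode are omitted. It shows only the cache contract that `acquire_model` and `release_model` expose: acquire reuses an idle instance when one is cached, otherwise calls `loader_func`; release returns the instance to the cache, evicting the least recently used entry when over capacity.

```python
from collections import OrderedDict
from typing import Any, Callable

class IdleModelCache:
    """Minimal sketch of an LRU cache of idle model instances."""

    def __init__(self, capacity: int = 2):
        self._capacity = capacity
        # Insertion order doubles as recency order.
        self._idle: "OrderedDict[str, Any]" = OrderedDict()
        self.loads = 0  # counts loader_func calls, for illustration

    def acquire_model(self, tag: str, loader_func: Callable[[], Any]) -> Any:
        if tag in self._idle:
            # Cache hit: hand back the idle instance without reloading.
            return self._idle.pop(tag)
        self.loads += 1
        return loader_func()

    def release_model(self, tag: str, instance: Any) -> None:
        # Mark the instance idle and most recently used.
        self._idle[tag] = instance
        self._idle.move_to_end(tag)
        while len(self._idle) > self._capacity:
            self._idle.popitem(last=False)  # evict least recently used
```

For simplicity this sketch caches one idle instance per tag; the documented `min_model_copies` parameter suggests the real manager can hold several copies of the same model.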