apache_beam.ml.rag.utils module
- class apache_beam.ml.rag.utils.MilvusConnectionParameters(uri: str, user: str = <factory>, password: str = <factory>, db_name: str = 'default', token: str = <factory>, timeout: float | None = None, kwargs: Dict[str, Any] = <factory>)
Bases: object
Configurations for establishing connections to Milvus servers.
- Parameters:
uri – URI endpoint for connecting to Milvus server in the format “http(s)://hostname:port”.
user – Username for authentication. Required if authentication is enabled and not using token authentication.
password – Password for authentication. Required if authentication is enabled and not using token authentication.
db_name – Name of the Milvus database to connect to. Defaults to ‘default’.
token – Authentication token as an alternative to username/password.
timeout – Connection timeout in seconds. Uses client default if None.
kwargs – Optional keyword arguments for additional connection parameters. Enables forward compatibility.
- class apache_beam.ml.rag.utils.MilvusHelpers
Bases: object
Utility class providing helper methods for Milvus vector database operations.
- apache_beam.ml.rag.utils.unpack_dataclass_with_kwargs(dataclass_instance)
Unpacks dataclass fields into a flat dict. Entries in the instance's kwargs field are merged in and take precedence over fields with the same name.
- Parameters:
dataclass_instance – Dataclass instance to unpack.
- Returns:
Flattened dictionary with kwargs taking precedence over fields.
- Return type:
dict
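The merge described above can be sketched as follows. This is a minimal illustration, not the library's implementation: ConnectionParamsSketch and unpack_sketch are hypothetical stand-ins that mirror the documented fields and the documented precedence rule, the empty-string defaults for user, password and token are an assumption, and the "secure" key is only an example of an extra keyword argument.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ConnectionParamsSketch:
    """Hypothetical stand-in mirroring the documented connection fields."""
    uri: str
    user: str = field(default_factory=str)      # assumed to default to ""
    password: str = field(default_factory=str)  # assumed to default to ""
    db_name: str = "default"
    token: str = field(default_factory=str)     # assumed to default to ""
    timeout: Optional[float] = None
    kwargs: Dict[str, Any] = field(default_factory=dict)


def unpack_sketch(instance: Any) -> Dict[str, Any]:
    """Flatten fields into one dict; entries in `kwargs` override fields."""
    params = dict(vars(instance))             # shallow copy of the fields
    extra = params.pop("kwargs", None) or {}  # pull out the nested dict
    return {**params, **extra}                # later keys win on conflict


params = ConnectionParamsSketch(
    uri="http://localhost:19530",
    timeout=5.0,
    # "secure" is an illustrative extra option, not a documented field.
    kwargs={"timeout": 30.0, "secure": False},
)
flat = unpack_sketch(params)
print(flat["timeout"])    # 30.0, the kwargs value overrides the field
print("kwargs" in flat)   # False, the nested dict has been flattened
```

Because the kwargs entries are applied last, a caller can override any named field without changing the dataclass definition.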
- apache_beam.ml.rag.utils.retry_with_backoff(operation: Callable[[], Any], max_retries: int = 3, retry_delay: float = 1.0, retry_backoff_factor: float = 2.0, operation_name: str = 'operation', exception_types: Tuple[Type[BaseException], ...] = (Exception,)) → Any
Executes an operation with retry logic and exponential backoff.
This is a generic retry utility that can be used for any operation that may fail transiently. It retries the operation with exponential backoff between attempts.
Note
This utility is designed for one-time setup operations and complements Apache Beam’s RequestResponseIO pattern. Use retry_with_backoff() for:
  - Establishing client connections in __enter__() methods (e.g., creating MilvusClient instances, database connections) before processing elements
  - One-time setup/teardown operations in DoFn lifecycle methods
  - Operations outside of per-element processing where retry is needed
For per-element operations (e.g., API calls within Caller.__call__), use RequestResponseIO which already provides automatic retry with exponential backoff, failure handling, caching, and other features. See: https://beam.apache.org/documentation/io/built-in/webapis/
- Parameters:
operation – Callable that performs the operation to retry. Should return the result of the operation.
max_retries – Maximum number of retry attempts. Default is 3.
retry_delay – Initial delay in seconds between retries. Default is 1.0.
retry_backoff_factor – Multiplier for the delay after each retry. Default is 2.0 (exponential backoff).
operation_name – Name of the operation for logging purposes. Default is “operation”.
exception_types – Tuple of exception types to catch and retry. Default is (Exception,) which catches all exceptions.
- Returns:
The result of the operation if successful.
- Raises:
The last exception encountered if all retry attempts fail.
Example
>>> def connect_to_service():
...     return service.connect(host="localhost")
>>> client = retry_with_backoff(
...     connect_to_service,
...     max_retries=5,
...     retry_delay=2.0,
...     operation_name="service connection")
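To make the backoff behaviour and the __enter__() use case concrete, here is a self-contained sketch. It does not import or reproduce the library function: retry_sketch is a hypothetical stand-in that assumes one initial attempt followed by up to max_retries retries, with the n-th delay equal to retry_delay * retry_backoff_factor ** n. The injectable sleep parameter and the FlakyClient class are also illustrative additions so the example runs without waiting.

```python
import time
from typing import Any, Callable, List, Tuple, Type


def retry_sketch(
    operation: Callable[[], Any],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    retry_backoff_factor: float = 2.0,
    exception_types: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
) -> Any:
    """Assumed semantics: one initial attempt plus up to max_retries retries."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except exception_types:
            if attempt == max_retries:
                raise  # retries exhausted: re-raise the last exception
            # Delay grows geometrically: delay, delay*factor, delay*factor**2
            sleep(retry_delay * retry_backoff_factor ** attempt)


class FlakyClient:
    """Hypothetical resource whose connection fails twice, then succeeds."""

    def __init__(self) -> None:
        self.attempts = 0
        self.delays: List[float] = []
        self.connected = False

    def _connect(self) -> bool:
        self.attempts += 1
        if self.attempts < 3:
            raise ConnectionError("transient failure")
        return True

    def __enter__(self) -> "FlakyClient":
        # One-time setup wrapped in retry; delays are recorded, not slept.
        self.connected = retry_sketch(
            self._connect, retry_delay=2.0, sleep=self.delays.append)
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.connected = False


with FlakyClient() as client:
    print(client.attempts, client.delays)  # 3 [2.0, 4.0]
```

With retry_delay=2.0 and the default factor of 2.0, the two failed attempts are followed by waits of 2.0 and 4.0 seconds before the third attempt succeeds.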