apache_beam.ml.rag.utils module

class apache_beam.ml.rag.utils.MilvusConnectionParameters(uri: str, user: str = <factory>, password: str = <factory>, db_name: str = 'default', token: str = <factory>, timeout: float | None = None, kwargs: Dict[str, Any] = <factory>)

Bases: object

Configurations for establishing connections to Milvus servers.

Parameters:
  • uri – URI of the Milvus server endpoint, in the format “http(s)://hostname:port”.

  • user – Username for authentication. Required if authentication is enabled and not using token authentication.

  • password – Password for authentication. Required if authentication is enabled and not using token authentication.

  • db_name – Name of the Milvus database to connect to. Defaults to ‘default’.

  • token – Authentication token as an alternative to username/password.

  • timeout – Connection timeout in seconds. Uses client default if None.

  • kwargs – Optional additional connection parameters not covered by the fields above. Provides forward compatibility with new client options.

uri: str
user: str
password: str
db_name: str = 'default'
token: str
timeout: float | None = None
kwargs: Dict[str, Any]
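To show how the `<factory>` defaults and the two authentication styles fit together, here is a dependency-free, hypothetical mirror of MilvusConnectionParameters. It assumes the credential fields use `field(default_factory=str)` (so they default to empty strings) and treats a non-empty token as taking priority over user/password. `ConnectionParamsSketch` and `auth_mode` are illustrative names, not part of Beam.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ConnectionParamsSketch:
    """Hypothetical stand-in mirroring MilvusConnectionParameters' fields."""
    uri: str
    # Assumption: the `<factory>` defaults produce empty strings.
    user: str = field(default_factory=str)
    password: str = field(default_factory=str)
    db_name: str = "default"
    token: str = field(default_factory=str)
    timeout: Optional[float] = None  # None means "use the client default"
    # default_factory=dict gives every instance its own empty dict.
    kwargs: Dict[str, Any] = field(default_factory=dict)


def auth_mode(params: ConnectionParamsSketch) -> str:
    """Illustrative helper: report which authentication style is configured.

    Assumption: a token, when present, is used instead of user/password.
    """
    if params.token:
        return "token"
    if params.user and params.password:
        return "user_password"
    return "none"


local = ConnectionParamsSketch(uri="http://localhost:19530")
cloud = ConnectionParamsSketch(uri="https://example.invalid:443", token="my-token")
secured = ConnectionParamsSketch(
    uri="http://localhost:19530", user="root", password="secret", timeout=10.0)
```

With the real class, the token-based case would be written as `MilvusConnectionParameters(uri="https://example.invalid:443", token="my-token")`, following the signature above.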
class apache_beam.ml.rag.utils.MilvusHelpers

Bases: object

Utility class providing helper methods for Milvus vector database operations.

static sparse_embedding(sparse_vector: Tuple[List[int], List[float]]) → Dict[int, float] | None
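The signature suggests that sparse_embedding turns a sparse vector given as parallel (indices, values) lists into an {index: value} mapping, returning None when there is nothing to convert. The function below is a hypothetical re-implementation under exactly those assumptions, not Beam's code; the None-on-empty behaviour is inferred only from the Optional return type, and the length check is an extra added for this sketch.

```python
from typing import Dict, List, Optional, Tuple


def sparse_embedding_sketch(
        sparse_vector: Optional[Tuple[List[int], List[float]]]
) -> Optional[Dict[int, float]]:
    """Hypothetical: convert (indices, values) into {index: value}."""
    # Assumption: a missing or empty input yields None.
    if not sparse_vector:
        return None
    indices, values = sparse_vector
    # Extra validation added for this sketch only.
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    # Pair each dimension index with its weight.
    return {int(i): float(v) for i, v in zip(indices, values)}


print(sparse_embedding_sketch(([3, 17, 42], [0.5, 0.25, 1.0])))
# {3: 0.5, 17: 0.25, 42: 1.0}
```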
apache_beam.ml.rag.utils.parse_chunk_strings(chunk_str_list: List[str]) → List[Chunk]
apache_beam.ml.rag.utils.unpack_dataclass_with_kwargs(dataclass_instance)

Unpacks a dataclass's fields into a flat dict, merging the entries of its kwargs field into the top level. On duplicate keys, kwargs values override field values.

Parameters:

dataclass_instance – Dataclass instance to unpack.

Returns:

Flattened dictionary with kwargs taking precedence over fields.

Return type:

dict
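A minimal sketch of the flattening behaviour described above, written as a hypothetical stand-alone function rather than Beam's own code. It assumes the catch-all field is named `kwargs` (as in MilvusConnectionParameters) and that this field is dropped from the result once its entries are merged. `unpack_with_kwargs` and `ExampleParams` are illustrative names.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, Optional


def unpack_with_kwargs(instance) -> Dict[str, Any]:
    """Hypothetical re-implementation: flatten fields, then overlay kwargs."""
    # Copy every field except the catch-all `kwargs` mapping.
    flat = {f.name: getattr(instance, f.name)
            for f in fields(instance) if f.name != "kwargs"}
    # Overlay kwargs last so its entries win on duplicate keys.
    flat.update(getattr(instance, "kwargs", None) or {})
    return flat


@dataclass
class ExampleParams:
    """Hypothetical dataclass with a `kwargs` catch-all field."""
    uri: str
    timeout: Optional[float] = None
    kwargs: Dict[str, Any] = field(default_factory=dict)


params = ExampleParams(
    uri="http://localhost:19530",
    timeout=5.0,
    kwargs={"timeout": 10.0, "secure": True},
)
flat = unpack_with_kwargs(params)
# flat == {"uri": "http://localhost:19530", "timeout": 10.0, "secure": True}
```

Because kwargs is applied last, a value placed there (here `timeout=10.0`) overrides the dataclass field of the same name. That is what lets kwargs carry newer client options without changing the dataclass.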

apache_beam.ml.rag.utils.retry_with_backoff(operation: Callable[[], Any], max_retries: int = 3, retry_delay: float = 1.0, retry_backoff_factor: float = 2.0, operation_name: str = 'operation', exception_types: Tuple[Type[BaseException], ...] = (<class 'Exception'>,)) → Any

Executes an operation with retry logic and exponential backoff.

This is a generic retry utility for any operation that may fail transiently. After each failed attempt it waits before retrying, and the wait grows exponentially from one attempt to the next.
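The loop below is a hypothetical re-implementation of the retry pattern described here, meant to make the control flow concrete. It is not Beam's source. It assumes `max_retries` counts retries after the initial attempt, that exceptions outside `exception_types` propagate immediately, and that the last caught exception is re-raised once retries are exhausted. `retry_with_backoff_sketch` is an illustrative name.

```python
import logging
import time
from typing import Any, Callable, Tuple, Type

_LOGGER = logging.getLogger(__name__)


def retry_with_backoff_sketch(
        operation: Callable[[], Any],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
        operation_name: str = "operation",
        exception_types: Tuple[Type[BaseException], ...] = (Exception,),
) -> Any:
    """Hypothetical exponential-backoff retry loop."""
    delay = retry_delay
    # Assumption: one initial attempt plus up to `max_retries` retries.
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except exception_types as exc:  # other exception types propagate at once
            if attempt == max_retries:
                _LOGGER.error("%s failed after %d attempts",
                              operation_name, attempt + 1)
                raise  # re-raise the last exception encountered
            _LOGGER.warning("%s failed (attempt %d): %s; retrying in %.2fs",
                            operation_name, attempt + 1, exc, delay)
            time.sleep(delay)
            delay *= retry_backoff_factor  # grow the wait geometrically
```

In a pipeline you would call `retry_with_backoff` from `apache_beam.ml.rag.utils` directly. This sketch only shows the shape of the logic its docstring describes.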

Note

This utility is designed for one-time setup operations and complements Apache Beam’s RequestResponseIO pattern. Use retry_with_backoff() for:

  • Establishing client connections in __enter__() methods (e.g., creating MilvusClient instances, database connections) before processing elements

  • One-time setup/teardown operations in DoFn lifecycle methods

  • Operations outside of per-element processing where retry is needed

For per-element operations (e.g., API calls within Caller.__call__), use RequestResponseIO which already provides automatic retry with exponential backoff, failure handling, caching, and other features. See: https://beam.apache.org/documentation/io/built-in/webapis/

Parameters:
  • operation – Callable that performs the operation to retry. Should return the result of the operation.

  • max_retries – Maximum number of retry attempts. Default is 3.

  • retry_delay – Initial delay in seconds between retries. Default is 1.0.

  • retry_backoff_factor – Multiplier for the delay after each retry. Default is 2.0 (exponential backoff).

  • operation_name – Name of the operation for logging purposes. Default is “operation”.

  • exception_types – Tuple of exception types to catch and retry. Default is (Exception,) which catches all exceptions.
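The delay parameters combine geometrically: the k-th wait (counting from 0) is `retry_delay * retry_backoff_factor ** k`. The hypothetical helper below computes that schedule. It assumes one wait precedes each of the `max_retries` retries, which is an interpretation of "maximum number of retry attempts" rather than something this page states.

```python
from typing import List


def backoff_schedule(max_retries: int = 3,
                     retry_delay: float = 1.0,
                     retry_backoff_factor: float = 2.0) -> List[float]:
    """Hypothetical helper: seconds slept before each retry."""
    # Retry k (0-based) waits retry_delay * factor**k seconds.
    return [retry_delay * retry_backoff_factor ** k for k in range(max_retries)]


print(backoff_schedule())                                 # [1.0, 2.0, 4.0]
print(backoff_schedule(max_retries=5, retry_delay=2.0))   # [2.0, 4.0, 8.0, 16.0, 32.0]
```

Under this assumption, a call that fails on every attempt with the default settings sleeps 7 seconds in total before the last exception is raised. The second call matches the `max_retries` and `retry_delay` used in the example later on this page.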

Returns:

The result of the operation if successful.

Raises:

The last exception encountered if all retry attempts fail.

Example

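>>> from apache_beam.ml.rag.utils import retry_with_backoff
>>> def connect_to_service():
...     # `service` is a placeholder for any client whose connect() may fail transiently.
...     return service.connect(host="localhost")
>>> client = retry_with_backoff(
...     connect_to_service,
...     max_retries=5,
...     retry_delay=2.0,
...     operation_name="service connection")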