apache_beam.ml.rag.enrichment.milvus_search module

class apache_beam.ml.rag.enrichment.milvus_search.SearchStrategy(value)[source]

Bases: Enum

Search strategies for information retrieval.

Parameters:
  • HYBRID – Combines vector and keyword search approaches. Leverages both semantic understanding and exact matching. Typically provides the most comprehensive results. Useful for queries with both conceptual and specific keyword components.

  • VECTOR – Vector similarity search only. Based on semantic similarity between query and documents. Effective for conceptual searches and finding related content. Less sensitive to exact terminology than keyword search.

  • KEYWORD – Keyword/text search only. Based on exact or fuzzy matching of specific terms. Effective for precise queries where exact wording matters. Less effective for conceptual or semantic searches.

HYBRID = 'hybrid'
VECTOR = 'vector'
KEYWORD = 'keyword'
class apache_beam.ml.rag.enrichment.milvus_search.KeywordSearchMetrics(value)[source]

Bases: Enum

Metrics for keyword search.

Parameters:

BM25 – Range [0 to ∞), Best Match 25 ranking algorithm for text relevance. Combines term frequency, inverse document frequency, and document length. Higher scores indicate greater relevance. Takes into account diminishing returns of term frequency. Balances between exact matching and semantic relevance.

BM25 = 'BM25'
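
To make the properties listed for BM25 concrete, the sketch below scores a single query term with the textbook Okapi BM25 formula. The helper name bm25_term_score and the defaults k1=1.2 and b=0.75 are assumptions chosen for illustration; they are not part of this module, and the scoring Milvus performs internally may differ in detail.

```python
import math


def bm25_term_score(tf, doc_len, avg_doc_len, n_docs, doc_freq, k1=1.2, b=0.75):
    """Textbook Okapi BM25 score of one query term for one document.

    Hypothetical helper for illustration only; not the Milvus implementation.
    """
    # Inverse document frequency: terms found in fewer documents weigh more.
    idf = math.log(1.0 + (n_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    # Length normalisation: longer-than-average documents are penalised.
    norm = 1.0 - b + b * (doc_len / avg_doc_len)
    # Saturating term frequency: each extra occurrence adds less than the last.
    return idf * (tf * (k1 + 1.0)) / (tf + k1 * norm)


common = dict(avg_doc_len=100, n_docs=1000, doc_freq=10)
one = bm25_term_score(tf=1, doc_len=100, **common)
two = bm25_term_score(tf=2, doc_len=100, **common)
three = bm25_term_score(tf=3, doc_len=100, **common)
long_doc = bm25_term_score(tf=1, doc_len=400, **common)
```

In this sketch the gain from a third occurrence of the term is smaller than the gain from the second, and the same term counts for less in a document four times the average length.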
class apache_beam.ml.rag.enrichment.milvus_search.VectorSearchMetrics(value)[source]

Bases: Enum

Metrics for vector search.

Parameters:
  • COSINE – Range [-1 to 1], higher values indicate greater similarity. Value 1 means vectors point in identical direction. Value 0 means vectors are perpendicular to each other (no relationship). Value -1 means vectors point in exactly opposite directions.

  • EUCLIDEAN_DISTANCE (L2) – Range [0 to ∞), lower values indicate greater similarity. Value 0 means vectors are identical. Larger values mean more dissimilarity between vectors.

  • INNER_PRODUCT (IP) – Range varies based on vector magnitudes, higher values indicate greater similarity. Value 0 means vectors are perpendicular to each other. Positive values mean vectors share some directional component. Negative values mean vectors point in opposing directions.

COSINE = 'COSINE'
EUCLIDEAN_DISTANCE = 'L2'
INNER_PRODUCT = 'IP'
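
The ranges and special values described above can be checked with a few lines of plain Python. This is a sketch using the textbook definitions of the three metrics; the function names and sample vectors are illustrative assumptions, and the values Milvus reports may be computed or scaled differently.

```python
import math


def inner_product(a, b):
    """Sum of element-wise products (IP)."""
    return sum(x * y for x, y in zip(a, b))


def euclidean_distance(a, b):
    """Straight-line distance between the two vectors (L2)."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_similarity(a, b):
    """Inner product of the vectors after scaling each to unit length."""
    norm_a = math.sqrt(inner_product(a, a))
    norm_b = math.sqrt(inner_product(b, b))
    return inner_product(a, b) / (norm_a * norm_b)


u = [1.0, 0.0]
same_direction = [2.0, 0.0]   # same direction as u, different magnitude
perpendicular = [0.0, 3.0]    # at a right angle to u
opposite = [-1.0, 0.0]        # exactly opposite to u

cos_same = cosine_similarity(u, same_direction)
cos_perp = cosine_similarity(u, perpendicular)
cos_opp = cosine_similarity(u, opposite)
l2_identical = euclidean_distance(u, u)
ip_same = inner_product(u, same_direction)
ip_opp = inner_product(u, opposite)
```

Cosine similarity ignores magnitude, so u and same_direction score the same as identical vectors would, whereas the inner product grows with the length of the vectors.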
class apache_beam.ml.rag.enrichment.milvus_search.MilvusBaseRanker[source]

Bases: object

Base class for ranking algorithms in Milvus hybrid search strategy.

dict()[source]
class apache_beam.ml.rag.enrichment.milvus_search.MilvusConnectionParameters(uri: str, user: str = <factory>, password: str = <factory>, db_id: str = 'default', token: str = <factory>, timeout: float | None = None, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Parameters for establishing connections to Milvus servers.

Parameters:
  • uri – URI endpoint for connecting to Milvus server in the format “http(s)://hostname:port”.

  • user – Username for authentication. Required if authentication is enabled and not using token authentication.

  • password – Password for authentication. Required if authentication is enabled and not using token authentication.

  • db_id – Database ID to connect to. Specifies which Milvus database to use. Defaults to ‘default’.

  • token – Authentication token as an alternative to username/password.

  • timeout – Connection timeout in seconds. Uses client default if None.

  • kwargs – Optional keyword arguments for additional connection parameters. Enables forward compatibility.

uri: str
user: str
password: str
db_id: str = 'default'
token: str
timeout: float | None = None
kwargs: Dict[str, Any]
class apache_beam.ml.rag.enrichment.milvus_search.BaseSearchParameters(anns_field: str, limit: int = 3, filter: str = <factory>, search_params: ~typing.Dict[str, ~typing.Any] = <factory>, consistency_level: str | None = None)[source]

Bases: object

Base parameters for both vector and keyword search operations.

Parameters:
  • anns_field – Name of the approximate nearest neighbor (ANN) search field, that is, the field containing the embeddings to search. Required for both vector and keyword search.

  • limit – Maximum number of results to return per query. Must be positive. Defaults to 3 search results.

  • filter – Boolean expression string for filtering search results. Example: ‘price <= 1000 AND category == “electronics”’.

  • search_params – Additional search parameters specific to the search type. Example: {“metric_type”: VectorSearchMetrics.EUCLIDEAN_DISTANCE}.

  • consistency_level – Consistency level for read operations. Options: “Strong”, “Session”, “Bounded”, “Eventually”. Defaults to “Bounded” if not specified when creating the collection.

anns_field: str
limit: int = 3
filter: str
search_params: Dict[str, Any]
consistency_level: str | None = None
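
The following sketch shows how these parameters fit together, using a stand-in dataclass so that it runs without Beam or Milvus installed. BaseSearchSketch is a hypothetical name, the empty-string default for filter is an assumption, and the checks in __post_init__ are only one way to enforce the documented "must be positive" constraint; the real class may validate differently.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class BaseSearchSketch:
    """Stand-in mirroring the documented BaseSearchParameters fields."""
    anns_field: str
    limit: int = 3
    filter: str = ""  # assumed default: no filtering
    search_params: Dict[str, Any] = field(default_factory=dict)
    consistency_level: Optional[str] = None

    def __post_init__(self):
        # Assumed validation, based on the parameter descriptions above.
        if not self.anns_field:
            raise ValueError("anns_field must be a non-empty field name")
        if self.limit <= 0:
            raise ValueError("limit must be positive")


params = BaseSearchSketch(
    anns_field="embedding",
    filter='price <= 1000 AND category == "electronics"',
    search_params={"metric_type": "L2"},  # value of EUCLIDEAN_DISTANCE
)

try:
    BaseSearchSketch(anns_field="embedding", limit=0)
    rejected = False
except ValueError:
    rejected = True
```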
class apache_beam.ml.rag.enrichment.milvus_search.VectorSearchParameters(anns_field: str, limit: int = 3, filter: str = <factory>, search_params: ~typing.Dict[str, ~typing.Any] = <factory>, consistency_level: str | None = None, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: BaseSearchParameters

Parameters for vector similarity search operations.

Inherits all parameters from BaseSearchParameters with the same semantics. The anns_field should contain dense vector embeddings for this search type.

Parameters:

kwargs – Optional keyword arguments for additional vector search parameters. Enables forward compatibility.

Note

For inherited parameters documentation, see BaseSearchParameters.

kwargs: Dict[str, Any]
class apache_beam.ml.rag.enrichment.milvus_search.KeywordSearchParameters(anns_field: str, limit: int = 3, filter: str = <factory>, search_params: ~typing.Dict[str, ~typing.Any] = <factory>, consistency_level: str | None = None, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: BaseSearchParameters

Parameters for keyword/text search operations.

Inherits all parameters from BaseSearchParameters with the same semantics. The anns_field should contain sparse vector embeddings for this search type.

Parameters:

kwargs – Optional keyword arguments for additional keyword search parameters. Enables forward compatibility.

Note

For inherited parameters documentation, see BaseSearchParameters.

kwargs: Dict[str, Any]
class apache_beam.ml.rag.enrichment.milvus_search.HybridSearchParameters(vector: ~apache_beam.ml.rag.enrichment.milvus_search.VectorSearchParameters, keyword: ~apache_beam.ml.rag.enrichment.milvus_search.KeywordSearchParameters, ranker: ~apache_beam.ml.rag.enrichment.milvus_search.MilvusBaseRanker, limit: int = 3, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Parameters for hybrid (vector + keyword) search operations.

Parameters:
  • vector – Parameters for the vector search component.

  • keyword – Parameters for the keyword search component.

  • ranker – Ranker for combining vector and keyword search results. Example: RRFRanker(k=100).

  • limit – Maximum number of results to return per query. Defaults to 3 search results.

  • kwargs – Optional keyword arguments for additional hybrid search parameters. Enables forward compatibility.

vector: VectorSearchParameters
keyword: KeywordSearchParameters
ranker: MilvusBaseRanker
limit: int = 3
kwargs: Dict[str, Any]
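
The ranker decides how the vector and keyword result lists are merged into one. As an illustration of what a rank-based ranker such as the RRFRanker(k=100) mentioned above does conceptually, here is textbook reciprocal rank fusion in plain Python. The function rrf_fuse and the sample IDs are hypothetical, and this is not the Milvus implementation.

```python
from collections import defaultdict


def rrf_fuse(rankings, k=60, limit=3):
    """Merge several ranked ID lists with reciprocal rank fusion.

    Each list contributes 1 / (k + rank) to an ID's score, with rank
    starting at 1. Hypothetical helper for illustration only.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by ID for determinism.
    ordered = sorted(scores.items(), key=lambda item: (-item[1], str(item[0])))
    return [doc_id for doc_id, _ in ordered[:limit]]


vector_hits = ["a", "b", "c"]    # best-first results of the vector search
keyword_hits = ["b", "d", "a"]   # best-first results of the keyword search
fused = rrf_fuse([vector_hits, keyword_hits], k=100, limit=3)
```

Entities that appear in both lists ("a" and "b") outrank those that appear in only one, and a larger k flattens the difference between adjacent ranks.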
class apache_beam.ml.rag.enrichment.milvus_search.MilvusSearchParameters(collection_name: str, search_strategy: ~apache_beam.ml.rag.enrichment.milvus_search.VectorSearchParameters | ~apache_beam.ml.rag.enrichment.milvus_search.KeywordSearchParameters | ~apache_beam.ml.rag.enrichment.milvus_search.HybridSearchParameters, partition_names: ~typing.List[str] = <factory>, output_fields: ~typing.List[str] = <factory>, timeout: float | None = None, round_decimal: int = -1)[source]

Bases: object

Parameters configuring Milvus search operations.

This class encapsulates all parameters needed to execute searches against Milvus collections, supporting vector, keyword, and hybrid search strategies.

Parameters:
  • collection_name – Name of the collection to search in.

  • search_strategy – Search parameters object that determines the type of search to perform: VectorSearchParameters, KeywordSearchParameters, or HybridSearchParameters.

  • partition_names – List of partition names to restrict the search to. If empty, all partitions will be searched.

  • output_fields – List of field names to include in search results. If empty, only primary fields including distances will be returned.

  • timeout – Search operation timeout in seconds. If not specified, the client’s default timeout is used.

  • round_decimal – Number of decimal places for distance/similarity scores. Defaults to -1, which means no rounding.

collection_name: str
search_strategy: VectorSearchParameters | KeywordSearchParameters | HybridSearchParameters
partition_names: List[str]
output_fields: List[str]
timeout: float | None = None
round_decimal: int = -1
class apache_beam.ml.rag.enrichment.milvus_search.MilvusCollectionLoadParameters(refresh: bool = <factory>, resource_groups: ~typing.List[str] = <factory>, load_fields: ~typing.List[str] = <factory>, skip_load_dynamic_field: bool = <factory>, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Parameters that control how Milvus loads a collection into memory.

This class provides fine-grained control over collection loading, which is particularly important in resource-constrained environments. Proper configuration can significantly reduce memory usage and improve query performance by loading only necessary data.

Parameters:
  • refresh – If True, forces a reload of the collection even if already loaded. Ensures the most up-to-date data is in memory.

  • resource_groups – List of resource groups to load the collection into. Can be used for load balancing across multiple query nodes.

  • load_fields – Specifies which fields to load into memory. Loading only necessary fields reduces memory usage. If empty, all fields are loaded.

  • skip_load_dynamic_field – If True, dynamic/growing fields will not be loaded into memory. Saves memory when dynamic fields aren’t needed.

  • kwargs – Optional keyword arguments for additional collection load parameters. Enables forward compatibility.

refresh: bool
resource_groups: List[str]
load_fields: List[str]
skip_load_dynamic_field: bool
kwargs: Dict[str, Any]
class apache_beam.ml.rag.enrichment.milvus_search.MilvusSearchResult(id: ~typing.List[str | int] = <factory>, distance: ~typing.List[float] = <factory>, fields: ~typing.List[~typing.Dict[str, ~typing.Any]] = <factory>)[source]

Bases: object

Search result from Milvus per chunk.

Parameters:
  • id – List of entity IDs returned from the search. Can be either string or integer IDs.

  • distance – List of distances/similarity scores for each returned entity.

  • fields – List of dictionaries containing additional field values for each entity. Each dictionary corresponds to one returned entity.

id: List[str | int]
distance: List[float]
fields: List[Dict[str, Any]]
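
The three attributes are parallel lists with one entry per returned entity. The sketch below uses a stand-in dataclass (SearchResultSketch, a hypothetical name) and a hypothetical to_records helper to show how such a result could be turned into one record per entity. It assumes the lists are index-aligned, as the descriptions above suggest.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union


@dataclass
class SearchResultSketch:
    """Stand-in mirroring the documented MilvusSearchResult fields."""
    id: List[Union[str, int]] = field(default_factory=list)
    distance: List[float] = field(default_factory=list)
    fields: List[Dict[str, Any]] = field(default_factory=list)


def to_records(result):
    """Zip the parallel lists into one dictionary per returned entity."""
    return [
        {"id": entity_id, "distance": distance, **extra}
        for entity_id, distance, extra in zip(
            result.id, result.distance, result.fields)
    ]


result = SearchResultSketch(
    id=[7, 42],
    distance=[0.12, 0.57],
    fields=[{"title": "intro"}, {"title": "appendix"}],
)
records = to_records(result)
```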
class apache_beam.ml.rag.enrichment.milvus_search.MilvusSearchEnrichmentHandler(connection_parameters: MilvusConnectionParameters, search_parameters: MilvusSearchParameters, *, collection_load_parameters: MilvusCollectionLoadParameters | None, min_batch_size: int = 1, max_batch_size: int = 1000, **kwargs)[source]

Bases: EnrichmentSourceHandler[Chunk | List[Chunk], List[Tuple[Chunk, Dict[str, Any]]]]

Enrichment handler for Milvus vector database searches.

This handler implements apache_beam.transforms.enrichment.EnrichmentSourceHandler and is designed to be used with the apache_beam.transforms.enrichment.Enrichment transform. It enables enriching data through vector similarity, keyword, or hybrid searches against Milvus collections.

The handler supports different search strategies:
  • Vector search - For finding similar embeddings based on vector similarity.
  • Keyword search - For text-based retrieval using BM25 or other text metrics.
  • Hybrid search - For combining vector and keyword search results.

This handler queries the Milvus database per element by default. To enable batching for improved performance, set the min_batch_size and max_batch_size parameters. These control the batching behavior of the apache_beam.transforms.util.BatchElements transform.

For memory-intensive operations, the handler allows fine-grained control over collection loading through the collection_load_parameters.

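Example Usage:

    from apache_beam.ml.rag.enrichment.milvus_search import (
        MilvusCollectionLoadParameters,
        MilvusConnectionParameters,
        MilvusSearchEnrichmentHandler,
        MilvusSearchParameters,
        VectorSearchParameters)

    # Where the Milvus server is reachable.
    connection_parameters = MilvusConnectionParameters(
        uri="http://localhost:19530")

    # Which collection to search, and how.
    search_parameters = MilvusSearchParameters(
        collection_name="my_collection",
        search_strategy=VectorSearchParameters(anns_field="embedding"))

    # Load only the fields the search needs.
    collection_load_parameters = MilvusCollectionLoadParameters(
        load_fields=["embedding", "metadata"])

    milvus_handler = MilvusSearchEnrichmentHandler(
        connection_parameters,
        search_parameters,
        collection_load_parameters=collection_load_parameters,
        min_batch_size=10,
        max_batch_size=100)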

Parameters:
  • connection_parameters (MilvusConnectionParameters) – Configuration for connecting to the Milvus server, including URI, credentials, and connection options.

  • search_parameters (MilvusSearchParameters) – Configuration for search operations, including collection name, search strategy, and output fields.

  • collection_load_parameters (Optional[MilvusCollectionLoadParameters]) – Parameters controlling how collections are loaded into memory, which can significantly impact resource usage and performance.

  • min_batch_size (int) – Minimum number of elements to batch together when querying Milvus. Default is 1 (no batching when max_batch_size is 1).

  • max_batch_size (int) – Maximum number of elements to batch together. Default is 1000. Higher values may improve throughput but increase memory usage.

  • **kwargs – Additional keyword arguments for Milvus Enrichment Handler.

Note

  • For large collections, consider setting appropriate values in collection_load_parameters to reduce memory usage.

  • The search_strategy in search_parameters determines the type of search (vector, keyword, or hybrid) and associated parameters.

  • Batching can significantly improve performance but requires more memory.

convert_sparse_embedding_to_milvus_format(sparse_vector: Tuple[List[int], List[float]]) → Dict[int, float][source]
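
This method has no description on this page. Judging only by its signature, it plausibly maps a sparse embedding given as parallel index and value lists to an {index: value} dictionary. The stand-alone function below is a sketch of that assumed behaviour under a hypothetical name (sparse_to_dict); it is not the library's code.

```python
from typing import Dict, List, Tuple


def sparse_to_dict(
        sparse_vector: Tuple[List[int], List[float]]) -> Dict[int, float]:
    """Turn (indices, values) into an {index: value} mapping.

    Assumed behaviour inferred from the documented signature only.
    """
    indices, values = sparse_vector
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    return {int(index): float(value) for index, value in zip(indices, values)}


converted = sparse_to_dict(([3, 17, 256], [0.5, 1.25, 0.1]))
```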
property collection_name

Getter method for collection_name property

property search_strategy

Getter method for search_strategy property

property partition_names

Getter method for partition_names property

property output_fields

Getter method for output_fields property

property timeout

Getter method for search timeout property

property round_decimal

Getter method for search round_decimal property

batch_elements_kwargs() → Dict[str, int][source]

Returns kwargs for beam.BatchElements.
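
As a rough sketch of how the constructor's batching arguments could surface through this method, the stand-in class below stores min_batch_size and max_batch_size and returns them as a dictionary. BatchingSketch and its validation are hypothetical; only the two key names, which match arguments accepted by BatchElements, and the defaults of 1 and 1000 come from the documentation above.

```python
from typing import Dict


class BatchingSketch:
    """Stand-in showing one way to expose batching settings."""

    def __init__(self, min_batch_size: int = 1, max_batch_size: int = 1000):
        # Assumed sanity check; the real handler may validate differently.
        if min_batch_size < 1 or max_batch_size < min_batch_size:
            raise ValueError("expected 1 <= min_batch_size <= max_batch_size")
        self._batching_kwargs = {
            "min_batch_size": min_batch_size,
            "max_batch_size": max_batch_size,
        }

    def batch_elements_kwargs(self) -> Dict[str, int]:
        # Return a copy so callers cannot mutate the stored settings.
        return dict(self._batching_kwargs)


batching = BatchingSketch(min_batch_size=10, max_batch_size=100)
batch_kwargs = batching.batch_elements_kwargs()
default_kwargs = BatchingSketch().batch_elements_kwargs()
```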

apache_beam.ml.rag.enrichment.milvus_search.join_fn(left: Embedding, right: Dict[str, Any]) → Embedding[source]
apache_beam.ml.rag.enrichment.milvus_search.unpack_dataclass_with_kwargs(dataclass_instance)[source]
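
This function is not described on this page. Its name, together with the kwargs field present on several of the parameter classes above, suggests that it flattens a dataclass instance into a single dictionary with the contents of kwargs merged in. The sketch below implements that assumed behaviour with hypothetical names (ConnectionSketch, unpack_with_kwargs); the library's actual behaviour may differ.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict


@dataclass
class ConnectionSketch:
    """Stand-in dataclass with a kwargs field, for illustration."""
    uri: str
    timeout: float = 5.0
    kwargs: Dict[str, Any] = field(default_factory=dict)


def unpack_with_kwargs(instance) -> Dict[str, Any]:
    """Flatten a dataclass into a dict, merging its kwargs field in."""
    params = {f.name: getattr(instance, f.name) for f in fields(instance)}
    # Remove the nested dict and promote its entries to top-level keys.
    extra = params.pop("kwargs", None) or {}
    params.update(extra)
    return params


flat = unpack_with_kwargs(
    ConnectionSketch(uri="http://localhost:19530", kwargs={"secure": False}))
```

A flat dictionary of this shape can be passed to a client call with ** unpacking, which is one way the forward-compatibility role of the kwargs fields could be realised.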
