Use Milvus to enrich data

Pydoc Pydoc




In Apache Beam 2.67.0 and later versions, the enrichment transform includes a built-in enrichment handler for Milvus. The following example demonstrates how to create a pipeline that uses the enrichment transform with the `MilvusSearchEnrichmentHandler` handler.

The data in the `docs_catalog` collection of the Milvus instance has the following format:

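| id | content | domain | cost | metadata | dense_embedding | sparse_embedding |
|----|---------|--------|------|----------|-----------------|------------------|
| 1 | This is a test document | medical | 49 | {"language": "en"} | [0.1, 0.2, 0.3] | [auto-generated by Milvus] |
| 2 | Another test document | legal | 75 | {"language": "en"} | [0.2, 0.3, 0.4] | [auto-generated by Milvus] |
| 3 | وثيقة اختبار | financial | 149 | {"language": "ar"} | [0.3, 0.4, 0.5] | [auto-generated by Milvus] |
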
import os
import apache_beam as beam
from apache_beam.ml.rag.types import Content
from apache_beam.ml.rag.types import Chunk
from apache_beam.ml.rag.types import Embedding
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.ml.rag.enrichment.milvus_search import (
    MilvusSearchEnrichmentHandler,
    MilvusConnectionParameters,
    MilvusSearchParameters,
    MilvusCollectionLoadParameters,
    VectorSearchParameters,
    VectorSearchMetrics)

# Milvus connection settings are read from the environment. The URI and the
# collection name have no meaningful fallback and stay None when unset.
# The optional credentials fall back to empty strings, and the database falls
# back to "default" (Milvus's built-in database). This avoids passing None
# into the connection parameters when those variables are not set.
uri = os.environ.get("MILVUS_VECTOR_DB_URI")
user = os.environ.get("MILVUS_VECTOR_DB_USER", "")
password = os.environ.get("MILVUS_VECTOR_DB_PASSWORD", "")
db_id = os.environ.get("MILVUS_VECTOR_DB_ID", "default")
token = os.environ.get("MILVUS_VECTOR_DB_TOKEN", "")
collection_name = os.environ.get("MILVUS_VECTOR_DB_COLLECTION_NAME")

# The pipeline input: one query chunk. Only its dense embedding drives the
# vector search, so its content is left empty.
data = [
    Chunk(
        content=Content(),
        embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
        id="query1",
    ),
]

# Connection settings for the Milvus instance. Keyword arguments keep the call
# correct even if the field order of MilvusConnectionParameters changes.
connection_parameters = MilvusConnectionParameters(
    uri=uri, user=user, password=password, db_id=db_id, token=token)

# The first condition (language == "en") excludes documents in other
# languages, which leaves two documents (ids 1 and 2). The second condition
# (cost < 50) then excludes document 2 (cost 75), so only document 1 is
# returned in the search results.
filter_expr = 'metadata["language"] == "en" AND cost < 50'

# Search-time options forwarded to Milvus: rank matches by cosine similarity.
# "nprobe" is an index-specific search setting passed through unchanged.
search_params = {"metric_type": VectorSearchMetrics.COSINE.value, "nprobe": 1}

# Vector (ANN) search over the dense-embedding field, returning at most three
# matches that also satisfy filter_expr.
vector_search_params = VectorSearchParameters(
    anns_field="dense_embedding_cosine",
    limit=3,
    filter=filter_expr,
    search_params=search_params)

# Search configuration: the target collection, the search strategy, the
# stored fields to attach to each match, and rounding of returned values to
# two decimal places.
search_parameters = MilvusSearchParameters(
    collection_name=collection_name,
    search_strategy=vector_search_params,
    output_fields=["id", "content", "domain", "cost", "metadata"],
    round_decimal=2)

# The collection load parameters are optional. They provide fine-grained
# control over how collections are loaded into memory. For simple use cases or
# when getting started, this parameter can be omitted to use the default
# loading behavior. Consider using it in resource-constrained environments to
# optimize memory usage and query performance.
collection_load_parameters = MilvusCollectionLoadParameters()

# Handler that the Enrichment transform uses to look up Milvus matches for
# each incoming Chunk.
milvus_search_handler = MilvusSearchEnrichmentHandler(
    connection_parameters=connection_parameters,
    search_parameters=search_parameters,
    collection_load_parameters=collection_load_parameters)
# Build and run the pipeline: create the query chunks, enrich each one with
# its Milvus matches, and print the enriched chunks.
with beam.Pipeline() as pipeline:
  queries = pipeline | "Create" >> beam.Create(data)
  enriched = queries | "Enrich W/ Milvus" >> Enrichment(milvus_search_handler)
  _ = enriched | "Print" >> beam.Map(print)

Output:

Chunk(content=Content(text=None), id='query1', index=0, metadata={'enrichment_data': defaultdict(<class 'list'>, {'id': [1], 'distance': [1.0], 'fields': [{'content': 'This is a test document', 'cost': 49, 'domain': 'medical', 'id': 1, 'metadata': {'language': 'en'}}]})}, embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3], sparse_embedding=None))

Notebook example

Open In Colab

API documentation

Pydoc Pydoc