Use Milvus to enrich data
In Apache Beam 2.67.0 and later versions, the enrichment transform includes a built-in enrichment handler for Milvus.
The following example demonstrates how to create a pipeline that uses the enrichment transform with the MilvusSearchEnrichmentHandler handler.
The data in the Milvus instance collection docs_catalog follows this format:
id | content | domain | cost | metadata | dense_embedding | sparse_embedding |
---|---|---|---|---|---|---|
1 | This is a test document | medical | 49 | {"language": "en"} | [0.1, 0.2, 0.3] | [auto-generated by Milvus] |
2 | Another test document | legal | 75 | {"language": "en"} | [0.2, 0.3, 0.4] | [auto-generated by Milvus] |
3 | وثيقة اختبار | financial | 149 | {"language": "ar"} | [0.3, 0.4, 0.5] | [auto-generated by Milvus] |
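
To build intuition for what a filtered dense vector search over this collection computes, the following plain-Python sketch scores the three rows against a query vector using cosine similarity. It does not call Milvus or Beam. The `rows` list, the `cosine_similarity` and `brute_force_search` helpers, and the hard-coded predicate (which mirrors the filter `metadata["language"] == "en" AND cost < 50` used in the pipeline) are illustrative assumptions; Milvus relies on index structures rather than a brute-force loop like this one.

```python
import math

# Rows mirroring the example table (sparse embeddings omitted).
rows = [
    {"id": 1, "content": "This is a test document", "domain": "medical",
     "cost": 49, "metadata": {"language": "en"},
     "dense_embedding": [0.1, 0.2, 0.3]},
    {"id": 2, "content": "Another test document", "domain": "legal",
     "cost": 75, "metadata": {"language": "en"},
     "dense_embedding": [0.2, 0.3, 0.4]},
    {"id": 3, "content": "وثيقة اختبار", "domain": "financial",
     "cost": 149, "metadata": {"language": "ar"},
     "dense_embedding": [0.3, 0.4, 0.5]},
]


def cosine_similarity(a, b):
    """Return the cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def brute_force_search(query, rows, limit=3):
    """Filter rows, then rank the survivors by cosine similarity."""
    # Hypothetical stand-in for the scalar filter expression.
    candidates = [
        r for r in rows
        if r["metadata"]["language"] == "en" and r["cost"] < 50
    ]
    scored = [
        (r["id"], round(cosine_similarity(query, r["dense_embedding"]), 2))
        for r in candidates
    ]
    # Higher cosine similarity means a closer match.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:limit]


results = brute_force_search([0.1, 0.2, 0.3], rows)
print(results)
```

Only row 1 passes both conditions, and because the query vector equals that row's dense embedding, its similarity rounds to 1.0.
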
import os
import apache_beam as beam
from apache_beam.ml.rag.types import Content
from apache_beam.ml.rag.types import Chunk
from apache_beam.ml.rag.types import Embedding
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.ml.rag.enrichment.milvus_search import (
    MilvusSearchEnrichmentHandler,
    MilvusConnectionParameters,
    MilvusSearchParameters,
    MilvusCollectionLoadParameters,
    VectorSearchParameters,
    VectorSearchMetrics)
uri = os.environ.get("MILVUS_VECTOR_DB_URI")
user = os.environ.get("MILVUS_VECTOR_DB_USER")
password = os.environ.get("MILVUS_VECTOR_DB_PASSWORD")
db_id = os.environ.get("MILVUS_VECTOR_DB_ID")
token = os.environ.get("MILVUS_VECTOR_DB_TOKEN")
collection_name = os.environ.get("MILVUS_VECTOR_DB_COLLECTION_NAME")
data = [
    Chunk(
        id="query1",
        embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
        content=Content())
]
connection_parameters = MilvusConnectionParameters(
    uri, user, password, db_id, token)
# The first condition (language == "en") excludes documents in other
# languages. Initially, this gives us two documents. After applying the second
# condition (cost < 50), only the first document returns in search results.
filter_expr = 'metadata["language"] == "en" AND cost < 50'
search_params = {"metric_type": VectorSearchMetrics.COSINE.value, "nprobe": 1}
vector_search_params = VectorSearchParameters(
    anns_field="dense_embedding_cosine",
    limit=3,
    filter=filter_expr,
    search_params=search_params)
search_parameters = MilvusSearchParameters(
    collection_name=collection_name,
    search_strategy=vector_search_params,
    output_fields=["id", "content", "domain", "cost", "metadata"],
    round_decimal=2)
# The collection load parameters are optional. They provide fine-grained
# control over how collections are loaded into memory. For simple use cases or
# when getting started, this parameter can be omitted to use default loading
# behavior. Consider using it in resource-constrained environments to optimize
# memory usage and query performance.
collection_load_parameters = MilvusCollectionLoadParameters()
milvus_search_handler = MilvusSearchEnrichmentHandler(
    connection_parameters=connection_parameters,
    search_parameters=search_parameters,
    collection_load_parameters=collection_load_parameters)
with beam.Pipeline() as p:
    _ = (
        p
        | "Create" >> beam.Create(data)
        | "Enrich W/ Milvus" >> Enrichment(milvus_search_handler)
        | "Print" >> beam.Map(print))
Output:
Chunk(content=Content(text=None), id='query1', index=0, metadata={'enrichment_data': defaultdict(<class 'list'>, {'id': [1], 'distance': [1.0], 'fields': [{'content': 'This is a test document', 'cost': 49, 'domain': 'medical', 'id': 1, 'metadata': {'language': 'en'}}]})}, embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3], sparse_embedding=None))
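
The search results in the output above are stored as parallel lists under the `enrichment_data` key of the chunk's metadata. As a minimal sketch of how a downstream step might unpack them, the example below uses a literal copied from that output instead of a live `Chunk`, so it runs without Beam or Milvus. The `iter_matches` helper is a hypothetical name, not part of the Beam API; in a pipeline, the input would presumably come from `chunk.metadata["enrichment_data"]`.

```python
from collections import defaultdict

# Shape copied from the example output; not fetched from a live Milvus instance.
enrichment_data = defaultdict(list, {
    "id": [1],
    "distance": [1.0],
    "fields": [{
        "content": "This is a test document",
        "cost": 49,
        "domain": "medical",
        "id": 1,
        "metadata": {"language": "en"},
    }],
})


def iter_matches(enrichment_data):
    """Yield one (id, distance, fields) tuple per search hit."""
    # The three lists are index-aligned, so zip pairs them up hit by hit.
    yield from zip(
        enrichment_data["id"],
        enrichment_data["distance"],
        enrichment_data["fields"])


matches = list(iter_matches(enrichment_data))
for doc_id, distance, fields in matches:
    print(doc_id, distance, fields["content"])
```

A function like this could be applied in a `beam.Map` step after the enrichment transform to flatten each chunk's hits into individual records.
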
Notebook example
API documentation
Last updated on 2025/10/20