apache_beam.ml.rag.enrichment.bigquery_vector_search module
- class apache_beam.ml.rag.enrichment.bigquery_vector_search.BigQueryVectorSearchParameters(project: str, table_name: str, embedding_column: str, columns: List[str], neighbor_count: int, metadata_restriction_template: str | None = None, distance_type: str | None = None, options: Dict[str, Any] | None = None)[source]
Bases: object
Parameters for configuring BigQuery vector similarity search.
This class is used by BigQueryVectorSearchEnrichmentHandler to perform vector similarity search using BigQuery's VECTOR_SEARCH function. It processes Chunk objects that contain an Embedding and returns similar vectors from a BigQuery table. BigQueryVectorSearchEnrichmentHandler is used with the Enrichment transform to enrich Chunks with similar content from a vector database. For example:

>>> # Create search parameters
>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content'],
...     neighbor_count=5
... )
>>> # Use in pipeline
>>> enriched = (
...     chunks
...     | "Generate Embeddings" >> MLTransform(...)
...     | "Find Similar" >> Enrichment(
...         BigQueryVectorSearchEnrichmentHandler(
...             vector_search_parameters=params
...         )
...     )
... )
BigQueryVectorSearchParameters encapsulates the configuration needed to perform vector similarity search using BigQuery’s VECTOR_SEARCH function. It handles formatting the query with proper embedding vectors and metadata restrictions.
Example with flattened metadata column:
Table schema:
embedding: ARRAY<FLOAT64>  # Vector embedding
content: STRING            # Document content
language: STRING           # Direct metadata column
Code:
>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content', 'language'],
...     neighbor_count=5,
...     # For column 'language', value comes from
...     # chunk.metadata['language']
...     metadata_restriction_template="language = '{language}'"
... )
>>> # When processing a chunk with metadata={'language': 'en'},
>>> # generates: WHERE language = 'en'
Example with nested repeated metadata:
Table schema:
embedding: ARRAY<FLOAT64>  # Vector embedding
content: STRING            # Document content
metadata: ARRAY<STRUCT<    # Nested repeated metadata
  key: STRING,
  value: STRING
>>
Code:
>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content', 'metadata'],
...     neighbor_count=5,
...     # check_metadata(field_name, key_to_search, value_from_chunk)
...     metadata_restriction_template=(
...         "check_metadata(metadata, 'language', '{language}')"
...     )
... )
>>> # When processing a chunk with metadata={'language': 'en'},
>>> # generates: WHERE check_metadata(metadata, 'language', 'en')
>>> # Searches for {key: 'language', value: 'en'} in metadata array
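The restriction above matches rows whose repeated metadata field contains a struct with the given key and value. A minimal pure-Python sketch of that matching semantics (the function below is an illustrative stand-in, not BigQuery's actual predicate):

```python
from typing import Dict, List

def check_metadata(metadata: List[Dict[str, str]], key: str, value: str) -> bool:
    """True if any {key, value} struct in the repeated field matches."""
    return any(
        entry.get("key") == key and entry.get("value") == value
        for entry in metadata
    )

# Mirrors WHERE check_metadata(metadata, 'language', 'en') over two rows:
rows = [
    [{"key": "language", "value": "en"}, {"key": "source", "value": "web"}],
    [{"key": "language", "value": "fr"}],
]
matching_rows = [row for row in rows if check_metadata(row, "language", "en")]
```

Only the first row matches, since its metadata array contains {key: 'language', value: 'en'}.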
- Parameters:
project – GCP project ID containing the BigQuery dataset.
table_name – Fully qualified BigQuery table name containing vectors.
embedding_column – Column name containing the embedding vectors.
columns – List of columns to retrieve from matched vectors.
neighbor_count – Number of similar vectors to return (top-k).
metadata_restriction_template –
Template string for filtering vectors. Two formats supported:
For flattened metadata columns:
column_name = '{metadata_key}'
where column_name is the BigQuery column and metadata_key is used to get the value from chunk.metadata[metadata_key].

For nested repeated metadata (ARRAY<STRUCT<key,value>>):
check_metadata(field_name, 'key_to_match', '{metadata_key}')
where field_name is the ARRAY<STRUCT> column in BigQuery, key_to_match is the literal key to search for in the array, and metadata_key is used to get the value from chunk.metadata[metadata_key].
Multiple conditions can be combined using AND/OR operators. For example:
>>> # Combine metadata check with column filter
>>> template = (
...     "check_metadata(metadata, 'language', '{language}') "
...     "AND source = '{source}'"
... )
>>> # When chunk.metadata = {'language': 'en', 'source': 'web'}
>>> # Generates: WHERE
>>> #   check_metadata(metadata, 'language', 'en')
>>> #   AND source = 'web'
distance_type – Optional distance metric to use. Supported values: COSINE (default), EUCLIDEAN, or DOT_PRODUCT.
options – Optional dictionary of additional VECTOR_SEARCH options.
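Since the template's placeholders are named after keys in chunk.metadata, the substitution behaves like Python's str.format. A short sketch of that filling step (illustrative only; the handler's actual query construction may differ):

```python
# Hypothetical illustration: fill a metadata_restriction_template from
# a chunk's metadata dict using str.format keyword expansion.
template = (
    "check_metadata(metadata, 'language', '{language}') "
    "AND source = '{source}'"
)
chunk_metadata = {"language": "en", "source": "web"}

# Each {name} placeholder is replaced by chunk_metadata[name].
where_clause = template.format(**chunk_metadata)
```

With the metadata above, where_clause becomes the WHERE condition shown in the template example.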
- class apache_beam.ml.rag.enrichment.bigquery_vector_search.BigQueryVectorSearchEnrichmentHandler(vector_search_parameters: BigQueryVectorSearchParameters, *, min_batch_size: int = 1, max_batch_size: int = 1000, **kwargs)[source]
Bases: EnrichmentSourceHandler[Chunk | List[Chunk], List[Tuple[Chunk, Dict[str, Any]]]]

Enrichment handler that performs vector similarity search using BigQuery.
This handler enriches Chunks by finding similar vectors in a BigQuery table using the VECTOR_SEARCH function. It supports batching requests for efficiency and preserves the original Chunk metadata while adding the search results.
Example
>>> from apache_beam.ml.rag.types import Chunk, Content, Embedding
>>>
>>> # Configure vector search
>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content', 'metadata'],
...     neighbor_count=2,
...     metadata_restriction_template="language = '{language}'"
... )
>>>
>>> # Create handler
>>> handler = BigQueryVectorSearchEnrichmentHandler(
...     vector_search_parameters=params,
...     min_batch_size=100,
...     max_batch_size=1000
... )
>>>
>>> # Use in pipeline
>>> with beam.Pipeline() as p:
...     enriched = (
...         p
...         | beam.Create([
...             Chunk(
...                 id='query1',
...                 embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
...                 content=Content(text='test query'),
...                 metadata={'language': 'en'}
...             )
...         ])
...         | Enrichment(handler)
...     )
- Parameters:
vector_search_parameters – Configuration for the vector search query
min_batch_size – Minimum number of chunks to batch before processing
max_batch_size – Maximum number of chunks to process in one batch
**kwargs – Additional arguments passed to bigquery.Client
The handler will:

1. Batch incoming chunks according to batch size parameters
2. Format and execute vector search query for each batch
3. Join results back to original chunks
4. Return tuples of (original_chunk, search_results)
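The join step can be sketched as pairing each chunk in a batch with the results keyed by its id; a chunk with no neighbors gets an empty result dict. The names below (MiniChunk, join_results) are hypothetical stand-ins, not the handler's internals:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple

@dataclass
class MiniChunk:
    """Illustrative stand-in for apache_beam.ml.rag.types.Chunk."""
    id: str
    metadata: Dict[str, Any] = field(default_factory=dict)

def join_results(
    chunks: List[MiniChunk],
    results_by_id: Dict[str, Dict[str, Any]],
) -> List[Tuple[MiniChunk, Dict[str, Any]]]:
    """Pair each original chunk with the search results found for its id."""
    return [(chunk, results_by_id.get(chunk.id, {})) for chunk in chunks]

batch = [MiniChunk("q1"), MiniChunk("q2")]
search_results = {"q1": {"content": "similar doc"}}  # q2 had no neighbors
pairs = join_results(batch, search_results)
```

The original chunks (and their metadata) pass through unchanged; only the result dict is attached, matching the (original_chunk, search_results) output shape.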