apache_beam.ml.rag.ingestion.bigquery module
- class apache_beam.ml.rag.ingestion.bigquery.SchemaConfig(schema: Dict, chunk_to_dict_fn: Callable[[Chunk], Dict[str, any]])
Bases: object
Configuration for custom BigQuery schema and row conversion.
Allows overriding the default schema and row conversion logic for BigQuery vector storage. This enables custom table schemas beyond the default id/embedding/content/metadata structure.
- schema
BigQuery TableSchema dict defining the table structure. Example:
    >>> {
    ...     'fields': [
    ...         {'name': 'id', 'type': 'STRING'},
    ...         {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
    ...         {'name': 'custom_field', 'type': 'STRING'}
    ...     ]
    ... }
- Type: Dict
- chunk_to_dict_fn
Function that converts a Chunk to a dict matching the schema. Takes a Chunk and returns Dict[str, Any] with keys matching schema fields. Example:
    >>> def chunk_to_dict(chunk: Chunk) -> Dict[str, Any]:
    ...     return {
    ...         'id': chunk.id,
    ...         'embedding': chunk.embedding.dense_embedding,
    ...         'custom_field': chunk.metadata.get('custom_field')
    ...     }
- Type: collections.abc.Callable[[apache_beam.ml.rag.types.Chunk], Dict[str, any]]
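The two SchemaConfig attributes must agree: every key that chunk_to_dict_fn returns should name a field in schema. The sketch below exercises the example function without Beam. `Embedding` and `Chunk` here are minimal stand-in dataclasses, not the classes from apache_beam.ml.rag.types, and they expose only the attributes the example reads. `check_row` is a hypothetical helper, not a Beam API. It compares a row's keys with the schema's field names and checks that REPEATED fields hold lists, which, for instance, flags a chunk whose dense_embedding is None.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


# Minimal stand-ins for illustration only. Beam's real Chunk and Embedding
# types live in apache_beam.ml.rag.types and carry more fields.
@dataclass
class Embedding:
    dense_embedding: Optional[List[float]] = None


@dataclass
class Chunk:
    id: str
    embedding: Embedding
    metadata: Dict[str, Any] = field(default_factory=dict)


# The schema from the `schema` example above.
SCHEMA = {
    "fields": [
        {"name": "id", "type": "STRING"},
        {"name": "embedding", "type": "FLOAT64", "mode": "REPEATED"},
        {"name": "custom_field", "type": "STRING"},
    ]
}


def chunk_to_dict(chunk: Chunk) -> Dict[str, Any]:
    # The keys must match the field names declared in SCHEMA.
    return {
        "id": chunk.id,
        "embedding": chunk.embedding.dense_embedding,
        # .get() yields None instead of raising KeyError when the key is absent.
        "custom_field": chunk.metadata.get("custom_field"),
    }


def check_row(schema: Dict[str, Any], row: Dict[str, Any]) -> List[str]:
    """Hypothetical helper (not part of Beam): list mismatches between a
    row dict and a TableSchema-style dict. An empty list means no problems."""
    names = {f["name"] for f in schema["fields"]}
    problems = [f"unexpected key: {k}" for k in sorted(set(row) - names)]
    problems += [f"missing key: {k}" for k in sorted(names - set(row))]
    for f in schema["fields"]:
        name = f["name"]
        # REPEATED columns are expected to hold a list of values.
        if f.get("mode") == "REPEATED" and name in row and not isinstance(row[name], list):
            problems.append(f"{name} should be a list")
    return problems


row = chunk_to_dict(
    Chunk(id="c1", embedding=Embedding([0.1, 0.2]), metadata={"custom_field": "docs"})
)
print(row)                     # {'id': 'c1', 'embedding': [0.1, 0.2], 'custom_field': 'docs'}
print(check_row(SCHEMA, row))  # []
```

Because metadata.get(...) returns None for a missing key, a chunk without custom_field still produces a row with all three keys.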
- class apache_beam.ml.rag.ingestion.bigquery.BigQueryVectorWriterConfig(write_config: Dict[str, Any], *, schema_config: SchemaConfig | None = None)
Bases: VectorDatabaseWriteConfig
Configuration for writing vectors to BigQuery using managed transforms.
Supports both the default schema (id, embedding, content, metadata columns) and custom schemas via SchemaConfig.
- Example with default schema:
    >>> config = BigQueryVectorWriterConfig(
    ...     write_config={'table': 'project.dataset.embeddings'})
- Example with custom schema:
    >>> schema_config = SchemaConfig(
    ...     schema={
    ...         'fields': [
    ...             {'name': 'id', 'type': 'STRING'},
    ...             {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
    ...             {'name': 'source_url', 'type': 'STRING'}
    ...         ]
    ...     },
    ...     chunk_to_dict_fn=lambda chunk: {
    ...         'id': chunk.id,
    ...         'embedding': chunk.embedding.dense_embedding,
    ...         'source_url': chunk.metadata.get('url')
    ...     }
    ... )
    >>> config = BigQueryVectorWriterConfig(
    ...     write_config={'table': 'project.dataset.embeddings'},
    ...     schema_config=schema_config
    ... )
- Parameters:
write_config – BigQuery write configuration dict. Must include 'table'. Other options, such as create_disposition and write_disposition, can also be specified.
schema_config – Optional configuration for a custom schema and row conversion. If not provided, the default schema with id, embedding, content, and metadata columns is used.
- Raises:
ValueError – If write_config does not include a 'table' key.
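When a custom schema_config is supplied, the field list in schema and the keys returned by chunk_to_dict_fn are maintained by hand; in the custom-schema example, for instance, the source_url column is filled from the url metadata key. One way to keep the two from drifting apart is to declare each column once and derive both from that declaration. `build_schema_and_fn` below is a hypothetical helper, not part of Beam, and types.SimpleNamespace stands in for a real Chunk. The two values it returns are what would be passed as schema and chunk_to_dict_fn to SchemaConfig.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, List, Optional, Tuple

# (column name, BigQuery type, mode or None, extractor taking a chunk)
Column = Tuple[str, str, Optional[str], Callable[[Any], Any]]


def build_schema_and_fn(
    columns: List[Column],
) -> Tuple[Dict[str, Any], Callable[[Any], Dict[str, Any]]]:
    """Hypothetical helper: derive a TableSchema dict and a row converter
    from one column list so their field names cannot disagree."""
    fields = []
    for name, bq_type, mode, _ in columns:
        f = {"name": name, "type": bq_type}
        if mode is not None:
            f["mode"] = mode
        fields.append(f)

    def to_dict(chunk: Any) -> Dict[str, Any]:
        # One key per declared column, in declaration order.
        return {name: extract(chunk) for name, _, _, extract in columns}

    return {"fields": fields}, to_dict


# Reproduces the custom-schema example: source_url comes from metadata['url'].
schema, chunk_to_dict_fn = build_schema_and_fn([
    ("id", "STRING", None, lambda c: c.id),
    ("embedding", "FLOAT64", "REPEATED", lambda c: c.embedding.dense_embedding),
    ("source_url", "STRING", None, lambda c: c.metadata.get("url")),
])

# SimpleNamespace stands in for apache_beam.ml.rag.types.Chunk here.
chunk = SimpleNamespace(
    id="c1",
    embedding=SimpleNamespace(dense_embedding=[0.1, 0.2]),
    metadata={"url": "https://example.com/a"},
)
print(schema["fields"][1])  # {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'}
print(chunk_to_dict_fn(chunk))
```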
- create_write_transform() → PTransform
Creates the transform that writes to BigQuery.