apache_beam.ml.rag.ingestion.bigquery module
- class apache_beam.ml.rag.ingestion.bigquery.SchemaConfig(schema: Dict, chunk_to_dict_fn: Callable[[Chunk], Dict[str, any]])[source]
Bases: object
Configuration for custom BigQuery schema and row conversion.
Allows overriding the default schema and row conversion logic for BigQuery vector storage. This enables custom table schemas beyond the default id/embedding/content/metadata structure.
- schema
BigQuery TableSchema dict defining the table structure. Example:
>>> {
...     'fields': [
...         {'name': 'id', 'type': 'STRING'},
...         {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...         {'name': 'custom_field', 'type': 'STRING'}
...     ]
... }
- Type:
Dict
- chunk_to_dict_fn
Function that converts a Chunk to a dict matching the schema. Takes a Chunk and returns Dict[str, Any] with keys matching schema fields. Example:
>>> def chunk_to_dict(chunk: Chunk) -> Dict[str, Any]:
...     return {
...         'id': chunk.id,
...         'embedding': chunk.embedding.dense_embedding,
...         'custom_field': chunk.metadata.get('custom_field')
...     }
- Type:
collections.abc.Callable[[apache_beam.ml.rag.types.Chunk], Dict[str, any]]
- class apache_beam.ml.rag.ingestion.bigquery.BigQueryVectorWriterConfig(write_config: Dict[str, Any], *, schema_config: SchemaConfig | None = None)[source]
Bases: VectorDatabaseWriteConfig
Configuration for writing vectors to BigQuery using managed transforms.
Supports both default schema (id, embedding, content, metadata columns) and custom schemas through SchemaConfig.
- Example with default schema:
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'})
- Example with custom schema:
>>> schema_config = SchemaConfig(
...     schema={
...         'fields': [
...             {'name': 'id', 'type': 'STRING'},
...             {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...             {'name': 'source_url', 'type': 'STRING'}
...         ]
...     },
...     chunk_to_dict_fn=lambda chunk: {
...         'id': chunk.id,
...         'embedding': chunk.embedding.dense_embedding,
...         'source_url': chunk.metadata.get('url')
...     }
... )
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'},
...     schema_config=schema_config
... )
- Parameters:
write_config – BigQuery write configuration dict. Must include 'table'. Other options, such as create_disposition and write_disposition, can also be specified (see the sketch after this list).
schema_config – Optional configuration for custom schema and row conversion. If not provided, the default schema with id, embedding, content, and metadata columns is used.
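As a rough illustration of the write_config parameter described above, the sketch below sets the required table plus the create_disposition and write_disposition options named there. The exact disposition string values ('CREATE_IF_NEEDED', 'WRITE_APPEND') are assumed to be passed through unchanged to the underlying BigQuery write; treat them as an example, not a guarantee of the accepted values.
>>> from apache_beam.ml.rag.ingestion.bigquery import BigQueryVectorWriterConfig
>>> # 'table' is required; the dispositions below are assumed to be forwarded
>>> # to the BigQuery sink as-is.
>>> config = BigQueryVectorWriterConfig(
...     write_config={
...         'table': 'project.dataset.embeddings',
...         'create_disposition': 'CREATE_IF_NEEDED',
...         'write_disposition': 'WRITE_APPEND',
...     })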
- Raises:
ValueError – If write_config doesn't include a table specification.
- create_write_transform() → PTransform[source]
Creates a transform to write to BigQuery.
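For context, a minimal end-to-end sketch follows, showing how this config is typically handed to a database write transform in a pipeline. It assumes a VectorDatabaseWriteTransform entry point in apache_beam.ml.rag.ingestion.base and Content/Embedding helper types alongside Chunk in apache_beam.ml.rag.types; the field values and the project.dataset.embeddings table are illustrative only.
>>> import apache_beam as beam
>>> # Assumed entry point that consumes a VectorDatabaseWriteConfig.
>>> from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform
>>> from apache_beam.ml.rag.ingestion.bigquery import BigQueryVectorWriterConfig
>>> # Content and Embedding are assumed to live next to Chunk.
>>> from apache_beam.ml.rag.types import Chunk, Content, Embedding
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'})
>>> # One hand-built Chunk stands in for the output of an upstream
>>> # chunking + embedding stage.
>>> chunk = Chunk(
...     id='doc-1-0',
...     content=Content(text='example passage'),
...     embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
...     metadata={'url': 'https://example.com/doc-1'})
>>> with beam.Pipeline() as p:
...     _ = (
...         p
...         | beam.Create([chunk])
...         | VectorDatabaseWriteTransform(config))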