apache_beam.ml.rag.ingestion.bigquery module
- class apache_beam.ml.rag.ingestion.bigquery.SchemaConfig(schema: Dict, embeddable_to_dict_fn: Callable[[EmbeddableItem], Dict[str, Any]] | None = None, **kwargs)[source]
Bases: object

Configuration for custom BigQuery schema and row conversion.
Allows overriding the default schema and row conversion logic for BigQuery vector storage. This enables custom table schemas beyond the default id/embedding/content/metadata structure.
- Parameters:
schema – BigQuery TableSchema dict defining the table structure.
embeddable_to_dict_fn – Function that converts an EmbeddableItem into a dict matching the schema: it takes an EmbeddableItem and returns a Dict[str, Any] whose keys match the schema fields.
- Example with custom schema:
>>> schema_config = SchemaConfig(
...     schema={
...         'fields': [
...             {'name': 'id', 'type': 'STRING'},
...             {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...             {'name': 'source_url', 'type': 'STRING'}
...         ]
...     },
...     embeddable_to_dict_fn=lambda item: {
...         'id': item.id,
...         'embedding': item.embedding.dense_embedding,
...         'source_url': item.metadata.get('url')
...     }
... )
- class apache_beam.ml.rag.ingestion.bigquery.BigQueryVectorWriterConfig(write_config: Dict[str, Any], *, schema_config: SchemaConfig | None = None)[source]
Bases: VectorDatabaseWriteConfig

Configuration for writing vectors to BigQuery using managed transforms.
Supports both default schema (id, embedding, content, metadata columns) and custom schemas through SchemaConfig.
- Example with default schema:
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'})
- Example with custom schema:
>>> schema_config = SchemaConfig(
...     schema={
...         'fields': [
...             {'name': 'id', 'type': 'STRING'},
...             {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...             {'name': 'source_url', 'type': 'STRING'}
...         ]
...     },
...     embeddable_to_dict_fn=lambda chunk: {
...         'id': chunk.id,
...         'embedding': chunk.embedding.dense_embedding,
...         'source_url': chunk.metadata.get('url')
...     }
... )
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'},
...     schema_config=schema_config
... )
- Parameters:
write_config – BigQuery write configuration dict. Must include 'table'. Other options, such as create_disposition and write_disposition, can also be specified.
schema_config – Optional configuration for custom schema and row conversion. If not provided, uses default schema with id, embedding, content and metadata columns.
- Raises:
ValueError – If write_config doesn't include a table specification.
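As a concrete illustration of the write_config parameter described above, the fragment below combines the required table entry with the two optional dispositions the docs mention; the disposition values shown are the standard BigQuery sink options and are assumed here to be passed through unchanged.

```python
# Sketch of a write_config dict for BigQueryVectorWriterConfig.
# 'table' is required; the dispositions are optional BigQuery sink settings.
write_config = {
    'table': 'project.dataset.embeddings',     # required: target table
    'create_disposition': 'CREATE_IF_NEEDED',  # create the table if absent
    'write_disposition': 'WRITE_APPEND',       # append to existing rows
}
```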
- create_write_transform() → PTransform[source]
Creates transform to write to BigQuery.
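Because the custom-schema path hinges entirely on the row-conversion callable, the following self-contained sketch shows what such a function might look like. FakeEmbedding and FakeItem are illustrative stand-ins for the real EmbeddableItem produced by the embedding stage; only the Dict[str, Any] return shape expected by SchemaConfig comes from the documentation above.

```python
# Sketch of an embeddable_to_dict_fn-style converter.
# FakeEmbedding/FakeItem are hypothetical stand-ins for EmbeddableItem.
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class FakeEmbedding:
    dense_embedding: List[float]

@dataclass
class FakeItem:
    id: str
    embedding: FakeEmbedding
    metadata: Dict[str, Any] = field(default_factory=dict)

def to_row(item: FakeItem) -> Dict[str, Any]:
    """Convert an item to a dict whose keys match the custom schema fields."""
    return {
        'id': item.id,
        'embedding': item.embedding.dense_embedding,
        # .get() avoids a KeyError when 'url' is absent from the metadata.
        'source_url': item.metadata.get('url'),
    }

row = to_row(FakeItem(id='doc-1',
                      embedding=FakeEmbedding([0.1, 0.2]),
                      metadata={'url': 'https://example.com'}))
print(row)
```

In a real pipeline this function would be passed as the embeddable_to_dict_fn argument to SchemaConfig, as in the custom-schema examples above.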