apache_beam.ml.rag.ingestion.bigquery module

class apache_beam.ml.rag.ingestion.bigquery.SchemaConfig(schema: Dict, chunk_to_dict_fn: Callable[[Chunk], Dict[str, Any]])[source]

Bases: object

Configuration for custom BigQuery schema and row conversion.

Allows overriding the default schema and row conversion logic for BigQuery vector storage. This enables custom table schemas beyond the default id/embedding/content/metadata structure.

schema

BigQuery TableSchema dict defining the table structure. Example:

>>> {
...     'fields': [
...         {'name': 'id', 'type': 'STRING'},
...         {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...         {'name': 'custom_field', 'type': 'STRING'}
...     ]
... }

Type:

Dict

chunk_to_dict_fn

Function that converts a Chunk to a dict matching the schema. Takes a Chunk and returns Dict[str, Any] with keys matching the schema fields. Example:

>>> def chunk_to_dict(chunk: Chunk) -> Dict[str, Any]:
...     return {
...         'id': chunk.id,
...         'embedding': chunk.embedding.dense_embedding,
...         'custom_field': chunk.metadata.get('custom_field')
...     }

Type:

collections.abc.Callable[[apache_beam.ml.rag.types.Chunk], Dict[str, Any]]

schema: Dict
chunk_to_dict_fn: Callable[[Chunk], Dict[str, Any]]
class apache_beam.ml.rag.ingestion.bigquery.BigQueryVectorWriterConfig(write_config: Dict[str, Any], *, schema_config: SchemaConfig | None = None)[source]

Bases: VectorDatabaseWriteConfig

Configuration for writing vectors to BigQuery using managed transforms.

Supports both the default schema (id, embedding, content, and metadata columns) and custom schemas through SchemaConfig.

Example with default schema:
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'})
Example with custom schema:
>>> schema_config = SchemaConfig(
...   schema={
...     'fields': [
...       {'name': 'id', 'type': 'STRING'},
...       {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...       {'name': 'source_url', 'type': 'STRING'}
...     ]
...   },
...   chunk_to_dict_fn=lambda chunk: {
...       'id': chunk.id,
...       'embedding': chunk.embedding.dense_embedding,
...       'source_url': chunk.metadata.get('url')
...   }
... )
>>> config = BigQueryVectorWriterConfig(
...   write_config={'table': 'project.dataset.embeddings'},
...   schema_config=schema_config
... )
Parameters:
  • write_config – BigQuery write configuration dict. Must include 'table'. Other options such as create_disposition and write_disposition can also be specified (see the sketch below).

  • schema_config – Optional configuration for custom schema and row conversion. If not provided, uses the default schema with id, embedding, content, and metadata columns.

Raises:

ValueError – If write_config doesn't include a table specification.
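
For illustration, a minimal sketch of a fuller write_config. This is not taken from the module's docstring: the table name is a placeholder, and the disposition values shown are the standard BigQuery disposition strings, assumed here to be passed through unchanged to the underlying write.

>>> from apache_beam.ml.rag.ingestion.bigquery import BigQueryVectorWriterConfig
>>> config = BigQueryVectorWriterConfig(
...     write_config={
...         'table': 'project.dataset.embeddings',     # required
...         'create_disposition': 'CREATE_IF_NEEDED',  # assumed pass-through option
...         'write_disposition': 'WRITE_APPEND',       # assumed pass-through option
...     })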

create_write_transform() → PTransform[source]

Creates a transform to write to BigQuery.
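
A usage sketch, not taken from the module's documented examples: it assumes the returned PTransform consumes a PCollection of Chunk objects with populated embeddings, and that Chunk, Content, and Embedding from apache_beam.ml.rag.types accept the keyword arguments shown (field names inferred from the attributes referenced above). The table name is a placeholder.

>>> import apache_beam as beam
>>> from apache_beam.ml.rag.ingestion.bigquery import BigQueryVectorWriterConfig
>>> from apache_beam.ml.rag.types import Chunk, Content, Embedding
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'})
>>> chunks = [
...     Chunk(  # field names assumed from apache_beam.ml.rag.types
...         id='doc-1',
...         content=Content(text='hello world'),
...         embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
...         metadata={'url': 'https://example.com'}),
... ]
>>> with beam.Pipeline() as p:
...     _ = (
...         p
...         | 'CreateChunks' >> beam.Create(chunks)
...         | 'WriteToBigQuery' >> config.create_write_transform())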