apache_beam.ml.rag.ingestion.bigquery module

class apache_beam.ml.rag.ingestion.bigquery.SchemaConfig(schema: Dict, embeddable_to_dict_fn: Callable[[EmbeddableItem], Dict[str, any]] | None = None, **kwargs)

Bases: object

Configuration for custom BigQuery schema and row conversion.

Allows overriding the default schema and row conversion logic for BigQuery vector storage. This enables custom table schemas beyond the default id/embedding/content/metadata structure.

Parameters:
  • schema – BigQuery TableSchema dict defining the table structure.

  • embeddable_to_dict_fn – Function that converts an EmbeddableItem to a row dict. Takes an EmbeddableItem and returns a Dict[str, Any] whose keys match the field names defined in schema.
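The key-matching requirement described above can be checked before a pipeline runs. The following is a minimal sketch in plain Python, not part of the Beam API: FakeEmbedding, FakeItem, item_to_row and check_row_matches_schema are hypothetical names, and the stand-in classes model only the attributes used in this page's examples.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


# Hypothetical stand-ins for an EmbeddableItem and its embedding. Only the
# attributes used in this page's examples are modelled.
@dataclass
class FakeEmbedding:
    dense_embedding: List[float]


@dataclass
class FakeItem:
    id: str
    embedding: FakeEmbedding
    metadata: Dict[str, Any] = field(default_factory=dict)


SCHEMA = {
    'fields': [
        {'name': 'id', 'type': 'STRING'},
        {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
        {'name': 'source_url', 'type': 'STRING'},
    ]
}


def item_to_row(item) -> Dict[str, Any]:
    """Builds a row dict keyed by the field names in SCHEMA."""
    return {
        'id': item.id,
        'embedding': item.embedding.dense_embedding,
        'source_url': item.metadata.get('url'),
    }


def check_row_matches_schema(
        schema: Dict, to_row: Callable[[Any], Dict[str, Any]], sample) -> None:
    """Hypothetical helper: raises ValueError if row keys and schema fields differ."""
    expected = {f['name'] for f in schema['fields']}
    actual = set(to_row(sample))
    if expected != actual:
        raise ValueError(
            f'missing: {sorted(expected - actual)}, '
            f'unexpected: {sorted(actual - expected)}')


sample = FakeItem(
    'doc-1', FakeEmbedding([0.1, 0.2]), {'url': 'https://example.com/a'})
check_row_matches_schema(SCHEMA, item_to_row, sample)  # passes silently
row = item_to_row(sample)
```

A named function such as item_to_row can be passed as embeddable_to_dict_fn in place of a lambda, which makes it easier to unit test in isolation.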

Example with custom schema:
>>> schema_config = SchemaConfig(
...   schema={
...     'fields': [
...       {'name': 'id', 'type': 'STRING'},
...       {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...       {'name': 'source_url', 'type': 'STRING'}
...     ]
...   },
...   embeddable_to_dict_fn=lambda item: {
...       'id': item.id,
...       'embedding': item.embedding.dense_embedding,
...       'source_url': item.metadata.get('url')
...   }
... )

class apache_beam.ml.rag.ingestion.bigquery.BigQueryVectorWriterConfig(write_config: Dict[str, Any], *, schema_config: SchemaConfig | None = None)

Bases: VectorDatabaseWriteConfig

Configuration for writing vectors to BigQuery using managed transforms.

Supports both default schema (id, embedding, content, metadata columns) and custom schemas through SchemaConfig.
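To make the default layout concrete, the sketch below shows what a row with id, embedding, content and metadata columns might look like. Only the four column names come from the text above. The column types, the repeated key/value encoding of metadata, and the names ASSUMED_DEFAULT_SCHEMA and to_default_style_row are assumptions for illustration and may differ from what the library actually uses.

```python
from typing import Any, Dict, Sequence

# Assumed layout: the column names are documented, the types and the
# key/value record encoding for metadata are illustrative guesses.
ASSUMED_DEFAULT_SCHEMA = {
    'fields': [
        {'name': 'id', 'type': 'STRING'},
        {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
        {'name': 'content', 'type': 'STRING'},
        {
            'name': 'metadata',
            'type': 'RECORD',
            'mode': 'REPEATED',
            'fields': [
                {'name': 'key', 'type': 'STRING'},
                {'name': 'value', 'type': 'STRING'},
            ],
        },
    ]
}


def to_default_style_row(
        item_id: str,
        dense_embedding: Sequence[float],
        text: str,
        metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: builds one row for the assumed schema above."""
    return {
        'id': item_id,
        'embedding': list(dense_embedding),
        'content': text,
        # Flatten the metadata dict into key/value records with string values.
        'metadata': [{'key': k, 'value': str(v)} for k, v in metadata.items()],
    }


default_row = to_default_style_row(
    'doc-1', [0.1, 0.2], 'hello', {'url': 'https://example.com/a', 'page': 3})
```

If a table needs metadata in dedicated, typed columns rather than a generic structure like this, a SchemaConfig with a custom schema is the documented route.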

Example with default schema:
>>> config = BigQueryVectorWriterConfig(
...     write_config={'table': 'project.dataset.embeddings'})

Example with custom schema:
>>> schema_config = SchemaConfig(
...   schema={
...     'fields': [
...       {'name': 'id', 'type': 'STRING'},
...       {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
...       {'name': 'source_url', 'type': 'STRING'}
...     ]
...   },
...   embeddable_to_dict_fn=lambda chunk: {
...       'id': chunk.id,
...       'embedding': chunk.embedding.dense_embedding,
...       'source_url': chunk.metadata.get('url')
...   }
... )
>>> config = BigQueryVectorWriterConfig(
...   write_config={'table': 'project.dataset.embeddings'},
...   schema_config=schema_config
... )

Parameters:
  • write_config – BigQuery write configuration dict. Must include ‘table’. Other options, such as create_disposition and write_disposition, can also be specified.

  • schema_config – Optional configuration for custom schema and row conversion. If not provided, uses default schema with id, embedding, content and metadata columns.

Raises:

ValueError – If write_config does not include a ‘table’ entry.
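The requirement that write_config include 'table' can be illustrated with a small stand-alone check. This is a sketch of the documented behaviour, not the library's implementation: validate_write_config and its error message are hypothetical. The disposition values shown are standard BigQuery ones; whether the managed transform expects exactly these strings is an assumption.

```python
from typing import Any, Dict


def validate_write_config(write_config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical check mirroring the documented ValueError."""
    if 'table' not in write_config:
        raise ValueError("write_config must include 'table'")
    return write_config


# A config with the required table plus optional dispositions.
ok = validate_write_config({
    'table': 'project.dataset.embeddings',
    'create_disposition': 'CREATE_IF_NEEDED',
    'write_disposition': 'WRITE_APPEND',
})

# A config without 'table' is rejected.
try:
    validate_write_config({'write_disposition': 'WRITE_APPEND'})
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

Running a check like this when the configuration is built surfaces a missing table name before any pipeline work starts.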

create_write_transform() → PTransform

Creates the transform that writes to BigQuery.