apache_beam.ml.rag.ingestion.spanner module
Cloud Spanner vector store writer for RAG pipelines.
This module provides a writer for storing embeddings and associated metadata in Google Cloud Spanner. It supports flexible schema configuration with the ability to flatten metadata fields into dedicated columns.
Example usage:
Default schema (id, embedding, content, metadata):
>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings"
... )
Flattened metadata fields:
>>> specs = (
...     SpannerColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_content_spec()
...     .add_metadata_field("source", str)
...     .add_metadata_field("page_number", int, default=0)
...     .with_metadata_spec()
...     .build()
... )
>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings",
...     column_specs=specs
... )
Spanner schema example:
CREATE TABLE embeddings (
    id STRING(1024) NOT NULL,
    embedding ARRAY<FLOAT32>(vector_length=>768),
    content STRING(MAX),
    source STRING(MAX),
    page_number INT64,
    metadata JSON
) PRIMARY KEY (id)
- class apache_beam.ml.rag.ingestion.spanner.SpannerColumnSpec(column_name: str, python_type: Type, value_fn: Callable[[Chunk], Any])[source]
Bases: object
Column specification for Spanner vector writes.
Defines how to extract and format values from Chunks for insertion into Spanner table columns. Each spec maps to one column in the target table.
- python_type
Python type for the NamedTuple field (required for RowCoder)
- Type:
Type
- value_fn
Function to extract value from a Chunk
- Type:
Callable[[apache_beam.ml.rag.types.Chunk], Any]
Examples
String column:
>>> SpannerColumnSpec(
...     column_name="id",
...     python_type=str,
...     value_fn=lambda chunk: chunk.id
... )
Array column with conversion:
>>> SpannerColumnSpec(
...     column_name="embedding",
...     python_type=List[float],
...     value_fn=lambda chunk: chunk.embedding.dense_embedding
... )
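To make the role of value_fn concrete, the sketch below applies a list of column specs to one chunk to build a row. It deliberately avoids importing Beam: FakeContent, FakeEmbedding, FakeChunk, ColumnSpec, and chunk_to_row are hypothetical stand-ins that only mirror the attribute names used in the examples above (chunk.id, chunk.embedding.dense_embedding). It is an illustration of the idea, not the module's implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


# Hypothetical stand-ins for the Beam RAG types, kept minimal on purpose.
@dataclass
class FakeContent:
    text: Optional[str] = None


@dataclass
class FakeEmbedding:
    dense_embedding: Optional[List[float]] = None


@dataclass
class FakeChunk:
    id: str
    content: FakeContent
    embedding: FakeEmbedding
    metadata: Dict[str, Any] = field(default_factory=dict)


# Hypothetical mirror of SpannerColumnSpec: one spec describes one column.
@dataclass
class ColumnSpec:
    column_name: str
    python_type: type
    value_fn: Callable[[FakeChunk], Any]


def chunk_to_row(chunk: FakeChunk, specs: List[ColumnSpec]) -> Dict[str, Any]:
    """Apply each spec's value_fn to the chunk, keyed by column name."""
    return {spec.column_name: spec.value_fn(chunk) for spec in specs}


specs = [
    ColumnSpec("id", str, lambda c: c.id),
    ColumnSpec("embedding", list, lambda c: c.embedding.dense_embedding),
]
chunk = FakeChunk(
    id="doc_1",
    content=FakeContent(text="hello world"),
    embedding=FakeEmbedding(dense_embedding=[0.1, 0.2]),
)
row = chunk_to_row(chunk, specs)
```

Each spec contributes exactly one key to the row, which matches the one-spec-per-column mapping described above.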
- class apache_beam.ml.rag.ingestion.spanner.SpannerColumnSpecsBuilder[source]
Bases: object
Builder for creating Spanner column specifications.
Provides a fluent API for defining table schemas and how to populate them from Chunk objects. Supports standard Chunk fields (id, embedding, content, metadata) and flattening metadata fields into dedicated columns.
Example
>>> specs = (
...     SpannerColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_content_spec()
...     .add_metadata_field("source", str)
...     .with_metadata_spec()
...     .build()
... )
- static with_defaults() SpannerColumnSpecsBuilder[source]
Create builder with default schema.
Default schema includes:
- id (STRING): Chunk ID
- embedding (ARRAY<FLOAT32>): Dense embedding vector
- content (STRING): Chunk content text
- metadata (JSON): Full metadata as JSON
- Returns:
Builder with default column specifications
- with_id_spec(column_name: str = 'id', python_type: ~typing.Type = <class 'str'>, convert_fn: ~typing.Callable[[str], ~typing.Any] | None = None) SpannerColumnSpecsBuilder[source]
Add ID column specification.
- Parameters:
column_name – Column name (default: “id”)
python_type – Python type (default: str)
convert_fn – Optional converter (e.g., to cast to int)
- Returns:
Self for method chaining
Examples
Default string ID:
>>> builder.with_id_spec()
Integer ID with conversion:
>>> builder.with_id_spec(
...     python_type=int,
...     convert_fn=lambda id: int(id.split('_')[1])
... )
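As a quick check of the integer-ID converter above, the following sketch runs the same expression on a sample value. The ID format "doc_42" (a prefix, an underscore, then a number) and the helper name id_to_int are assumptions made for illustration.

```python
def id_to_int(chunk_id: str) -> int:
    """Parse the numeric part after the first underscore (assumed ID format)."""
    return int(chunk_id.split("_")[1])


parsed = id_to_int("doc_42")
```

An ID without an underscore would raise IndexError here, so a converter like this only suits pipelines where every chunk ID follows the assumed pattern.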
- with_embedding_spec(column_name: str = 'embedding', convert_fn: Callable[[List[float]], List[float]] | None = None) SpannerColumnSpecsBuilder[source]
Add embedding array column (ARRAY<FLOAT32> or ARRAY<FLOAT64>).
- Parameters:
column_name – Column name (default: “embedding”)
convert_fn – Optional converter (e.g., normalize, quantize)
- Returns:
Self for method chaining
Examples
Default embedding:
>>> builder.with_embedding_spec()
Normalized embedding:
>>> def normalize(vec):
...     norm = (sum(x**2 for x in vec) ** 0.5) or 1.0
...     return [x/norm for x in vec]
>>> builder.with_embedding_spec(convert_fn=normalize)
Rounded precision:
>>> builder.with_embedding_spec(
...     convert_fn=lambda vec: [round(x, 4) for x in vec]
... )
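The normalization converter above can be exercised on its own, without a builder. This is a standalone restatement of that function with type hints added; the sample vectors are made up for illustration.

```python
import math
from typing import List


def normalize(vec: List[float]) -> List[float]:
    """Scale vec to unit L2 norm; an all-zero vector is returned unchanged."""
    # "or 1.0" avoids division by zero when the norm is 0.0.
    norm = math.sqrt(sum(x * x for x in vec)) or 1.0
    return [x / norm for x in vec]


unit = normalize([3.0, 4.0])
zeros = normalize([0.0, 0.0])
```

The fallback to 1.0 means a zero vector passes through as zeros instead of raising ZeroDivisionError.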
- with_content_spec(column_name: str = 'content', python_type: ~typing.Type = <class 'str'>, convert_fn: ~typing.Callable[[str], ~typing.Any] | None = None) SpannerColumnSpecsBuilder[source]
Add content column.
- Parameters:
column_name – Column name (default: “content”)
python_type – Python type (default: str)
convert_fn – Optional converter
- Returns:
Self for method chaining
Examples
Default text content:
>>> builder.with_content_spec()
Content length (in words) as integer:
>>> builder.with_content_spec(
...     column_name="content_length",
...     python_type=int,
...     convert_fn=lambda text: len(text.split())
... )
Truncated content:
>>> builder.with_content_spec(
...     convert_fn=lambda text: text[:1000]
... )
- with_metadata_spec(column_name: str = 'metadata') SpannerColumnSpecsBuilder[source]
Add metadata JSON column.
Stores the full metadata dictionary as a JSON string in Spanner.
- Parameters:
column_name – Column name (default: “metadata”)
- Returns:
Self for method chaining
Note
Metadata is automatically converted to JSON string using json.dumps()
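A minimal illustration of the serialization step mentioned in the note, using only the standard library. The metadata dictionary is a made-up sample.

```python
import json

# Sample metadata as it might appear on a chunk (illustrative values).
metadata = {"source": "guide.pdf", "page_number": 3}

serialized = json.dumps(metadata)   # the string form that would be stored
restored = json.loads(serialized)   # round-trips back to the same dict
```

Note that json.dumps raises TypeError for values it cannot serialize, such as datetime objects, so such values need to be converted (for example to ISO strings) before they reach the metadata column.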
- add_metadata_field(field: str, python_type: Type, column_name: str | None = None, convert_fn: Callable[[Any], Any] | None = None, default: Any | None = None) SpannerColumnSpecsBuilder[source]
Flatten a metadata field into its own column.
Extracts a specific field from chunk.metadata and stores it in a dedicated table column.
- Parameters:
field – Key in chunk.metadata to extract
python_type – Python type (must be explicitly specified)
column_name – Column name (default: same as field)
convert_fn – Optional converter for type casting/transformation
default – Default value if field is missing from metadata
- Returns:
Self for method chaining
Examples
String field:
>>> builder.add_metadata_field("source", str)
Integer with default:
>>> builder.add_metadata_field(
...     "page_number",
...     int,
...     default=0
... )
Float with conversion:
>>> builder.add_metadata_field(
...     "confidence",
...     float,
...     convert_fn=lambda x: round(float(x), 2),
...     default=0.0
... )
List of strings:
>>> builder.add_metadata_field(
...     "tags",
...     List[str],
...     default=[]
... )
Timestamp with conversion:
>>> builder.add_metadata_field(
...     "created_at",
...     str,
...     convert_fn=lambda ts: ts.isoformat()
... )
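The field, convert_fn, and default parameters can be pictured as a small extraction function over the metadata dictionary. The sketch below is one plausible reading, not the library's code: make_field_extractor is a hypothetical helper, and it assumes the default is returned as-is when the key is missing (the documentation above does not say whether convert_fn is also applied to the default).

```python
from typing import Any, Callable, Dict, Optional


def make_field_extractor(
    field: str,
    convert_fn: Optional[Callable[[Any], Any]] = None,
    default: Any = None,
) -> Callable[[Dict[str, Any]], Any]:
    """Return a function that pulls `field` out of a metadata dict."""

    def extract(metadata: Dict[str, Any]) -> Any:
        if field not in metadata:
            # Assumption: the default bypasses convert_fn.
            return default
        value = metadata[field]
        return convert_fn(value) if convert_fn is not None else value

    return extract


get_page = make_field_extractor("page_number", default=0)
get_confidence = make_field_extractor(
    "confidence", convert_fn=lambda x: round(float(x), 2), default=0.0
)

page_present = get_page({"page_number": 7})
page_missing = get_page({})
confidence = get_confidence({"confidence": "0.98765"})
```

Under these assumptions a missing page_number falls back to 0, while a string confidence is cast to float and rounded to two decimals.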
- add_column(column_name: str, python_type: Type, value_fn: Callable[[Chunk], Any]) SpannerColumnSpecsBuilder[source]
Add a custom column with full control.
- Parameters:
column_name – Column name
python_type – Python type (required)
value_fn – Value extraction function
- Returns:
Self for method chaining
Examples
Boolean flag:
>>> builder.add_column(
...     column_name="has_code",
...     python_type=bool,
...     value_fn=lambda chunk: "```" in chunk.content.text
... )
Computed value:
>>> builder.add_column(
...     column_name="word_count",
...     python_type=int,
...     value_fn=lambda chunk: len(chunk.content.text.split())
... )
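The two value functions above can be tried outside a pipeline. In this sketch the chunks are hypothetical SimpleNamespace stand-ins that expose only content.text, and the sample texts are invented; has_code and word_count restate the lambdas from the examples.

```python
from types import SimpleNamespace

# Markdown code-fence marker, built programmatically instead of typed literally.
FENCE = "`" * 3


def has_code(chunk) -> bool:
    """True when the chunk text contains a Markdown code fence."""
    return FENCE in chunk.content.text


def word_count(chunk) -> int:
    """Number of whitespace-separated tokens in the chunk text."""
    return len(chunk.content.text.split())


prose = SimpleNamespace(content=SimpleNamespace(text="Spanner stores vectors"))
snippet = SimpleNamespace(
    content=SimpleNamespace(text=f"Example: {FENCE}print(1){FENCE}")
)
results = [(has_code(c), word_count(c)) for c in (prose, snippet)]
```

Because value_fn receives the whole chunk, a custom column can combine content, metadata, and embedding in ways the dedicated builder methods do not cover.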
- build() List[SpannerColumnSpec][source]
Build the final list of column specifications.
- Returns:
List of SpannerColumnSpec objects
- class apache_beam.ml.rag.ingestion.spanner.SpannerVectorWriterConfig(project_id: str, instance_id: str, database_id: str, table_name: str, *, column_specs: List[SpannerColumnSpec] | None = None, write_mode: Literal['INSERT', 'UPDATE', 'REPLACE', 'INSERT_OR_UPDATE'] = 'INSERT_OR_UPDATE', max_batch_size_bytes: int | None = None, max_number_mutations: int | None = None, max_number_rows: int | None = None, grouping_factor: int | None = None, host: str | None = None, emulator_host: str | None = None, expansion_service: str | None = None, commit_deadline: int | None = None, max_cumulative_backoff: int | None = None, failure_mode: FailureMode | None = FailureMode.REPORT_FAILURES, high_priority: bool = False, **spanner_kwargs)[source]
Bases: VectorDatabaseWriteConfig
Configuration for writing vectors to Cloud Spanner.
Supports flexible schema configuration through column specifications and provides control over Spanner-specific write parameters.
Examples
Default schema:
>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings"
... )
Custom schema with flattened metadata:
>>> specs = (
...     SpannerColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_content_spec()
...     .add_metadata_field("source", str)
...     .add_metadata_field("page_number", int, default=0)
...     .with_metadata_spec()
...     .build()
... )
>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings",
...     column_specs=specs
... )
With emulator:
>>> config = SpannerVectorWriterConfig(
...     project_id="test-project",
...     instance_id="test-instance",
...     database_id="test-db",
...     table_name="embeddings",
...     emulator_host="http://localhost:9010"
... )
Initialize Spanner vector writer configuration.
- Parameters:
project_id – GCP project ID
instance_id – Spanner instance ID
database_id – Spanner database ID
table_name – Target table name
column_specs – Schema configuration using SpannerColumnSpecsBuilder. If None, uses default schema (id, embedding, content, metadata)
write_mode – Spanner write operation type:
- INSERT: Fail if row exists
- UPDATE: Fail if row doesn't exist
- REPLACE: Delete then insert
- INSERT_OR_UPDATE: Insert or update if exists (default)
max_batch_size_bytes – Maximum bytes per mutation batch (default: 1MB)
max_number_mutations – Maximum cell mutations per batch (default: 5000)
max_number_rows – Maximum rows per batch (default: 500)
grouping_factor – Multiple of the maximum mutation batch size used to select the set of mutations that are sorted by key before batching (default: 1000)
host – Spanner host URL (usually not needed)
emulator_host – Spanner emulator host (e.g., “http://localhost:9010”)
expansion_service – Java expansion service address (host:port)
commit_deadline – Commit API deadline in seconds (default: 15)
max_cumulative_backoff – Max retry backoff seconds (default: 900)
failure_mode – Error handling strategy:
- FAIL_FAST: Throw exception for any failure
- REPORT_FAILURES: Continue processing (default)
high_priority – Use high priority for operations (default: False)
**spanner_kwargs – Additional keyword arguments to pass to the underlying Spanner write transform. Use this to pass any Spanner-specific parameters not explicitly exposed by this config.
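The four write_mode values differ mainly in how they treat an existing row. The sketch below imitates those semantics on an in-memory dictionary so the differences are easy to see. apply_write is a hypothetical helper, not part of Beam or the Spanner client, and the behavior of REPLACE (columns that are not written end up empty) reflects the "delete then insert" description above.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_write(table: Dict[str, Row], key: str, values: Row, mode: str) -> None:
    """Apply one write to an in-memory table keyed by primary key."""
    exists = key in table
    if mode == "INSERT":
        if exists:
            raise KeyError(f"row {key!r} already exists")
        table[key] = dict(values)
    elif mode == "UPDATE":
        if not exists:
            raise KeyError(f"row {key!r} does not exist")
        table[key].update(values)  # untouched columns keep their values
    elif mode == "REPLACE":
        table[key] = dict(values)  # old row is discarded entirely
    elif mode == "INSERT_OR_UPDATE":
        table.setdefault(key, {}).update(values)
    else:
        raise ValueError(f"unknown write mode: {mode}")


table: Dict[str, Row] = {}
apply_write(table, "a", {"content": "v1", "source": "s1"}, "INSERT")
apply_write(table, "a", {"content": "v2"}, "INSERT_OR_UPDATE")
after_upsert = dict(table["a"])   # source survives the upsert
apply_write(table, "a", {"content": "v3"}, "REPLACE")
after_replace = dict(table["a"])  # source is gone after the replace
```

For re-runnable ingestion pipelines the default INSERT_OR_UPDATE is usually the forgiving choice, since writing the same chunk ID twice does not fail.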
- create_write_transform() PTransform[source]
Create the Spanner write PTransform.
- Returns:
PTransform for writing to Spanner