apache_beam.ml.rag.ingestion.spanner module

Cloud Spanner vector store writer for RAG pipelines.

This module provides a writer for storing embeddings and associated metadata in Google Cloud Spanner. It supports flexible schema configuration with the ability to flatten metadata fields into dedicated columns.

Example usage:

Default schema (id, embedding, content, metadata):

>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings"
... )

Flattened metadata fields:

>>> specs = (
...     SpannerColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_content_spec()
...     .add_metadata_field("source", str)
...     .add_metadata_field("page_number", int, default=0)
...     .with_metadata_spec()
...     .build()
... )
>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings",
...     column_specs=specs
... )

Spanner schema example:

CREATE TABLE embeddings (
    id STRING(1024) NOT NULL,
    embedding ARRAY<FLOAT32>(vector_length=>768),
    content STRING(MAX),
    source STRING(MAX),
    page_number INT64,
    metadata JSON
) PRIMARY KEY (id)
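The vector_length=>768 annotation fixes the embedding dimension in the schema, so a vector of any other length cannot be stored in that column. A minimal sketch of a convert_fn for with_embedding_spec that surfaces a dimension mismatch early with a readable error, assuming a 768-dimensional model; EXPECTED_DIM and check_dim are hypothetical names, not part of this module.

```python
from typing import List

# Hypothetical constant: must match vector_length in the CREATE TABLE statement.
EXPECTED_DIM = 768


def check_dim(vec: List[float]) -> List[float]:
    """Return the vector as a list, or raise if its length is wrong."""
    if len(vec) != EXPECTED_DIM:
        raise ValueError(
            f"embedding has {len(vec)} dimensions, expected {EXPECTED_DIM}")
    return list(vec)
```

It would be passed as builder.with_embedding_spec(convert_fn=check_dim). An exception raised inside a convert_fn fails the element in the pipeline, so this trades a Spanner-side write error for an earlier, more descriptive one.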

class apache_beam.ml.rag.ingestion.spanner.SpannerColumnSpec(column_name: str, python_type: Type, value_fn: Callable[[Chunk], Any])

Bases: object

Column specification for Spanner vector writes.

Defines how to extract and format values from Chunks for insertion into Spanner table columns. Each spec maps to one column in the target table.

column_name

Name of the Spanner table column

Type:

str

python_type

Python type for the NamedTuple field (required for RowCoder)

Type:

Type

value_fn

Function to extract value from a Chunk

Type:

Callable[[apache_beam.ml.rag.types.Chunk], Any]
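python_type is required because each Chunk is turned into a typed row (a NamedTuple for RowCoder). A toy sketch of that idea, assuming a simplified stand-in for Chunk and using typing.NamedTuple to derive a row type from a list of specs. ToyChunk, ColumnSpec, make_row_type, and to_row are illustrative names, not this module's internals.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, NamedTuple, Type


@dataclass
class ToyChunk:
    """Hypothetical stand-in for apache_beam.ml.rag.types.Chunk."""
    id: str
    text: str
    dense_embedding: List[float]
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ColumnSpec:
    """Mirrors the three fields of SpannerColumnSpec."""
    column_name: str
    python_type: Type
    value_fn: Callable[[ToyChunk], Any]


def make_row_type(specs: List[ColumnSpec]):
    # One NamedTuple field per column; the declared types are what a
    # schema-aware coder such as RowCoder needs to encode the row.
    return NamedTuple(
        "SpannerRow", [(s.column_name, s.python_type) for s in specs])


def to_row(chunk: ToyChunk, specs: List[ColumnSpec], row_type):
    # Apply every value_fn in column order to build one row.
    return row_type(*(s.value_fn(chunk) for s in specs))
```

Each spec contributes both a field type (for the coder) and a value (for the write), which is why the three attributes travel together.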

Examples

String column:

>>> SpannerColumnSpec(
...     column_name="id",
...     python_type=str,
...     value_fn=lambda chunk: chunk.id
... )

Array column:

>>> SpannerColumnSpec(
...     column_name="embedding",
...     python_type=List[float],
...     value_fn=lambda chunk: chunk.embedding.dense_embedding
... )

column_name: str
python_type: Type
value_fn: Callable[[Chunk], Any]
class apache_beam.ml.rag.ingestion.spanner.SpannerColumnSpecsBuilder

Bases: object

Builder for creating Spanner column specifications.

Provides a fluent API for defining table schemas and how to populate them from Chunk objects. Supports standard Chunk fields (id, embedding, content, metadata) and flattening metadata fields into dedicated columns.

Example

>>> specs = (
...     SpannerColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_content_spec()
...     .add_metadata_field("source", str)
...     .with_metadata_spec()
...     .build()
... )
static with_defaults() -> SpannerColumnSpecsBuilder

Create builder with default schema.

Default schema includes:
  • id (STRING): Chunk ID
  • embedding (ARRAY<FLOAT32>): Dense embedding vector
  • content (STRING): Chunk content text
  • metadata (JSON): Full metadata as JSON
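A sketch of the row the default schema would produce for one chunk, assuming a plain dict stands in for a Chunk. Only the json.dumps() treatment of metadata is stated on this page (under with_metadata_spec); default_row is a hypothetical illustration, not the module's code.

```python
import json
from typing import Any, Dict


def default_row(chunk: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical mapping of a dict-shaped chunk onto the default columns."""
    return {
        "id": chunk["id"],                            # STRING
        "embedding": list(chunk["dense_embedding"]),  # ARRAY<FLOAT32>
        "content": chunk["text"],                     # STRING
        "metadata": json.dumps(chunk["metadata"]),    # JSON, sent as a string
    }
```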

Returns:

Builder with default column specifications

with_id_spec(column_name: str = 'id', python_type: Type = str, convert_fn: Callable[[str], Any] | None = None) -> SpannerColumnSpecsBuilder

Add ID column specification.

Parameters:
  • column_name – Column name (default: “id”)

  • python_type – Python type (default: str)

  • convert_fn – Optional converter (e.g., to cast to int)

Returns:

Self for method chaining

Examples

Default string ID:

>>> builder.with_id_spec()

Integer ID with conversion:

>>> builder.with_id_spec(
...     python_type=int,
...     convert_fn=lambda chunk_id: int(chunk_id.split('_')[1])
... )

with_embedding_spec(column_name: str = 'embedding', convert_fn: Callable[[List[float]], List[float]] | None = None) -> SpannerColumnSpecsBuilder

Add embedding array column (ARRAY<FLOAT32> or ARRAY<FLOAT64>).

Parameters:
  • column_name – Column name (default: “embedding”)

  • convert_fn – Optional converter (e.g., normalize, quantize)

Returns:

Self for method chaining

Examples

Default embedding:

>>> builder.with_embedding_spec()

Normalized embedding:

>>> def normalize(vec):
...     norm = (sum(x**2 for x in vec) ** 0.5) or 1.0
...     return [x / norm for x in vec]
>>> builder.with_embedding_spec(convert_fn=normalize)

Rounded precision:

>>> builder.with_embedding_spec(
...     convert_fn=lambda vec: [round(x, 4) for x in vec]
... )
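The parameter list mentions quantization as a use for convert_fn. One possible sketch rounds each component to IEEE 754 half precision with the standard struct module (format character 'e') and returns ordinary Python floats, so the column type is unchanged. to_half_precision is a hypothetical helper; whether the lost precision is acceptable depends on your retrieval quality requirements.

```python
import struct
from typing import List


def to_half_precision(vec: List[float]) -> List[float]:
    """Round each value to the nearest float16 and return Python floats.

    Values outside the float16 range (roughly +/-65504) cannot be packed,
    so struct raises for them; normalized embeddings stay well inside it.
    """
    fmt = f"<{len(vec)}e"
    return list(struct.unpack(fmt, struct.pack(fmt, *vec)))
```

It plugs in like the other converters: builder.with_embedding_spec(convert_fn=to_half_precision).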

with_content_spec(column_name: str = 'content', python_type: Type = str, convert_fn: Callable[[str], Any] | None = None) -> SpannerColumnSpecsBuilder

Add content column.

Parameters:
  • column_name – Column name (default: “content”)

  • python_type – Python type (default: str)

  • convert_fn – Optional converter

Returns:

Self for method chaining

Examples

Default text content:

>>> builder.with_content_spec()

Content length (word count) as integer:

>>> builder.with_content_spec(
...     column_name="content_length",
...     python_type=int,
...     convert_fn=lambda text: len(text.split())
... )

Truncated content:

>>> builder.with_content_spec(
...     convert_fn=lambda text: text[:1000]
... )

with_metadata_spec(column_name: str = 'metadata') -> SpannerColumnSpecsBuilder

Add metadata JSON column.

Stores the full metadata dictionary as a JSON string in Spanner.

Parameters:

column_name – Column name (default: “metadata”)

Returns:

Self for method chaining

Note

Metadata is automatically converted to a JSON string using json.dumps().
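Because the whole metadata dictionary goes through json.dumps(), every value in it must be JSON-serializable; a datetime value, for example, makes json.dumps() raise TypeError. A sketch of one workaround, assuming you can clean metadata in an upstream step before the writer sees it (this module is not described as doing this for you). make_json_safe is a hypothetical helper.

```python
import datetime
import json
from typing import Any, Dict


def make_json_safe(metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: turn top-level date/datetime values into ISO-8601 strings.

    datetime.datetime subclasses datetime.date, so one isinstance check
    covers both. Nested dicts and lists are left untouched in this sketch.
    """
    return {
        key: value.isoformat() if isinstance(value, datetime.date) else value
        for key, value in metadata.items()
    }
```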

add_metadata_field(field: str, python_type: Type, column_name: str | None = None, convert_fn: Callable[[Any], Any] | None = None, default: Any | None = None) -> SpannerColumnSpecsBuilder

Flatten a metadata field into its own column.

Extracts a specific field from chunk.metadata and stores it in a dedicated table column.

Parameters:
  • field – Key in chunk.metadata to extract

  • python_type – Python type (must be explicitly specified)

  • column_name – Column name (default: same as field)

  • convert_fn – Optional converter for type casting/transformation

  • default – Default value if field is missing from metadata

Returns:

Self for method chaining

Examples

String field:

>>> builder.add_metadata_field("source", str)

Integer with default:

>>> builder.add_metadata_field(
...     "page_number",
...     int,
...     default=0
... )

Float with conversion:

>>> builder.add_metadata_field(
...     "confidence",
...     float,
...     convert_fn=lambda x: round(float(x), 2),
...     default=0.0
... )

List of strings:

>>> builder.add_metadata_field(
...     "tags",
...     List[str],
...     default=[]
... )

Timestamp with conversion:

>>> builder.add_metadata_field(
...     "created_at",
...     str,
...     convert_fn=lambda ts: ts.isoformat()
... )
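The per-chunk behavior described above (look up a key in chunk.metadata, fall back to default when it is missing, optionally pass the value through convert_fn) can be modeled as a small pure function. This sketch rests on one stated assumption: convert_fn is applied only to values actually found in the metadata, not to the default. Check the module source if that distinction matters for your data. extract_field is a hypothetical name.

```python
from typing import Any, Callable, Dict, Optional


def extract_field(
    metadata: Dict[str, Any],
    field: str,
    convert_fn: Optional[Callable[[Any], Any]] = None,
    default: Any = None,
) -> Any:
    """Hypothetical model of add_metadata_field's extraction for one chunk."""
    if field not in metadata:
        # Assumption: the default is returned as-is, without conversion.
        return default
    value = metadata[field]
    return convert_fn(value) if convert_fn is not None else value
```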

add_column(column_name: str, python_type: Type, value_fn: Callable[[Chunk], Any]) -> SpannerColumnSpecsBuilder

Add a custom column with full control.

Parameters:
  • column_name – Column name

  • python_type – Python type (required)

  • value_fn – Value extraction function

Returns:

Self for method chaining

Examples

Boolean flag:

>>> builder.add_column(
...     column_name="has_code",
...     python_type=bool,
...     value_fn=lambda chunk: "```" in chunk.content.text
... )

Computed value:

>>> builder.add_column(
...     column_name="word_count",
...     python_type=int,
...     value_fn=lambda chunk: len(chunk.content.text.split())
... )

build() -> List[SpannerColumnSpec]

Build the final list of column specifications.

Returns:

List of SpannerColumnSpec objects

class apache_beam.ml.rag.ingestion.spanner.SpannerVectorWriterConfig(project_id: str, instance_id: str, database_id: str, table_name: str, *, column_specs: List[SpannerColumnSpec] | None = None, write_mode: Literal['INSERT', 'UPDATE', 'REPLACE', 'INSERT_OR_UPDATE'] = 'INSERT_OR_UPDATE', max_batch_size_bytes: int | None = None, max_number_mutations: int | None = None, max_number_rows: int | None = None, grouping_factor: int | None = None, host: str | None = None, emulator_host: str | None = None, expansion_service: str | None = None, commit_deadline: int | None = None, max_cumulative_backoff: int | None = None, failure_mode: FailureMode | None = FailureMode.REPORT_FAILURES, high_priority: bool = False, **spanner_kwargs)

Bases: VectorDatabaseWriteConfig

Configuration for writing vectors to Cloud Spanner.

Supports flexible schema configuration through column specifications and provides control over Spanner-specific write parameters.

Examples

Default schema:

>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings"
... )

Custom schema with flattened metadata:

>>> specs = (
...     SpannerColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_content_spec()
...     .add_metadata_field("source", str)
...     .add_metadata_field("page_number", int, default=0)
...     .with_metadata_spec()
...     .build()
... )
>>> config = SpannerVectorWriterConfig(
...     project_id="my-project",
...     instance_id="my-instance",
...     database_id="my-db",
...     table_name="embeddings",
...     column_specs=specs
... )

With emulator:

>>> config = SpannerVectorWriterConfig(
...     project_id="test-project",
...     instance_id="test-instance",
...     database_id="test-db",
...     table_name="embeddings",
...     emulator_host="http://localhost:9010"
... )

Initialize Spanner vector writer configuration.

Parameters:
  • project_id – GCP project ID

  • instance_id – Spanner instance ID

  • database_id – Spanner database ID

  • table_name – Target table name

  • column_specs – Schema configuration using SpannerColumnSpecsBuilder. If None, uses default schema (id, embedding, content, metadata)

  • write_mode – Spanner write operation type:
      - INSERT: Fail if the row already exists
      - UPDATE: Fail if the row doesn't exist
      - REPLACE: Delete the existing row, then insert
      - INSERT_OR_UPDATE: Insert, or update the row if it exists (default)

  • max_batch_size_bytes – Maximum bytes per mutation batch (default: 1MB)

  • max_number_mutations – Maximum cell mutations per batch (default: 5000)

  • max_number_rows – Maximum rows per batch (default: 500)

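  • grouping_factor – Multiple of the maximum batch size (in both bytes and mutations) used to select a group of mutations that is sorted by key before batching (default: 1000)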

  • host – Spanner host URL (usually not needed)

  • emulator_host – Spanner emulator host (e.g., “http://localhost:9010”)

  • expansion_service – Java expansion service address (host:port)

  • commit_deadline – Commit API deadline in seconds (default: 15)

  • max_cumulative_backoff – Max retry backoff seconds (default: 900)

  • failure_mode – Error handling strategy:
      - FAIL_FAST: Throw an exception for any failure
      - REPORT_FAILURES: Continue processing (default)

  • high_priority – Use high priority for operations (default: False)

  • **spanner_kwargs – Additional keyword arguments to pass to the underlying Spanner write transform. Use this to pass any Spanner-specific parameters not explicitly exposed by this config.
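The four write modes differ in how they treat an existing row with the same primary key. A toy in-memory model of those semantics, assuming a dict keyed by primary key stands in for the table and that a column absent from a stored row reads as None (NULL). It follows Spanner's documented behavior: REPLACE sets unwritten columns to NULL, while INSERT_OR_UPDATE and UPDATE keep them. apply_mutation is hypothetical; the real writes are performed by Spanner, not by this code.

```python
from typing import Any, Dict

# Hypothetical in-memory table: primary key -> column values.
Table = Dict[Any, Dict[str, Any]]


def apply_mutation(table: Table, mode: str, key: Any, values: Dict[str, Any]) -> None:
    """Toy model of Spanner mutation semantics for one row."""
    exists = key in table
    if mode == "INSERT":
        if exists:
            raise KeyError(f"ALREADY_EXISTS: {key!r}")
        table[key] = dict(values)
    elif mode == "UPDATE":
        if not exists:
            raise KeyError(f"NOT_FOUND: {key!r}")
        table[key].update(values)  # unwritten columns keep their values
    elif mode == "REPLACE":
        table[key] = dict(values)  # old row dropped; unwritten columns read as None
    elif mode == "INSERT_OR_UPDATE":
        table.setdefault(key, {}).update(values)  # unwritten columns keep their values
    else:
        raise ValueError(f"unknown write mode: {mode}")
```

If every written row carries all the columns in column_specs, REPLACE and INSERT_OR_UPDATE should only diverge for table columns that the specs do not cover.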

create_write_transform() -> PTransform

Create the Spanner write PTransform.

Returns:

PTransform for writing to Spanner