apache_beam.ml.rag.ingestion.base module

class apache_beam.ml.rag.ingestion.base.VectorDatabaseWriteConfig[source]

Bases: ABC

Abstract base class for vector database configurations in RAG pipelines.

VectorDatabaseWriteConfig defines the interface for configuring vector database writes in RAG pipelines. Implementations should provide database-specific configuration and create appropriate write transforms.

The configuration flow:

1. Subclass provides database-specific configuration (table names, etc.)
2. create_write_transform() creates the appropriate PTransform for writing
3. Transform handles converting Chunks to the database-specific format

Example implementation:
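>>> import apache_beam as beam
>>> from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteConfig
>>>
>>> class BigQueryVectorWriterConfig(VectorDatabaseWriteConfig):
...     def __init__(self, table: str, embedding_column: str = 'embedding'):
...         # Store both settings; the write transform reads them later.
...         self.table = table
...         self.embedding_column = embedding_column
...
...     def create_write_transform(self):
...         return beam.io.WriteToBigQuery(
...             table=self.table
...         )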

abstract create_write_transform() → PTransform[Chunk, Any][source]

Creates a PTransform that writes embeddings to the vector database.

Returns:

A PTransform that accepts PCollection[Chunk] and writes the chunks' embeddings and metadata to the configured vector database. The transform should handle:

- Converting Chunk format to database schema
- Setting up database connection/client
- Writing with appropriate batching/error handling
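
The conversion and batching responsibilities listed above can be illustrated with a minimal, standard-library-only sketch. It does not use Beam: `ChunkSketch`, `chunk_to_row`, `batched`, and the row field names are hypothetical stand-ins chosen for this illustration, and connection setup and error handling are left out.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Iterator, List


@dataclass
class ChunkSketch:
    """Hypothetical stand-in for a Chunk; not Beam's actual class."""
    id: str
    text: str
    embedding: List[float]
    metadata: Dict[str, Any] = field(default_factory=dict)


def chunk_to_row(chunk: ChunkSketch) -> Dict[str, Any]:
    """Convert a chunk into a flat row for an assumed table schema."""
    return {
        "id": chunk.id,
        "content": chunk.text,
        "embedding": list(chunk.embedding),
        "metadata": dict(chunk.metadata),
    }


def batched(rows: Iterable[Dict[str, Any]],
            size: int) -> Iterator[List[Dict[str, Any]]]:
    """Group rows into lists of at most `size` items for bulk writes."""
    batch: List[Dict[str, Any]] = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch
```

In a real implementation these steps would typically live inside the PTransform returned by create_write_transform(), with the batch size tuned to the target database's limits.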

class apache_beam.ml.rag.ingestion.base.VectorDatabaseWriteTransform(database_config: VectorDatabaseWriteConfig)[source]

Bases: PTransform

A PTransform for writing embedded chunks to vector databases.

This transform uses a VectorDatabaseWriteConfig to write chunks with embeddings to a vector database. It validates the config and applies the database-specific write transform.

Example usage:
>>> config = BigQueryVectorWriterConfig(
...     table='project.dataset.embeddings',
...     embedding_column='embedding'
... )
>>>
>>> with beam.Pipeline() as p:
...     chunks = p | beam.Create([...])  # PCollection[Chunk]
...     chunks | VectorDatabaseWriteTransform(config)

Parameters:

database_config – Configuration for the target vector database. Must be an instance of a VectorDatabaseWriteConfig subclass that implements create_write_transform().

Raises:

TypeError – If database_config is not a VectorDatabaseWriteConfig instance.
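
The type check described above can be sketched with plain Python classes. This is an illustration of the pattern only, not the module's source: `WriteConfigSketch`, `InMemoryConfig`, and `WriteTransformSketch` are hypothetical stand-ins for the documented classes, and the error message is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any


class WriteConfigSketch(ABC):
    """Hypothetical stand-in for VectorDatabaseWriteConfig."""

    @abstractmethod
    def create_write_transform(self) -> Any:
        """Return the object that performs the write."""


class InMemoryConfig(WriteConfigSketch):
    """Hypothetical concrete config used only for this illustration."""

    def create_write_transform(self) -> Any:
        return "in-memory-writer"  # placeholder for a real transform


class WriteTransformSketch:
    """Hypothetical stand-in for VectorDatabaseWriteTransform."""

    def __init__(self, database_config: WriteConfigSketch):
        # Reject anything that is not a config instance up front, so the
        # mistake surfaces at construction time rather than at write time.
        if not isinstance(database_config, WriteConfigSketch):
            raise TypeError(
                "database_config must be a WriteConfigSketch instance, "
                f"got {type(database_config).__name__}")
        self.database_config = database_config
```

Because the base class is abstract, a subclass that omits create_write_transform() cannot be instantiated at all, so the isinstance check only needs to catch unrelated objects such as a plain dict.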

Initialize transform with database config.

Parameters:

database_config – Configuration for target vector database.

expand(pcoll: PCollection[Chunk]) → PTransform[Chunk, Any][source]

Creates and applies the database-specific write transform.

Parameters:

pcoll – PCollection of Chunks with embeddings to write to the vector database. Each Chunk must have:

- An embedding
- An ID
- Metadata used to filter results as specified by database config

Returns:

Result of writing to database (implementation specific).
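
The per-Chunk requirements listed for pcoll (an embedding, an ID, and the metadata the database config expects) could be checked before writing. The sketch below is a hedged illustration: it represents chunks as plain dicts for brevity rather than Beam's Chunk type, and `missing_requirements` and `required_metadata_keys` are hypothetical names, not part of this module.

```python
from typing import Any, List, Mapping, Sequence


def missing_requirements(
    chunk: Mapping[str, Any],
    required_metadata_keys: Sequence[str] = (),
) -> List[str]:
    """Return the names of requirements the chunk fails to meet."""
    missing: List[str] = []
    if not chunk.get("id"):
        missing.append("id")
    if not chunk.get("embedding"):
        missing.append("embedding")
    # Treat absent metadata as empty so every required key is reported.
    metadata = chunk.get("metadata") or {}
    missing.extend(
        f"metadata.{key}" for key in required_metadata_keys
        if key not in metadata)
    return missing
```

An empty result means the chunk is ready to write. A pipeline could route chunks with a non-empty result to a separate output instead of failing the whole write.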