apache_beam.ml.rag.ingestion.base module
- class apache_beam.ml.rag.ingestion.base.VectorDatabaseWriteConfig
Bases: ABC
Abstract base class for vector database configurations in RAG pipelines.
VectorDatabaseWriteConfig defines the interface for configuring vector database writes in RAG pipelines. Implementations should provide database-specific configuration and create appropriate write transforms.
The configuration flow:
1. The subclass provides database-specific configuration (table names, etc.).
2. create_write_transform() creates the appropriate PTransform for writing.
3. The transform handles converting Chunks to the database-specific format.
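The three-step flow can be sketched without Beam installed. This is a minimal sketch using only the standard library: WriteConfigSketch and InMemoryWriteConfig are hypothetical stand-ins for VectorDatabaseWriteConfig and a concrete subclass, and the "transform" returned is a plain function over a list of chunk dicts rather than a Beam PTransform. It also shows that, because create_write_transform() is abstract, a subclass that omits it cannot be instantiated.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List


class WriteConfigSketch(ABC):
    """Hypothetical stand-in for VectorDatabaseWriteConfig."""

    @abstractmethod
    def create_write_transform(self) -> Callable[[List[Dict[str, Any]]], Any]:
        """Step 2: build the object that performs the write."""


class InMemoryWriteConfig(WriteConfigSketch):
    """Hypothetical config that 'writes' to an in-memory table."""

    def __init__(self, table: str):
        # Step 1: database-specific configuration.
        self.table = table
        self.storage: Dict[str, List[Dict[str, Any]]] = {}

    def create_write_transform(self):
        def write(chunks: List[Dict[str, Any]]) -> int:
            # Step 3: convert each chunk to the target's row format and store it.
            rows = [{"id": c["id"], "embedding": c["embedding"]} for c in chunks]
            self.storage.setdefault(self.table, []).extend(rows)
            return len(rows)
        return write


class IncompleteConfig(WriteConfigSketch):
    """Forgets to implement create_write_transform()."""


config = InMemoryWriteConfig(table="embeddings")
written = config.create_write_transform()([{"id": "a", "embedding": [0.1, 0.2]}])

try:
    IncompleteConfig()
    abstract_enforced = False
except TypeError:
    # abc refuses to instantiate a class with unimplemented abstract methods.
    abstract_enforced = True
```

In real code the subclass would extend VectorDatabaseWriteConfig and return a Beam PTransform, but the division of labor is the same: the config holds settings, and the object it builds does the conversion and writing.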
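- Example implementation:
>>> import apache_beam as beam
>>> from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteConfig
>>> class BigQueryVectorWriterConfig(VectorDatabaseWriteConfig):
...     def __init__(self, table: str):
...         self.table = table
...
...     def create_write_transform(self):
...         # A real implementation would first convert each Chunk
...         # into a BigQuery row dict before writing.
...         return beam.io.WriteToBigQuery(
...             table=self.table
...         )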
- abstract create_write_transform() → PTransform[Chunk, Any]
Creates a PTransform that writes embeddings to the vector database.
- Returns:
A PTransform that accepts a PCollection[Chunk] and writes the chunks’ embeddings and metadata to the configured vector database. The transform should handle:
- Converting the Chunk format to the database schema
- Setting up the database connection/client
- Writing with appropriate batching/error handling
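The conversion and batching responsibilities listed above can be sketched in plain Python. This is a minimal sketch under stated assumptions: Chunk here is a simplified, hypothetical stand-in (the real Chunk type may structure its text and vectors differently), the row columns id, content, embedding and metadata are a hypothetical schema, and chunk_to_row and batched are illustrative helpers, not Beam APIs.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Iterator, List, Optional


@dataclass
class Chunk:
    """Simplified, hypothetical stand-in for a Beam RAG Chunk."""
    id: str
    text: Optional[str]
    dense_embedding: Optional[List[float]]
    metadata: Dict[str, Any] = field(default_factory=dict)


def chunk_to_row(chunk: Chunk) -> Dict[str, Any]:
    """Convert a Chunk to a row dict for a hypothetical table schema."""
    if chunk.dense_embedding is None:
        # Writing a chunk without a vector is treated as an error here.
        raise ValueError(f"Chunk {chunk.id!r} has no embedding")
    return {
        "id": chunk.id,
        "content": chunk.text,
        "embedding": list(chunk.dense_embedding),
        "metadata": dict(chunk.metadata),
    }


def batched(rows: Iterable[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Group rows into lists of at most `size`, e.g. one list per write request."""
    batch: List[Dict[str, Any]] = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly smaller, batch.
        yield batch


chunks = [Chunk(id=str(i), text=f"doc {i}", dense_embedding=[float(i)]) for i in range(5)]
batches = list(batched((chunk_to_row(c) for c in chunks), size=2))
```

Inside a Beam pipeline, the per-chunk conversion would typically run in a beam.Map, and grouping could be delegated to beam.BatchElements instead of a hand-written helper.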
- class apache_beam.ml.rag.ingestion.base.VectorDatabaseWriteTransform(database_config: VectorDatabaseWriteConfig)
Bases: PTransform
A PTransform for writing embedded chunks to vector databases.
This transform uses a VectorDatabaseWriteConfig to write chunks with embeddings to a vector database. It validates the config and applies the database-specific write transform.
- Example usage:
>>> config = BigQueryVectorConfig(
...     table='project.dataset.embeddings',
...     embedding_column='embedding'
... )
>>>
>>> with beam.Pipeline() as p:
...     chunks = p | beam.Create([...])  # PCollection[Chunk]
...     chunks | VectorDatabaseWriteTransform(config)
- Parameters:
database_config – Configuration for the target vector database. Must be a subclass of VectorDatabaseWriteConfig that implements create_write_transform().
- Raises:
TypeError – If database_config is not a VectorDatabaseWriteConfig instance.
Initialize transform with database config.
- Parameters:
database_config – Configuration for target vector database.
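The documented contract (reject anything that is not a config instance with a TypeError, then delegate to the config's own write transform) can be illustrated with plain-Python stand-ins. This is a sketch of the described behavior, not Beam's source: WriteConfigBase, WriteTransformSketch and CountingConfig are hypothetical names, and expand() here calls a plain function instead of applying a PTransform to a PCollection.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, List


class WriteConfigBase(ABC):
    """Hypothetical stand-in for VectorDatabaseWriteConfig."""

    @abstractmethod
    def create_write_transform(self) -> Callable[[List[Any]], Any]:
        ...


class WriteTransformSketch:
    """Hypothetical stand-in for VectorDatabaseWriteTransform."""

    def __init__(self, database_config: WriteConfigBase):
        # Validate eagerly so a bad config fails at construction time,
        # not later when the write is applied.
        if not isinstance(database_config, WriteConfigBase):
            raise TypeError(
                "database_config must be a WriteConfigBase, "
                f"got {type(database_config).__name__}")
        self.database_config = database_config

    def expand(self, chunks: List[Any]) -> Any:
        # Delegate to the database-specific write built by the config;
        # the return value is whatever that write produces.
        write = self.database_config.create_write_transform()
        return write(chunks)


class CountingConfig(WriteConfigBase):
    """Hypothetical config whose 'write' just counts chunks."""

    def create_write_transform(self):
        return len


result = WriteTransformSketch(CountingConfig()).expand(["a", "b"])
```

Passing a plain dict such as {"table": "t"} instead of a config object raises TypeError in the constructor, mirroring the Raises entry above.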
- expand(pcoll: PCollection[Chunk]) → PTransform[Chunk, Any]
Creates and applies the database-specific write transform.
- Parameters:
pcoll – PCollection of Chunks with embeddings to write to the vector database. Each Chunk must have:
- An embedding
- An ID
- Metadata used to filter results, as specified by the database config
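One way to check the per-chunk requirements listed above before writing is sketched below. It assumes a simplified, hypothetical Chunk stand-in and a hypothetical helper, missing_requirements, where filter_keys represents the metadata fields a particular database config expects to filter on; each real config defines its own requirements.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional


@dataclass
class Chunk:
    """Simplified, hypothetical stand-in for a Beam RAG Chunk."""
    id: Optional[str]
    embedding: Optional[List[float]]
    metadata: Dict[str, Any] = field(default_factory=dict)


def missing_requirements(chunk: Chunk, filter_keys: Iterable[str]) -> List[str]:
    """Return the names of required fields that the chunk lacks.

    `filter_keys` stands in for the metadata fields a database config
    expects to filter on (hypothetical; each config defines its own).
    """
    problems: List[str] = []
    if not chunk.id:
        problems.append("id")
    if not chunk.embedding:
        problems.append("embedding")
    # Every metadata key the config filters on must be present.
    problems.extend(f"metadata.{k}" for k in filter_keys if k not in chunk.metadata)
    return problems


good = Chunk(id="c1", embedding=[0.1, 0.2], metadata={"source": "docs"})
bad = Chunk(id=None, embedding=None)
good_problems = missing_requirements(good, ["source"])
bad_problems = missing_requirements(bad, ["source"])
```

An empty list means the chunk meets these requirements; a pipeline could use a check like this to route incomplete chunks elsewhere instead of failing the write.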
- Returns:
Result of writing to database (implementation specific).