apache_beam.ml.rag.ingestion.postgres module
- class apache_beam.ml.rag.ingestion.postgres.PostgresVectorWriterConfig(connection_config: ~apache_beam.ml.rag.ingestion.jdbc_common.ConnectionConfig, table_name: str, *, write_config: ~apache_beam.ml.rag.ingestion.jdbc_common.WriteConfig = WriteConfig(autosharding=None, max_connections=None, write_batch_size=None), column_specs: ~typing.List[~apache_beam.ml.rag.ingestion.postgres_common.ColumnSpec] = [ColumnSpec(column_name='id', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_id_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='embedding', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_embedding_spec.<locals>.value_fn>, sql_typecast='::float[]'), ColumnSpec(column_name='content', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_content_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='metadata', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_metadata_spec.<locals>.value_fn>, sql_typecast='::jsonb')], conflict_resolution: ~apache_beam.ml.rag.ingestion.postgres_common.ConflictResolution | None = ConflictResolution(on_conflict_fields=[], action='IGNORE', update_fields=None))
Bases: VectorDatabaseWriteConfig
Configuration for writing vectors to Postgres using JDBC.
Supports flexible schema configuration through column specifications and conflict resolution strategies.
- Parameters:
connection_config – ConnectionConfig.
table_name – Target table name.
write_config – JdbcIO WriteConfig to control batch sizes, autosharding, etc.
column_specs – Use ColumnSpecsBuilder to configure how embeddings and metadata are written to a database schema. If None, uses the default Chunk schema.
conflict_resolution – Optional ConflictResolution strategy for handling insert conflicts. Defaults to ON CONFLICT DO NOTHING.
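To make the defaults in the signature concrete, the following standard-library sketch shows the kind of INSERT statement that the default column specs and conflict resolution would correspond to. It is an illustration only: `build_insert_sql` is a hypothetical helper, not part of apache_beam, the exact SQL text Beam generates may differ, and the `"UPDATE"` action name is an assumption (only `'IGNORE'` appears in the signature above).

```python
from typing import Optional, Sequence, Tuple


def build_insert_sql(
    table_name: str,
    columns: Sequence[Tuple[str, Optional[str]]],
    on_conflict_fields: Sequence[str] = (),
    action: str = "IGNORE",
    update_fields: Optional[Sequence[str]] = None,
) -> str:
    """Hypothetical helper: render an INSERT with an ON CONFLICT clause."""
    names = ", ".join(name for name, _ in columns)
    # Each JDBC placeholder carries the column's typecast, e.g. "?::jsonb".
    placeholders = ", ".join("?" + (cast or "") for _, cast in columns)
    sql = f"INSERT INTO {table_name} ({names}) VALUES ({placeholders})"
    target = f" ({', '.join(on_conflict_fields)})" if on_conflict_fields else ""

    if action == "IGNORE":
        return f"{sql} ON CONFLICT{target} DO NOTHING"
    if action == "UPDATE":  # assumed action name, see lead-in
        if not on_conflict_fields:
            raise ValueError("DO UPDATE needs at least one conflict field")
        if update_fields is None:
            # Default: overwrite every column that is not a conflict key.
            update_fields = [n for n, _ in columns if n not in on_conflict_fields]
        updates = ", ".join(f"{f} = EXCLUDED.{f}" for f in update_fields)
        return f"{sql} ON CONFLICT{target} DO UPDATE SET {updates}"
    raise ValueError(f"unsupported action: {action!r}")


# Column names and typecasts taken from the default column_specs above.
DEFAULT_COLUMNS = [
    ("id", None),
    ("embedding", "::float[]"),
    ("content", None),
    ("metadata", "::jsonb"),
]

default_sql = build_insert_sql("embeddings", DEFAULT_COLUMNS)
upsert_sql = build_insert_sql(
    "embeddings", DEFAULT_COLUMNS, on_conflict_fields=["id"], action="UPDATE"
)
print(default_sql)
print(upsert_sql)
```

With an empty `on_conflict_fields` list and the `'IGNORE'` action, the clause reduces to a bare `ON CONFLICT DO NOTHING`, which matches the default described for `conflict_resolution`.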
Examples
Simple case with default schema:
>>> config = PostgresVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings'
... )
Custom schema with metadata fields:
>>> specs = (ColumnSpecsBuilder()
...     .with_id_spec(column_name="my_id_column")
...     .with_embedding_spec(column_name="embedding_vec")
...     .add_metadata_field(field="source", column_name="src")
...     .add_metadata_field(
...         "timestamp",
...         column_name="created_at",
...         sql_typecast="::timestamp"
...     )
...     .build())
Minimal schema (only ID + embedding written):
>>> column_specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .build())
>>> config = PostgresVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings',
...     column_specs=specs
... )
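The custom-schema example can be read as a list of (column name, value function, typecast) triples, mirroring the ColumnSpec fields visible in the class signature. The sketch below imitates that idea with the standard library only, to show which columns end up in a row. `SketchColumnSpec`, `metadata_field`, the dict-based chunk, and the array-literal encoding of the embedding are all hypothetical stand-ins, not Beam's actual classes or behavior.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class SketchColumnSpec:
    """Hypothetical stand-in for ColumnSpec: name, extractor, typecast."""
    column_name: str
    value_fn: Callable[[Dict[str, Any]], Any]
    sql_typecast: Optional[str] = None


def metadata_field(
    field: str, column_name: str, sql_typecast: Optional[str] = None
) -> SketchColumnSpec:
    # Pull one metadata key out into its own column.
    return SketchColumnSpec(
        column_name, lambda chunk: chunk["metadata"].get(field), sql_typecast
    )


# Mirrors the custom schema example: renamed id/embedding columns plus
# two metadata fields promoted to columns. No content column is declared.
specs = [
    SketchColumnSpec("my_id_column", lambda chunk: chunk["id"]),
    SketchColumnSpec(
        "embedding_vec",
        # Assumed encoding: a Postgres array literal cast with ::float[].
        lambda chunk: "{" + ",".join(str(x) for x in chunk["embedding"]) + "}",
        "::float[]",
    ),
    metadata_field("source", "src"),
    metadata_field("timestamp", "created_at", "::timestamp"),
]

chunk = {
    "id": "doc-1",
    "embedding": [0.1, 0.2],
    "content": "hello",
    "metadata": {"source": "wiki", "timestamp": "2024-01-01 00:00:00"},
}

row = {spec.column_name: spec.value_fn(chunk) for spec in specs}
placeholders = ["?" + (spec.sql_typecast or "") for spec in specs]
print(row)
print(placeholders)
```

Only columns that have a spec are produced, so in this sketch the chunk's `content` is not written at all, which is the same effect the minimal schema example relies on.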
- create_write_transform() → PTransform