apache_beam.ml.rag.ingestion.postgres module

class apache_beam.ml.rag.ingestion.postgres.PostgresVectorWriterConfig(connection_config: ~apache_beam.ml.rag.ingestion.jdbc_common.ConnectionConfig, table_name: str, *, write_config: ~apache_beam.ml.rag.ingestion.jdbc_common.WriteConfig = WriteConfig(autosharding=None, max_connections=None, write_batch_size=None), column_specs: ~typing.List[~apache_beam.ml.rag.ingestion.postgres_common.ColumnSpec] = [ColumnSpec(column_name='id', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_id_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='embedding', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_embedding_spec.<locals>.value_fn>, sql_typecast='::float[]'), ColumnSpec(column_name='content', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_content_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='metadata', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_metadata_spec.<locals>.value_fn>, sql_typecast='::jsonb')], conflict_resolution: ~apache_beam.ml.rag.ingestion.postgres_common.ConflictResolution | None = ConflictResolution(on_conflict_fields=[], action='IGNORE', update_fields=None))[source]

Bases: VectorDatabaseWriteConfig

Configuration for writing vectors to Postgres using jdbc.

Supports flexible schema configuration through column specifications and conflict resolution strategies.

Parameters:
  • connection_configConnectionConfig.

  • table_name – Target table name.

  • write_config – JdbcIO WriteConfig to control batch sizes, authosharding, etc.

  • column_specs – Use ColumnSpecsBuilder to configure how embeddings and metadata are written a database schema. If None, uses default Chunk schema.

  • conflict_resolution – Optional ConflictResolution strategy for handling insert conflicts. ON CONFLICT DO NOTHING by default.

Examples

Simple case with default schema:

>>> config = PostgresVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings'
... )

Custom schema with metadata fields:

>>> specs = (ColumnSpecsBuilder()
...         .with_id_spec(column_name="my_id_column")
...         .with_embedding_spec(column_name="embedding_vec")
...         .add_metadata_field(field="source", column_name="src")
...         .add_metadata_field(
...             "timestamp",
...             column_name="created_at",
...             sql_typecast="::timestamp"
...         )
...         .build())

Minimal schema (only ID + embedding written)

>>> column_specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .build())
>>> config = PostgresVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings',
...     column_specs=specs
... )
create_write_transform() PTransform[source]