apache_beam.ml.rag.ingestion.cloudsql module
- class apache_beam.ml.rag.ingestion.cloudsql.LanguageConnectorConfig(username: str, password: str, database_name: str, instance_name: str, ip_types: List[str] | None = None, enable_iam_auth: bool = False, target_principal: str | None = None, delegates: List[str] | None = None, quota_project: str | None = None, connection_properties: Dict[str, str] | None = None, additional_properties: Dict[str, Any] | None = None)
Bases: object
Configuration options for the CloudSQL Java language connector.
Set these parameters to connect to a CloudSQL instance using the Java language connector. For details see https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/blob/main/docs/jdbc.md
- ip_types
Preferred order of IP types to try when connecting, given as a list of strings.
- admin_service_endpoint
Optional custom API service endpoint.
- connection_properties
Optional JDBC connection properties dict. Example: {'ssl': 'true'}
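As a rough illustration of how these options relate to the connector, the sketch below assembles a Postgres JDBC URL from a few of them. The helper `build_jdbc_url`, the database and instance values, and the property names (`cloudSqlInstance`, `socketFactory`, `ipTypes`, `enableIamAuth`) are assumptions based on the connector documentation linked above; this is not Beam's own implementation.

```python
from urllib.parse import urlencode

# Assumed socket factory class name, taken from the Cloud SQL connector docs
# linked above; it is not read from Beam's source.
SOCKET_FACTORY = "com.google.cloud.sql.postgres.SocketFactory"


def build_jdbc_url(database_name, instance_name, ip_types=None,
                   enable_iam_auth=False, connection_properties=None):
    """Hypothetical helper: assemble a Postgres JDBC URL for the connector."""
    params = {
        "cloudSqlInstance": instance_name,
        "socketFactory": SOCKET_FACTORY,
    }
    if ip_types:
        # The connector expects the preferred order as a comma-separated list.
        params["ipTypes"] = ",".join(ip_types)
    if enable_iam_auth:
        params["enableIamAuth"] = "true"
    # Extra JDBC properties such as {'ssl': 'true'} are appended last.
    params.update(connection_properties or {})
    # Keep ':' and ',' readable instead of percent-encoding them.
    return f"jdbc:postgresql:///{database_name}?{urlencode(params, safe=':,')}"


# Placeholder values for illustration only.
url = build_jdbc_url(
    database_name="vectors",
    instance_name="my-project:us-central1:my-instance",
    ip_types=["PRIVATE", "PUBLIC"],
    connection_properties={"ssl": "true"},
)
print(url)
```

Credentials are deliberately left out of the URL here; in practice the username and password would be supplied separately rather than embedded in a connection string.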
- class apache_beam.ml.rag.ingestion.cloudsql.CloudSQLPostgresVectorWriterConfig(connection_config: apache_beam.ml.rag.ingestion.cloudsql.LanguageConnectorConfig, table_name: str, *, write_config: apache_beam.ml.rag.ingestion.jdbc_common.WriteConfig = WriteConfig(autosharding=None, max_connections=None, write_batch_size=None), column_specs: typing.List[apache_beam.ml.rag.ingestion.postgres_common.ColumnSpec] = [ColumnSpec(column_name='id', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_id_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='embedding', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_embedding_spec.<locals>.value_fn>, sql_typecast='::float[]'), ColumnSpec(column_name='content', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_content_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='metadata', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_metadata_spec.<locals>.value_fn>, sql_typecast='::jsonb')], conflict_resolution: apache_beam.ml.rag.ingestion.postgres_common.ConflictResolution | None = ConflictResolution(on_conflict_fields=[], action='IGNORE', update_fields=None))
Bases: PostgresVectorWriterConfig
Configuration for writing vectors to CloudSQL Postgres.
Supports flexible schema configuration through column specifications and conflict resolution strategies.
- Parameters:
connection_config – LanguageConnectorConfig.
table_name – Target table name.
write_config – JdbcIO WriteConfig to control batch sizes, autosharding, etc.
column_specs – Use ColumnSpecsBuilder to configure how embeddings and metadata are written to a database schema. If None, uses default Chunk schema.
conflict_resolution – Optional ConflictResolution strategy for handling insert conflicts. ON CONFLICT DO NOTHING by default.
Examples
Basic usage with default schema:
>>> config = CloudSQLPostgresVectorWriterConfig(
...     connection_config=LanguageConnectorConfig(...),
...     table_name='embeddings'
... )
Custom schema with metadata fields:
>>> specs = (ColumnSpecsBuilder()
...     .with_id_spec(column_name="my_id_column")
...     .with_embedding_spec(column_name="embedding_vec")
...     .add_metadata_field(field="source", column_name="src")
...     .add_metadata_field(
...         "timestamp",
...         column_name="created_at",
...         sql_typecast="::timestamp"
...     )
...     .build())
Minimal schema (only ID + embedding written):
>>> column_specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .build())
>>> config = CloudSQLPostgresVectorWriterConfig(
...     connection_config=LanguageConnectorConfig(...),
...     table_name='embeddings',
...     column_specs=specs
... )
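To make the default column specs and conflict handling more concrete, the following sketch renders the kind of parameterized INSERT statement such a configuration might correspond to. The helper `build_insert_sql` and the exact statement shape are assumptions for illustration; only the column names, typecasts, and the ON CONFLICT DO NOTHING default come from the documentation above, and Beam's generated SQL may differ.

```python
# Default column layout as shown in the class signature: (name, sql_typecast).
DEFAULT_COLUMNS = [
    ("id", None),
    ("embedding", "::float[]"),
    ("content", None),
    ("metadata", "::jsonb"),
]


def build_insert_sql(table_name, columns=DEFAULT_COLUMNS, ignore_conflicts=True):
    """Hypothetical helper: render a parameterized INSERT statement."""
    names = ", ".join(name for name, _ in columns)
    # Each JDBC placeholder gets its column's typecast appended, if any.
    placeholders = ", ".join("?" + (cast or "") for _, cast in columns)
    sql = f"INSERT INTO {table_name} ({names}) VALUES ({placeholders})"
    if ignore_conflicts:
        # Mirrors the documented default: conflicting rows are skipped.
        sql += " ON CONFLICT DO NOTHING"
    return sql


sql = build_insert_sql("embeddings")
print(sql)
```

The typecasts matter because, per the signature, every column value is passed as a string; the cast tells Postgres how to interpret it on the server side.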