apache_beam.ml.rag.ingestion.cloudsql module

class apache_beam.ml.rag.ingestion.cloudsql.LanguageConnectorConfig(username: str, password: str, database_name: str, instance_name: str, ip_types: List[str] | None = None, enable_iam_auth: bool = False, target_principal: str | None = None, delegates: List[str] | None = None, quota_project: str | None = None, connection_properties: Dict[str, str] | None = None, additional_properties: Dict[str, Any] | None = None)

Bases: object

Configuration options for the CloudSQL Java language connector.

Sets the parameters used to connect to a CloudSQL instance through the Java language connector. For details, see https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory/blob/main/docs/jdbc.md

username

Database username.

Type:

str

password

Database password. Can be an empty string when using IAM authentication.

Type:

str

database_name

Name of the database to connect to.

Type:

str

instance_name

Instance connection name. Format: '<PROJECT>:<REGION>:<INSTANCE>'

Type:

str

ip_types

Preferred order of IP types to use when connecting, given as a list of strings.

Type:

List[str] | None

enable_iam_auth

Whether to enable IAM authentication. Default is False.

Type:

bool

target_principal

Optional service account to impersonate for connection.

Type:

str | None

delegates

Optional list of service accounts for delegated impersonation.

Type:

List[str] | None

admin_service_endpoint

Optional custom API service endpoint.

quota_project

Optional project ID for quota and billing.

Type:

str | None

connection_properties

Optional JDBC connection properties dict. Example: {'ssl': 'true'}

Type:

Dict[str, str] | None

additional_properties

Additional properties to be added to the JDBC URL. Example: {'someProperty': 'true'}

Type:

Dict[str, Any] | None
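
The instance_name format described above can be checked before a config is built. The following is a minimal sketch using only the standard library; parse_instance_name and InstanceName are hypothetical helpers for illustration and are not part of apache_beam. It assumes that a project ID may itself contain a colon (domain-scoped projects), so it splits from the right.

```python
from typing import NamedTuple


class InstanceName(NamedTuple):
    """Hypothetical container for the three parts of an instance name."""
    project: str
    region: str
    instance: str


def parse_instance_name(instance_name: str) -> InstanceName:
    """Split '<PROJECT>:<REGION>:<INSTANCE>' into its three parts.

    Hypothetical helper, not part of apache_beam. Splitting from the right
    keeps a colon inside the project ID intact.
    """
    parts = instance_name.rsplit(":", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(
            "expected '<PROJECT>:<REGION>:<INSTANCE>', got %r" % instance_name)
    return InstanceName(*parts)


parsed = parse_instance_name("my-project:us-central1:my-instance")
print(parsed.region)
```

Validating the string early gives a clearer error than a connection failure raised later inside the pipeline.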

username: str
password: str
database_name: str
instance_name: str
ip_types: List[str] | None = None
enable_iam_auth: bool = False
target_principal: str | None = None
delegates: List[str] | None = None
quota_project: str | None = None
connection_properties: Dict[str, str] | None = None
additional_properties: Dict[str, Any] | None = None
to_connection_config()
additional_jdbc_args() → Dict[str, List[Any]]
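
To make the role of these fields more concrete, the sketch below assembles a JDBC URL in the form documented by the linked Cloud SQL JDBC socket factory guide (cloudSqlInstance, socketFactory, ipTypes, enableIamAuth). It is an illustration only: sketch_jdbc_url is a hypothetical function, and the way this class actually builds its URL and JDBC arguments may differ.

```python
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode


def sketch_jdbc_url(
    database_name: str,
    instance_name: str,
    ip_types: Optional[List[str]] = None,
    enable_iam_auth: bool = False,
    additional_properties: Optional[Dict[str, Any]] = None,
) -> str:
    """Hypothetical helper: build a Postgres JDBC URL for the connector."""
    params: Dict[str, Any] = {
        "cloudSqlInstance": instance_name,
        "socketFactory": "com.google.cloud.sql.postgres.SocketFactory",
    }
    if ip_types:
        # The connector expects a comma-separated list, in preference order.
        params["ipTypes"] = ",".join(ip_types)
    if enable_iam_auth:
        params["enableIamAuth"] = "true"
    # Extra properties are appended as further query parameters.
    params.update(additional_properties or {})
    # Keep ':' and ',' unescaped so the URL stays readable.
    query = urlencode(params, safe=":,")
    return "jdbc:postgresql:///%s?%s" % (database_name, query)


url = sketch_jdbc_url(
    "mydb",
    "my-project:us-central1:my-instance",
    ip_types=["PRIVATE", "PUBLIC"],
    enable_iam_auth=True,
    additional_properties={"someProperty": "true"},
)
print(url)
```
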
class apache_beam.ml.rag.ingestion.cloudsql.CloudSQLPostgresVectorWriterConfig(connection_config: apache_beam.ml.rag.ingestion.cloudsql.LanguageConnectorConfig, table_name: str, *, write_config: apache_beam.ml.rag.ingestion.jdbc_common.WriteConfig = WriteConfig(autosharding=None, max_connections=None, write_batch_size=None), column_specs: typing.List[apache_beam.ml.rag.ingestion.postgres_common.ColumnSpec] = [ColumnSpec(column_name='id', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_id_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='embedding', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_embedding_spec.<locals>.value_fn>, sql_typecast='::float[]'), ColumnSpec(column_name='content', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_content_spec.<locals>.value_fn>, sql_typecast=None), ColumnSpec(column_name='metadata', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_metadata_spec.<locals>.value_fn>, sql_typecast='::jsonb')], conflict_resolution: apache_beam.ml.rag.ingestion.postgres_common.ConflictResolution | None = ConflictResolution(on_conflict_fields=[], action='IGNORE', update_fields=None))

Bases: PostgresVectorWriterConfig

Configuration for writing vectors to CloudSQL Postgres.

Supports flexible schema configuration through column specifications and conflict resolution strategies.

Parameters:
  • connection_config – LanguageConnectorConfig.

  • table_name – Target table name.

  • write_config – JdbcIO WriteConfig to control batch sizes, autosharding, etc.

  • column_specs – Use ColumnSpecsBuilder to configure how embeddings and metadata are written to a database schema. If None, uses default Chunk schema.

  • conflict_resolution – Optional ConflictResolution strategy for handling insert conflicts. ON CONFLICT DO NOTHING by default.
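
As a rough illustration of how column_specs and conflict_resolution relate to the resulting write, the sketch below renders an INSERT statement from (column name, SQL typecast) pairs that mirror the default column specs in the signature above. sketch_insert_sql is a hypothetical helper, and the statement the writer actually generates is an assumption here and may differ.

```python
from typing import List, Optional, Tuple

# (column_name, sql_typecast) pairs mirroring the default ColumnSpec values.
ColumnSketch = Tuple[str, Optional[str]]


def sketch_insert_sql(table_name: str, columns: List[ColumnSketch]) -> str:
    """Hypothetical helper: render an INSERT with typecasts per column."""
    names = ", ".join(name for name, _ in columns)
    # Each placeholder carries its column's typecast, if one is set.
    placeholders = ", ".join("?" + (cast or "") for _, cast in columns)
    return (
        f"INSERT INTO {table_name} ({names}) "
        f"VALUES ({placeholders}) ON CONFLICT DO NOTHING"
    )


default_columns: List[ColumnSketch] = [
    ("id", None),
    ("embedding", "::float[]"),
    ("content", None),
    ("metadata", "::jsonb"),
]
sql = sketch_insert_sql("embeddings", default_columns)
print(sql)
```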

Examples

Basic usage with default schema:

>>> config = CloudSQLPostgresVectorWriterConfig(
...     connection_config=LanguageConnectorConfig(...),
...     table_name='embeddings'
... )

Custom schema with metadata fields:

>>> specs = (ColumnSpecsBuilder()
...         .with_id_spec(column_name="my_id_column")
...         .with_embedding_spec(column_name="embedding_vec")
...         .add_metadata_field(field="source", column_name="src")
...         .add_metadata_field(
...             "timestamp",
...             column_name="created_at",
...             sql_typecast="::timestamp"
...         )
...         .build())

Minimal schema (only ID + embedding written):

>>> column_specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .build())
>>> config = CloudSQLPostgresVectorWriterConfig(
...     connection_config=LanguageConnectorConfig(...),
...     table_name='embeddings',
...     column_specs=column_specs
... )
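
The default column specs in the signature above declare the embedding and metadata columns with python_type str and the typecasts '::float[]' and '::jsonb'. The sketch below shows one way such string values could be produced, assuming the embedding is passed as a Postgres array literal and the metadata as JSON text. Both helper functions are hypothetical and are not the library's value_fn implementations.

```python
import json
from typing import Any, Dict, List


def to_pg_array_literal(values: List[float]) -> str:
    """Hypothetical helper: format floats as a Postgres array literal."""
    return "{" + ",".join(str(v) for v in values) + "}"


def to_jsonb_text(metadata: Dict[str, Any]) -> str:
    """Hypothetical helper: serialize metadata for a '::jsonb' typecast."""
    return json.dumps(metadata, sort_keys=True)


# One row as plain strings, ready to bind to the four default columns.
row = {
    "id": "doc-1",
    "embedding": to_pg_array_literal([0.1, 0.2, 0.3]),
    "content": "hello",
    "metadata": to_jsonb_text({"source": "wiki"}),
}
print(row["embedding"])
print(row["metadata"])
```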