apache_beam.transforms.enrichment_handlers.cloudsql module
- class apache_beam.transforms.enrichment_handlers.cloudsql.CustomQueryConfig(query_fn: Callable[[Row], str])
Bases: object
Configuration for using a custom query function. query_fn receives each input Row and returns the SQL query string to execute.
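A minimal sketch of a query_fn, assuming the input Row has an integer product_id field and that a table named products exists (both names are hypothetical). The function receives one Row and returns the SQL string to run; a types.SimpleNamespace stands in for beam.Row so the sketch runs without Beam installed.

```python
from types import SimpleNamespace


def product_query(row) -> str:
    """Hypothetical query_fn: returns the SQL query for one input Row.

    The `products` table and `product_id` field are illustrative assumptions.
    """
    # Cast to int so a malformed value raises ValueError instead of
    # altering the SQL text.
    product_id = int(row.product_id)
    return f"SELECT * FROM products WHERE id = {product_id}"


# SimpleNamespace stands in for beam.Row, which also exposes fields as attributes.
print(product_query(SimpleNamespace(product_id=42)))
```

In a pipeline, the function would be passed as CustomQueryConfig(query_fn=product_query). Because values are formatted directly into the query string, every field that comes from input data should be validated or cast first.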
- class apache_beam.transforms.enrichment_handlers.cloudsql.TableFieldsQueryConfig(table_id: str, where_clause_template: str, where_clause_fields: List[str])
Bases: object
Configuration for using a table name, a where clause template, and the names of the input Row fields whose values fill the template's parameters.
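A sketch of how a where clause template pairs with where_clause_fields, assuming, as the example usage on this page suggests ("id = :param0" with ['id']), that placeholders are named param0, param1, and so on in the same order as the field list. bind_where_params is a hypothetical helper, not part of the handler, and a SimpleNamespace stands in for beam.Row.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def bind_where_params(row, where_clause_fields: List[str]) -> Dict[str, Any]:
    """Hypothetical helper: maps :param0, :param1, ... to a Row's field values.

    Assumes placeholders are numbered in the same order as where_clause_fields.
    """
    return {
        f"param{i}": getattr(row, field)
        for i, field in enumerate(where_clause_fields)
    }


# Template and field list in the shape TableFieldsQueryConfig accepts.
where_clause_template = "id = :param0 AND region = :param1"
where_clause_fields = ["id", "region"]

row = SimpleNamespace(id=7, region="us-east1")  # stand-in for beam.Row
print(bind_where_params(row, where_clause_fields))
# {'param0': 7, 'param1': 'us-east1'}
```

The matching config would be TableFieldsQueryConfig('my_table', where_clause_template, where_clause_fields).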
- class apache_beam.transforms.enrichment_handlers.cloudsql.TableFunctionQueryConfig(table_id: str, where_clause_template: str, where_clause_value_fn: Callable[[Row], list[Any]])
Bases: object
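Configuration for using a table name, a where clause template, and a value function that receives each input Row and returns the list of values for the template's parameters.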
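A value function is useful when the lookup value must be derived from the Row rather than copied from a field. The sketch below is a hypothetical where_clause_value_fn for the template "email = :param0", assuming the input Row has an email field and that the i-th returned value binds the i-th placeholder.

```python
from types import SimpleNamespace
from typing import Any, List


def email_lookup_values(row) -> List[Any]:
    """Hypothetical where_clause_value_fn for the template "email = :param0".

    Normalizes the email so lookups ignore case and surrounding whitespace.
    """
    return [row.email.strip().lower()]


row = SimpleNamespace(email="  Alice@Example.COM ")  # stand-in for beam.Row
print(email_lookup_values(row))  # ['alice@example.com']
```

The matching config would be TableFunctionQueryConfig('users', "email = :param0", email_lookup_values), where 'users' is a hypothetical table name.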
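- class apache_beam.transforms.enrichment_handlers.cloudsql.DatabaseTypeAdapter(value)
Bases: Enum
Supported database types. Each member's value is the name of the Python driver module used to connect: pg8000 for PostgreSQL, pymysql for MySQL, and pytds for SQL Server.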
- POSTGRESQL = 'pg8000'
- MYSQL = 'pymysql'
- SQLSERVER = 'pytds'
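Because each member's value is a driver module name, standard Enum lookup by value maps a driver name back to its member, and the value can be used to check whether the driver is importable. The sketch uses a local copy of the enum so it runs without Beam installed; driver_installed is a hypothetical helper.

```python
import importlib.util
from enum import Enum


class DatabaseTypeAdapter(Enum):
    """Local mirror of the documented members, for illustration only."""
    POSTGRESQL = 'pg8000'
    MYSQL = 'pymysql'
    SQLSERVER = 'pytds'


def driver_installed(adapter: DatabaseTypeAdapter) -> bool:
    """Returns True if the driver module named by the member's value is importable."""
    return importlib.util.find_spec(adapter.value) is not None


# Enum lookup by value maps a driver name back to its member.
print(DatabaseTypeAdapter('pymysql'))  # DatabaseTypeAdapter.MYSQL
for adapter in DatabaseTypeAdapter:
    print(adapter.name, adapter.value, driver_installed(adapter))
```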
- class apache_beam.transforms.enrichment_handlers.cloudsql.CloudSQLConnectionConfig(db_adapter: apache_beam.transforms.enrichment_handlers.cloudsql.DatabaseTypeAdapter, instance_connection_uri: str, user: str = <factory>, password: str = <factory>, db_id: str = <factory>, refresh_strategy: google.cloud.sql.connector.enums.RefreshStrategy = RefreshStrategy.LAZY, connector_kwargs: typing.Dict[str, typing.Any] = <factory>, connect_kwargs: typing.Dict[str, typing.Any] = <factory>)
Bases: ConnectionConfig
Connects to Google Cloud SQL using the Cloud SQL Python Connector.
- Parameters:
db_adapter – The database adapter type (PostgreSQL, MySQL, SQL Server).
instance_connection_uri – URI for connecting to the Cloud SQL instance, given as the instance connection name in the form PROJECT:REGION:INSTANCE.
user – Username for authentication.
password – Password for authentication (optional).
db_id – Database identifier/name.
refresh_strategy – Strategy for refreshing the connection (default: LAZY).
connector_kwargs – Additional keyword arguments for the Cloud SQL Python Connector. Enables forward compatibility.
connect_kwargs – Additional keyword arguments for the client connect method. Enables forward compatibility.
- db_adapter: DatabaseTypeAdapter
- refresh_strategy: RefreshStrategy = 'LAZY'
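The instance_connection_uri value is a Cloud SQL instance connection name such as "apache-beam-testing:us-central1:itests". A minimal sketch of a hypothetical helper that checks and splits that value before building the config; splitting from the right keeps domain-scoped project IDs (for example "example.com:my-project") intact.

```python
from typing import Tuple


def split_instance_connection_name(name: str) -> Tuple[str, str, str]:
    """Hypothetical helper: splits PROJECT:REGION:INSTANCE into its parts.

    rsplit treats only the last two colons as separators, so a
    domain-scoped project ID keeps its own colon.
    """
    parts = name.rsplit(":", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected PROJECT:REGION:INSTANCE, got {name!r}")
    project, region, instance = parts
    return project, region, instance


print(split_instance_connection_name("apache-beam-testing:us-central1:itests"))
# ('apache-beam-testing', 'us-central1', 'itests')
```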
- class apache_beam.transforms.enrichment_handlers.cloudsql.ExternalSQLDBConnectionConfig(db_adapter: apache_beam.transforms.enrichment_handlers.cloudsql.DatabaseTypeAdapter, host: str, port: int, user: str = <factory>, password: str = <factory>, db_id: str = <factory>, connect_kwargs: typing.Dict[str, typing.Any] = <factory>)
Bases: ConnectionConfig
Connects to external SQL databases (PostgreSQL, MySQL, SQL Server) over TCP.
- Parameters:
db_adapter – The database adapter type (PostgreSQL, MySQL, SQL Server).
host – Hostname or IP address of the database server.
port – Port number for the database connection.
user – Username for authentication.
password – Password for authentication.
db_id – Database identifier/name.
connect_kwargs – Additional keyword arguments for the client connect method. Enables forward compatibility.
- db_adapter: DatabaseTypeAdapter
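ExternalSQLDBConnectionConfig requires port explicitly. A minimal sketch of a hypothetical helper that falls back to each engine's conventional server port (5432 for PostgreSQL, 3306 for MySQL, 1433 for SQL Server), keyed by the DatabaseTypeAdapter values; servers configured to listen elsewhere need an explicit port.

```python
from typing import Dict, Optional

# Conventional default server ports keyed by DatabaseTypeAdapter values.
# This mapping is a hypothetical convenience; the handler does not define it.
DEFAULT_PORTS: Dict[str, int] = {
    "pg8000": 5432,   # PostgreSQL
    "pymysql": 3306,  # MySQL
    "pytds": 1433,    # SQL Server
}


def resolve_port(driver: str, port: Optional[int] = None) -> int:
    """Returns the explicit port if given, else the driver's conventional default."""
    return port if port is not None else DEFAULT_PORTS[driver]


print(resolve_port("pg8000"))        # 5432
print(resolve_port("pytds", 14330))  # 14330
```

With the real enum, a call such as resolve_port(DatabaseTypeAdapter.MYSQL.value) could supply the port argument.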
- class apache_beam.transforms.enrichment_handlers.cloudsql.CloudSQLEnrichmentHandler(connection_config: ConnectionConfig, *, query_config: CustomQueryConfig | TableFieldsQueryConfig | TableFunctionQueryConfig, column_names: list[str] | None = None, min_batch_size: int = 1, max_batch_size: int = 10000, **kwargs)
Bases: EnrichmentSourceHandler[Row, Row]
Enrichment handler for Cloud SQL databases.
This handler is designed to work with the apache_beam.transforms.enrichment.Enrichment transform.
To use this handler, provide one of the following query configs:
CustomQueryConfig - for providing a custom query function
TableFieldsQueryConfig - for specifying a table, a where clause, and field names
TableFunctionQueryConfig - for specifying a table, a where clause, and a value function
By default, the handler retrieves all columns from the specified table. To limit the columns, use the column_names parameter to specify the desired column names.
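To illustrate the documented default, the sketch below shows the SELECT prefix that column_names implies: all columns (*) when it is omitted, otherwise only the listed columns. build_select is a hypothetical helper, not the handler's own query builder.

```python
from typing import List, Optional


def build_select(table_id: str, column_names: Optional[List[str]] = None) -> str:
    """Hypothetical helper: the SELECT prefix implied by column_names.

    None (or an empty list, in this sketch) selects all columns.
    """
    columns = ", ".join(column_names) if column_names else "*"
    return f"SELECT {columns} FROM {table_id}"


print(build_select("my_table"))                  # SELECT * FROM my_table
print(build_select("my_table", ["id", "name"]))  # SELECT id, name FROM my_table
```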
This handler queries the Cloud SQL database per element by default. To enable batching, set the min_batch_size and max_batch_size parameters. These values control the batching behavior in the apache_beam.transforms.util.BatchElements transform.
NOTE: Batching is not supported when using CustomQueryConfig.
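The benefit of batching is fewer database round trips. The sketch below is a simplified stand-in that only models the max_batch_size upper bound with fixed-size chunks; BatchElements itself also adapts the batch size between min_batch_size and max_batch_size at runtime, which is not modeled here. Assuming one query per batch, 250 rows with max_batch_size=100 would need 3 queries instead of 250.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def fixed_batches(elements: Iterable[T], max_batch_size: int) -> Iterator[List[T]]:
    """Yields lists of at most max_batch_size elements, in input order.

    Simplified stand-in for BatchElements: no adaptive sizing.
    """
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    batch: List[T] = []
    for element in elements:
        batch.append(element)
        if len(batch) == max_batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield batch


print([len(b) for b in fixed_batches(range(250), 100)])  # [100, 100, 50]
```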
Example usage:
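import os

from apache_beam.transforms.enrichment_handlers.cloudsql import (
    CloudSQLConnectionConfig,
    CloudSQLEnrichmentHandler,
    DatabaseTypeAdapter,
    TableFieldsQueryConfig,
)

connection_config = CloudSQLConnectionConfig(
    db_adapter=DatabaseTypeAdapter.POSTGRESQL,
    instance_connection_uri="apache-beam-testing:us-central1:itests",
    user='postgres',
    password=os.getenv("CLOUDSQL_PG_PASSWORD"))
# Look up rows in my_table whose id matches each input Row's 'id' field.
query_config = TableFieldsQueryConfig('my_table', "id = :param0", ['id'])
cloudsql_handler = CloudSQLEnrichmentHandler(
    connection_config=connection_config,
    query_config=query_config,
    min_batch_size=2,
    max_batch_size=100)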
- Parameters:
connection_config (ConnectionConfig) – Configuration for connecting to the SQL database. Must be an instance of a subclass of ConnectionConfig, such as CloudSQLConnectionConfig or ExternalSQLDBConnectionConfig. This determines how the handler connects to the target SQL database.
query_config – Configuration for database queries. Must be one of:
CustomQueryConfig – for providing a custom query function
TableFieldsQueryConfig – specifies a table, a where clause, and field names
TableFunctionQueryConfig – specifies a table, a where clause, and a value function
column_names (Optional[list[str]]) – List of column names to select from the Cloud SQL table. If not provided, all columns (*) are selected.
min_batch_size (int) – Minimum number of rows to batch together when querying the database. Defaults to 1 if query_fn is not used.
max_batch_size (int) – Maximum number of rows to batch together. Defaults to 10,000 if query_fn is not used.
**kwargs – Additional keyword arguments for database connection or query handling.
Note
min_batch_size and max_batch_size cannot be used with query_fn (CustomQueryConfig).
Either where_clause_fields or where_clause_value_fn must be provided for query construction if query_fn is not provided.
Ensure that the database user has the necessary permissions to query the specified table.
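The first two notes can be checked before a pipeline is built. A minimal sketch of a hypothetical pre-flight check that mirrors them; it uses None to mean "not set" for the batch sizes, which is an assumption, since the handler's own defaults are 1 and 10,000 and how it detects explicit values is not described here.

```python
from typing import Callable, List, Optional


def check_query_options(
    query_fn: Optional[Callable] = None,
    where_clause_fields: Optional[List[str]] = None,
    where_clause_value_fn: Optional[Callable] = None,
    min_batch_size: Optional[int] = None,
    max_batch_size: Optional[int] = None,
) -> None:
    """Hypothetical check mirroring the documented notes; raises ValueError."""
    batching_requested = min_batch_size is not None or max_batch_size is not None
    if query_fn is not None:
        # Batching is not supported with a custom query function.
        if batching_requested:
            raise ValueError("min_batch_size/max_batch_size cannot be used with query_fn")
        return
    # Without query_fn, the where clause needs fields or a value function.
    if where_clause_fields is None and where_clause_value_fn is None:
        raise ValueError("provide where_clause_fields or where_clause_value_fn")


check_query_options(where_clause_fields=["id"], min_batch_size=2, max_batch_size=100)
```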