apache_beam.transforms.enrichment_handlers.cloudsql module

class apache_beam.transforms.enrichment_handlers.cloudsql.CustomQueryConfig(query_fn: Callable[[Row], str])

Bases: object

Configuration for using a custom query function.

query_fn: Callable[[Row], str]
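A query_fn receives one input element and returns the complete SQL string to run for it. The sketch below is a minimal example that assumes a hypothetical `products` table and a hypothetical input field named `product_id`. In a pipeline the argument is a `beam.Row`. Any object with attribute access works for illustration, so the snippet uses `SimpleNamespace` and needs no Beam dependency.

```python
from types import SimpleNamespace


def product_query_fn(row) -> str:
    """Build the lookup query for one element (hypothetical table and field names)."""
    # query_fn returns a finished SQL string, so coerce the value to int to keep
    # arbitrary text from the input out of the statement.
    product_id = int(row.product_id)
    return f"SELECT * FROM products WHERE id = {product_id}"


# SimpleNamespace stands in for beam.Row(product_id=42).
print(product_query_fn(SimpleNamespace(product_id=42)))
```

The function would then be wrapped as `CustomQueryConfig(query_fn=product_query_fn)`.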
class apache_beam.transforms.enrichment_handlers.cloudsql.TableFieldsQueryConfig(table_id: str, where_clause_template: str, where_clause_fields: List[str])

Bases: object

Configuration for using table name, where clause, and field names.

table_id: str
where_clause_template: str
where_clause_fields: List[str]
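The where_clause_template contains placeholders that are filled from the input element's fields named in where_clause_fields. The sketch below assumes placeholders are named `:param0`, `:param1`, and so on, following the order of where_clause_fields. This is the convention used in the handler example on this page. An in-memory SQLite database stands in for Cloud SQL. The helper `bind_where_params` and the `customers` table are hypothetical and are not part of the Beam API.

```python
import sqlite3
from types import SimpleNamespace

TABLE_ID = "customers"  # hypothetical table
WHERE_CLAUSE_TEMPLATE = "id = :param0 AND region = :param1"
WHERE_CLAUSE_FIELDS = ["id", "region"]


def bind_where_params(row, fields):
    """Hypothetical helper: map each listed field to :param<i> by position."""
    return {f"param{i}": getattr(row, name) for i, name in enumerate(fields)}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER, region TEXT, name TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [(1, "eu", "Ada"), (1, "us", "Grace"), (2, "eu", "Linus")],
)

row = SimpleNamespace(id=1, region="us")  # stand-in for beam.Row
params = bind_where_params(row, WHERE_CLAUSE_FIELDS)
query = f"SELECT * FROM {TABLE_ID} WHERE {WHERE_CLAUSE_TEMPLATE}"
result = conn.execute(query, params).fetchall()  # values bound, not spliced
print(params, result)
conn.close()
```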
class apache_beam.transforms.enrichment_handlers.cloudsql.TableFunctionQueryConfig(table_id: str, where_clause_template: str, where_clause_value_fn: Callable[[Row], list[Any]])

Bases: object

Configuration for using table name, where clause, and a value function.

table_id: str
where_clause_template: str
where_clause_value_fn: Callable[[Row], list[Any]]
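Unlike where_clause_fields, where_clause_value_fn computes the placeholder values itself. This helps when a value has to be normalized or derived before the lookup. The sketch below is minimal and assumes hypothetical `email` and `signup_ts` input fields. It also assumes positional placeholders `:param0` and `:param1`, as in this page's handler example.

```python
from datetime import datetime, timezone
from types import SimpleNamespace

WHERE_CLAUSE_TEMPLATE = "email = :param0 AND signup_date = :param1"


def where_clause_value_fn(row) -> list:
    """Return values for :param0 and :param1, in order (hypothetical fields)."""
    # Normalize the email so lookups are case- and whitespace-insensitive.
    normalized_email = row.email.strip().lower()
    # Derive a UTC calendar date from a Unix timestamp.
    signup_date = datetime.fromtimestamp(row.signup_ts, tz=timezone.utc).date().isoformat()
    return [normalized_email, signup_date]


print(where_clause_value_fn(SimpleNamespace(email="  Ada@Example.COM ", signup_ts=0)))
```

The function would be passed as `TableFunctionQueryConfig(table_id="users", where_clause_template=WHERE_CLAUSE_TEMPLATE, where_clause_value_fn=where_clause_value_fn)`, where `users` is also a hypothetical table name.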
class apache_beam.transforms.enrichment_handlers.cloudsql.DatabaseTypeAdapter(value)

Bases: Enum

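Supported database types. Each member's value names the Python database driver used for that database: pg8000 for PostgreSQL, PyMySQL for MySQL, and python-tds (pytds) for SQL Server.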

POSTGRESQL = 'pg8000'
MYSQL = 'pymysql'
SQLSERVER = 'pytds'
to_sqlalchemy_dialect()

Map the adapter type to its corresponding SQLAlchemy dialect.

Returns:

SQLAlchemy dialect string.

Return type:

str
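SQLAlchemy database URLs identify a backend and driver as `dialect+driver`, and the enum values above are driver names. The standalone mirror of the enum below is a sketch that assumes the method returns strings of that form (`postgresql+pg8000`, `mysql+pymysql`, `mssql+pytds`). Check the module source for the exact values.

```python
from enum import Enum


class DatabaseTypeAdapterSketch(Enum):
    """Standalone mirror of DatabaseTypeAdapter, for illustration only."""

    POSTGRESQL = "pg8000"
    MYSQL = "pymysql"
    SQLSERVER = "pytds"

    def to_sqlalchemy_dialect(self) -> str:
        # Assumed mapping: SQLAlchemy backend name, "+", then the driver in .value.
        backend = {
            DatabaseTypeAdapterSketch.POSTGRESQL: "postgresql",
            DatabaseTypeAdapterSketch.MYSQL: "mysql",
            DatabaseTypeAdapterSketch.SQLSERVER: "mssql",
        }[self]
        return f"{backend}+{self.value}"


for adapter in DatabaseTypeAdapterSketch:
    print(adapter.name, adapter.to_sqlalchemy_dialect())
```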

class apache_beam.transforms.enrichment_handlers.cloudsql.ConnectionConfig

Bases: ABC

abstract get_connector_handler() -> Callable[[], Connection]
abstract get_db_url() -> str
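A ConnectionConfig subclass supplies two things: a zero-argument callable that opens a new connection, and a database URL. The sketch below mirrors that contract with a local abstract base class so it runs without Beam. It then adds a hypothetical SQLite-backed implementation. The class names and the SQLite URLs are assumptions for illustration. A real subclass would inherit from ConnectionConfig instead.

```python
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable


class ConnectionConfigSketch(ABC):
    """Local stand-in for ConnectionConfig's two abstract methods."""

    @abstractmethod
    def get_connector_handler(self) -> Callable[[], sqlite3.Connection]:
        ...

    @abstractmethod
    def get_db_url(self) -> str:
        ...


@dataclass
class InMemorySQLiteConfig(ConnectionConfigSketch):
    """Hypothetical config that opens SQLite connections."""

    database: str = ":memory:"

    def get_connector_handler(self) -> Callable[[], sqlite3.Connection]:
        # Each call to the returned function opens a fresh connection.
        return lambda: sqlite3.connect(self.database)

    def get_db_url(self) -> str:
        # SQLAlchemy-style URLs: "sqlite://" is in-memory, "sqlite:///path" is a file.
        if self.database == ":memory:":
            return "sqlite://"
        return f"sqlite:///{self.database}"


config = InMemorySQLiteConfig()
connect = config.get_connector_handler()
conn = connect()
print(config.get_db_url(), conn.execute("SELECT 1").fetchone())
conn.close()
```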
class apache_beam.transforms.enrichment_handlers.cloudsql.CloudSQLConnectionConfig(db_adapter: DatabaseTypeAdapter, instance_connection_uri: str, user: str = <factory>, password: str = <factory>, db_id: str = <factory>, refresh_strategy: google.cloud.sql.connector.enums.RefreshStrategy = RefreshStrategy.LAZY, connector_kwargs: Dict[str, Any] = <factory>, connect_kwargs: Dict[str, Any] = <factory>)

Bases: ConnectionConfig

Connects to Google Cloud SQL using the Cloud SQL Python Connector.

Parameters:
  • db_adapter – The database adapter type (PostgreSQL, MySQL, SQL Server).

  • instance_connection_uri – Instance connection name of the Cloud SQL instance, in the form PROJECT:REGION:INSTANCE (for example, "apache-beam-testing:us-central1:itests").

  • user – Username for authentication.

  • password – Password for authentication (optional).

  • db_id – Database identifier/name.

  • refresh_strategy – Strategy for refreshing connection information such as certificates (default: RefreshStrategy.LAZY).

  • connector_kwargs – Additional keyword arguments for the Cloud SQL Python Connector. Enables forward compatibility.

  • connect_kwargs – Additional keyword arguments for the client connect method. Enables forward compatibility.
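The instance_connection_uri follows Cloud SQL's instance connection name format, `PROJECT:REGION:INSTANCE`, as in the `apache-beam-testing:us-central1:itests` value used in this page's example. A small validator like the one below can catch malformed values before a pipeline launches. It is a hypothetical helper and is not part of Beam or the Cloud SQL Python Connector.

```python
from typing import NamedTuple


class InstanceConnectionName(NamedTuple):
    project: str
    region: str
    instance: str


def parse_instance_connection_name(uri: str) -> InstanceConnectionName:
    """Split PROJECT:REGION:INSTANCE (hypothetical helper, not a Beam API)."""
    # rsplit from the right keeps domain-scoped project IDs such as
    # "example.com:my-project" intact.
    parts = uri.rsplit(":", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected PROJECT:REGION:INSTANCE, got {uri!r}")
    return InstanceConnectionName(*parts)


print(parse_instance_connection_name("apache-beam-testing:us-central1:itests"))
```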

db_adapter: DatabaseTypeAdapter
instance_connection_uri: str
user: str
password: str
db_id: str
refresh_strategy: RefreshStrategy = RefreshStrategy.LAZY
connector_kwargs: Dict[str, Any]
connect_kwargs: Dict[str, Any]
get_connector_handler() -> Callable[[], Connection]

Returns a function that creates a new database connection.

The returned connector function creates database connections that should be properly closed by the caller when no longer needed.
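Each call to the returned function opens a new connection that the caller owns. Wrapping that call in `contextlib.closing` ensures the connection is closed even if the query raises. The sketch below uses `sqlite3.connect` as a stand-in for the callable returned by `get_connector_handler()`. The `run_lookup` helper is hypothetical.

```python
import contextlib
import sqlite3
from typing import Callable, List, Tuple


def run_lookup(connect: Callable[[], sqlite3.Connection], sql: str) -> List[Tuple]:
    """Open a connection, run one query, and always close the connection."""
    with contextlib.closing(connect()) as conn:
        return conn.execute(sql).fetchall()


def connector() -> sqlite3.Connection:
    # Stand-in for config.get_connector_handler().
    return sqlite3.connect(":memory:")


print(run_lookup(connector, "SELECT 1 + 1"))
```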

get_db_url() -> str
class apache_beam.transforms.enrichment_handlers.cloudsql.ExternalSQLDBConnectionConfig(db_adapter: DatabaseTypeAdapter, host: str, port: int, user: str = <factory>, password: str = <factory>, db_id: str = <factory>, connect_kwargs: Dict[str, Any] = <factory>)

Bases: ConnectionConfig

Connects to external SQL databases (PostgreSQL, MySQL, SQL Server) over TCP.

Parameters:
  • db_adapter – The database adapter type (PostgreSQL, MySQL, SQL Server).

  • host – Hostname or IP address of the database server.

  • port – Port number for the database connection.

  • user – Username for authentication.

  • password – Password for authentication.

  • db_id – Database identifier/name.

  • connect_kwargs – Additional keyword arguments for the client connect method. Enables forward compatibility.
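These fields carry the same pieces as a SQLAlchemy-style URL, `dialect+driver://user:password@host:port/database`. The sketch below assembles such a URL with the standard library. It percent-encodes the credentials so that characters such as `@` or `:` in a password do not corrupt the URL. The helper name is hypothetical. The assumption that get_db_url returns a URL of exactly this shape is also unconfirmed and made for illustration only.

```python
from urllib.parse import quote


def build_db_url(dialect: str, host: str, port: int, user: str,
                 password: str, db_id: str) -> str:
    """Hypothetical helper: assemble a SQLAlchemy-style database URL."""
    # safe="" percent-encodes every reserved character, including "@" and ":".
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"{dialect}://{credentials}@{host}:{port}/{db_id}"


print(build_db_url("postgresql+pg8000", "10.0.0.5", 5432, "alice", "p@ss:w0rd", "orders"))
```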

db_adapter: DatabaseTypeAdapter
host: str
port: int
user: str
password: str
db_id: str
connect_kwargs: Dict[str, Any]
get_connector_handler() -> Callable[[], Connection]

Returns a function that creates a new database connection.

The returned connector function creates database connections that should be properly closed by the caller when no longer needed.

get_db_url() -> str
class apache_beam.transforms.enrichment_handlers.cloudsql.CloudSQLEnrichmentHandler(connection_config: ConnectionConfig, *, query_config: CustomQueryConfig | TableFieldsQueryConfig | TableFunctionQueryConfig, column_names: list[str] | None = None, min_batch_size: int = 1, max_batch_size: int = 10000, **kwargs)

Bases: EnrichmentSourceHandler[Row, Row]

Enrichment handler for Cloud SQL databases.

This handler is designed to work with the apache_beam.transforms.enrichment.Enrichment transform.

To use this handler, you need to provide one of the following query configs:
  • CustomQueryConfig - For providing a custom query function

  • TableFieldsQueryConfig - For specifying table, where clause, and fields

  • TableFunctionQueryConfig - For specifying table, where clause, and value function

By default, the handler retrieves all columns from the specified table. To limit the columns, use the column_names parameter to specify the desired column names.

This handler queries the Cloud SQL database per element by default. To enable batching, set the min_batch_size and max_batch_size parameters. These values control the batching behavior of the apache_beam.transforms.util.BatchElements transform.

NOTE: Batching is not supported when using the CustomQueryConfig.
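Batching pays off because one query can serve several elements. The sketch below illustrates the idea with SQLite. It rewrites a single-row condition into an OR of per-element clauses, each with its own placeholder name, and then runs one query for the whole batch. This is an assumed strategy for illustration only. The handler's actual batched SQL may differ, and `build_batch_query` is hypothetical.

```python
import sqlite3
from typing import Dict, List, Tuple


def build_batch_query(table_id: str, field: str, keys: List) -> Tuple[str, Dict]:
    """Hypothetical: one WHERE clause covering every key in the batch."""
    clauses = [f"{field} = :param{i}" for i in range(len(keys))]
    params = {f"param{i}": key for i, key in enumerate(keys)}
    return f"SELECT * FROM {table_id} WHERE " + " OR ".join(clauses), params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO my_table VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])

sql, params = build_batch_query("my_table", "id", [1, 3])
rows = conn.execute(sql, params).fetchall()  # one round trip for two elements
print(sql)
print(sorted(rows))
conn.close()
```

A batched lookup must also match each returned row to the element that requested it. For that reason, the key column should be among the selected columns.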

Example usage:

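import os

from apache_beam.transforms.enrichment_handlers.cloudsql import (
  CloudSQLConnectionConfig,
  CloudSQLEnrichmentHandler,
  DatabaseTypeAdapter,
  TableFieldsQueryConfig)

# Connect through the Cloud SQL Python Connector; read the password from the environment.
connection_config = CloudSQLConnectionConfig(
  db_adapter=DatabaseTypeAdapter.POSTGRESQL,
  instance_connection_uri="apache-beam-testing:us-central1:itests",
  user='postgres',
  password=os.getenv("CLOUDSQL_PG_PASSWORD"))
# Look up rows in my_table whose id matches the element's id field.
query_config = TableFieldsQueryConfig('my_table', "id = :param0", ['id'])
cloudsql_handler = CloudSQLEnrichmentHandler(
  connection_config=connection_config,
  query_config=query_config,
  min_batch_size=2,
  max_batch_size=100)
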
Parameters:
  • connection_config (ConnectionConfig) – Configuration for connecting to the SQL database. Must be an instance of a subclass of ConnectionConfig, such as CloudSQLConnectionConfig or ExternalSQLDBConnectionConfig. This determines how the handler connects to the target SQL database.

  • query_config – Configuration for database queries. Must be one of:
      ◦ CustomQueryConfig – for providing a custom query function
      ◦ TableFieldsQueryConfig – specifies table, where clause, and field names
      ◦ TableFunctionQueryConfig – specifies table, where clause, and value function

  • column_names (Optional[list[str]]) – List of column names to select from the Cloud SQL table. If not provided, all columns (*) are selected.

  • min_batch_size (int) – Minimum number of rows to batch together when querying the database. Defaults to 1. Applies only when query_fn (CustomQueryConfig) is not used.

  • max_batch_size (int) – Maximum number of rows to batch together. Defaults to 10,000. Applies only when query_fn (CustomQueryConfig) is not used.

  • **kwargs – Additional keyword arguments for database connection or query handling.
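The column_names description says that omitting the parameter selects all columns with `*`. The sketch below shows that rule in a few lines. The helper name is hypothetical, and it also treats an empty list as "all columns". Identifier quoting is omitted for brevity, so column and table names are assumed to be trusted.

```python
from typing import List, Optional


def select_clause(table_id: str, column_names: Optional[List[str]] = None) -> str:
    """Hypothetical helper: build the SELECT list, defaulting to every column."""
    columns = ", ".join(column_names) if column_names else "*"
    return f"SELECT {columns} FROM {table_id}"


print(select_clause("my_table"))
print(select_clause("my_table", ["id", "name"]))
```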

Note

  • Cannot use min_batch_size or max_batch_size with query_fn.

  • Either where_clause_fields or where_clause_value_fn must be provided for query construction if query_fn is not provided.

  • Ensure that the database user has the necessary permissions to query the specified table.

create_row_key(row: Row)
get_cache_key(request: Row | list[Row])
batch_elements_kwargs() -> Mapping[str, Any]

Returns keyword arguments suitable for beam.BatchElements.