Managed I/O Connectors

Beam’s new Managed API streamlines how you use existing I/Os, offering both simplicity and powerful enhancements. I/Os are now configured through a lightweight, consistent interface: a simple configuration map passed to a unified API that spans multiple connectors.

With Managed I/O, runners gain deeper insight into each I/O’s structure and intent. This allows the runner to optimize performance, adjust behavior dynamically, or even replace the I/O with a more efficient or updated implementation behind the scenes.

For example, the DataflowRunner can seamlessly upgrade a Managed transform to its latest SDK version, automatically applying bug fixes and new features, with no manual updates or user intervention required.

Supported SDKs

The Managed API is directly accessible through the Java and Python SDKs.

Additionally, some SDKs use the Managed API internally. For example, the Iceberg connector used in Beam YAML and Beam SQL is invoked via the Managed API under the hood.
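
For example, here is a minimal Python sketch of the pattern, using the SDK's `managed` module to read from Kafka and write to Iceberg. The connector names and configuration keys come from the tables below; the broker address, topic, table identifier, and catalog settings are placeholder values.

```python
import apache_beam as beam
from apache_beam.transforms import managed

# A minimal sketch: read from Kafka and write to Iceberg, each configured
# with a plain dict of fields from the tables below. All endpoint and
# identifier values here are placeholders.
with beam.Pipeline() as p:
    rows = p | "ReadKafka" >> managed.Read(
        managed.KAFKA,
        config={
            "bootstrap_servers": "broker-1:9092",
            "topic": "events",
            "format": "RAW",  # raw payload bytes; schematic only
        },
    )
    rows | "WriteIceberg" >> managed.Write(
        managed.ICEBERG,
        config={
            "table": "db.events",
            "catalog_name": "my_catalog",
            "catalog_properties": {
                "type": "hadoop",
                "warehouse": "gs://my-bucket/warehouse",
            },
        },
    )
```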

Available Configurations

Note: required configuration fields are bolded.

| Connector Name | Read Configuration | Write Configuration |
| --- | --- | --- |
| KAFKA | **bootstrap_servers** (str), **topic** (str), allow_duplicates (boolean), confluent_schema_registry_subject (str), confluent_schema_registry_url (str), consumer_config_updates (map[str, str]), file_descriptor_path (str), format (str), message_name (str), offset_deduplication (boolean), redistribute_by_record_key (boolean), redistribute_num_keys (int32), redistributed (boolean), schema (str) | **bootstrap_servers** (str), **format** (str), **topic** (str), file_descriptor_path (str), message_name (str), producer_config_updates (map[str, str]), schema (str) |
| ICEBERG | **table** (str), catalog_name (str), catalog_properties (map[str, str]), config_properties (map[str, str]), drop (list[str]), filter (str), keep (list[str]) | **table** (str), catalog_name (str), catalog_properties (map[str, str]), config_properties (map[str, str]), drop (list[str]), keep (list[str]), only (str), partition_fields (list[str]), table_properties (map[str, str]), triggering_frequency_seconds (int32) |
| ICEBERG_CDC | **table** (str), catalog_name (str), catalog_properties (map[str, str]), config_properties (map[str, str]), drop (list[str]), filter (str), from_snapshot (int64), from_timestamp (int64), keep (list[str]), poll_interval_seconds (int32), starting_strategy (str), streaming (boolean), to_snapshot (int64), to_timestamp (int64) | Unavailable |
| BIGQUERY | kms_key (str), query (str), row_restriction (str), fields (list[str]), table (str) | **table** (str), drop (list[str]), keep (list[str]), kms_key (str), only (str), triggering_frequency_seconds (int64) |
| POSTGRES | **jdbc_url** (str), connection_init_sql (list[str]), connection_properties (str), disable_auto_commit (boolean), driver_class_name (str), driver_jars (str), fetch_size (int32), jdbc_type (str), location (str), num_partitions (int32), output_parallelization (boolean), partition_column (str), password (str), read_query (str), username (str) | **jdbc_url** (str), autosharding (boolean), batch_size (int64), connection_init_sql (list[str]), connection_properties (str), driver_class_name (str), driver_jars (str), jdbc_type (str), location (str), password (str), username (str), write_statement (str) |
| MYSQL | **jdbc_url** (str), connection_init_sql (list[str]), connection_properties (str), disable_auto_commit (boolean), driver_class_name (str), driver_jars (str), fetch_size (int32), jdbc_type (str), location (str), num_partitions (int32), output_parallelization (boolean), partition_column (str), password (str), read_query (str), username (str) | **jdbc_url** (str), autosharding (boolean), batch_size (int64), connection_init_sql (list[str]), connection_properties (str), driver_class_name (str), driver_jars (str), jdbc_type (str), location (str), password (str), username (str), write_statement (str) |
| SQLSERVER | **jdbc_url** (str), connection_init_sql (list[str]), connection_properties (str), disable_auto_commit (boolean), driver_class_name (str), driver_jars (str), fetch_size (int32), jdbc_type (str), location (str), num_partitions (int32), output_parallelization (boolean), partition_column (str), password (str), read_query (str), username (str) | **jdbc_url** (str), autosharding (boolean), batch_size (int64), connection_init_sql (list[str]), connection_properties (str), driver_class_name (str), driver_jars (str), jdbc_type (str), location (str), password (str), username (str), write_statement (str) |

Configuration Details

KAFKA Write

| Configuration | Type | Description |
| --- | --- | --- |
| bootstrap_servers | str | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. Format: `host1:port1,host2:port2,...` |
| format | str | The encoding format for the data stored in Kafka. Valid options are: RAW, JSON, AVRO, PROTO |
| topic | str | n/a |
| file_descriptor_path | str | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |
| message_name | str | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |
| producer_config_updates | map[str, str] | A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html |
| schema | str | n/a |
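
As an illustration, a short Python sketch of a Kafka write built from the fields above; the broker addresses, topic, and record fields are placeholders, and the exact behavior depends on your SDK version:

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    # Schema-aware rows to publish; field names and values are illustrative.
    rows = p | beam.Create([
        beam.Row(id=1, status="ACTIVE"),
        beam.Row(id=2, status="CLOSED"),
    ])
    rows | managed.Write(
        managed.KAFKA,
        config={
            "bootstrap_servers": "broker-1:9092,broker-2:9092",
            "topic": "events-out",
            "format": "JSON",  # encode each row as a JSON message
        },
    )
```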

KAFKA Read

| Configuration | Type | Description |
| --- | --- | --- |
| bootstrap_servers | str | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form `host1:port1,host2:port2,...` |
| topic | str | n/a |
| allow_duplicates | boolean | Whether the Kafka read allows duplicates. |
| confluent_schema_registry_subject | str | n/a |
| confluent_schema_registry_url | str | n/a |
| consumer_config_updates | map[str, str] | A list of key-value pairs that act as configuration parameters for Kafka consumers. Most of these configurations will not be needed, but if you need to customize your Kafka consumer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html |
| file_descriptor_path | str | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |
| format | str | The encoding format for the data stored in Kafka. Valid options are: RAW, STRING, AVRO, JSON, PROTO |
| message_name | str | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |
| offset_deduplication | boolean | Whether the redistribute uses offset deduplication mode. |
| redistribute_by_record_key | boolean | Whether to redistribute by the Kafka record key. |
| redistribute_num_keys | int32 | The number of keys for redistributing Kafka inputs. |
| redistributed | boolean | Whether the Kafka read should be redistributed. |
| schema | str | The schema in which the data is encoded in the Kafka topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema is fetched from Confluent Schema Registry. |
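
A hedged Python sketch of a Kafka read configured with an inline AVRO schema; the broker addresses, topic, and schema are placeholders, and a Confluent Schema Registry could be used instead, as noted in the comments:

```python
import apache_beam as beam
from apache_beam.transforms import managed

# Illustrative AVRO schema describing the records stored in the topic.
AVRO_SCHEMA = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "status", "type": "string"}
  ]
}
"""

with beam.Pipeline() as p:
    events = p | managed.Read(
        managed.KAFKA,
        config={
            "bootstrap_servers": "broker-1:9092,broker-2:9092",
            "topic": "events",
            "format": "AVRO",
            "schema": AVRO_SCHEMA,
            # Alternatively, point at a registry instead of an inline schema:
            # "confluent_schema_registry_url": "http://registry:8081",
            # "confluent_schema_registry_subject": "events-value",
        },
    )
    events | beam.Map(print)
```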

ICEBERG Read

| Configuration | Type | Description |
| --- | --- | --- |
| table | str | Identifier of the Iceberg table. |
| catalog_name | str | Name of the catalog containing the table. |
| catalog_properties | map[str, str] | Properties used to set up the Iceberg catalog. |
| config_properties | map[str, str] | Properties passed to the Hadoop Configuration. |
| drop | list[str] | A subset of column names to exclude from reading. If null or empty, all columns will be read. |
| filter | str | SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html |
| keep | list[str] | A subset of column names to read exclusively. If null or empty, all columns will be read. |
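
A short Python sketch of an Iceberg read using column projection and a scan-time filter; the table, catalog settings, and column names are placeholder values:

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    rows = p | managed.Read(
        managed.ICEBERG,
        config={
            "table": "db.logs",
            "catalog_name": "my_catalog",
            "catalog_properties": {
                "type": "hadoop",
                "warehouse": "gs://my-bucket/warehouse",
            },
            "keep": ["event_id", "ts", "status"],  # project three columns
            "filter": "status = 'ACTIVE'",         # Calcite-style predicate
        },
    )
    rows | beam.Map(print)
```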

ICEBERG Write

| Configuration | Type | Description |
| --- | --- | --- |
| table | str | A fully-qualified table identifier. You may also provide a template to write to multiple dynamic destinations, for example: `dataset.my_{col1}_{col2.nested}_table`. |
| catalog_name | str | Name of the catalog containing the table. |
| catalog_properties | map[str, str] | Properties used to set up the Iceberg catalog. |
| config_properties | map[str, str] | Properties passed to the Hadoop Configuration. |
| drop | list[str] | A list of field names to drop from the input record before writing. Is mutually exclusive with 'keep' and 'only'. |
| keep | list[str] | A list of field names to keep in the input record. All other fields are dropped before writing. Is mutually exclusive with 'drop' and 'only'. |
| only | str | The name of a single record field that should be written. Is mutually exclusive with 'keep' and 'drop'. |
| partition_fields | list[str] | Fields used to create a partition spec that is applied when tables are created. For a field 'foo', the available partition transforms are: `foo`, `truncate(foo, N)`, `bucket(foo, N)`, `hour(foo)`, `day(foo)`, `month(foo)`, `year(foo)`, `void(foo)`. For more information on partition transforms, please visit https://iceberg.apache.org/spec/#partition-transforms. |
| table_properties | map[str, str] | Iceberg table properties to be set on the table when it is created. For more information on table properties, please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties. |
| triggering_frequency_seconds | int32 | For a streaming pipeline, sets the frequency at which snapshots are produced. |
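
A hedged Python sketch of an Iceberg write that sets partition and table properties for newly created tables; the table, catalog settings, and field names are placeholders:

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    # Schema-aware rows to write; fields and values are illustrative.
    rows = p | beam.Create([
        beam.Row(event_id="a1", country="US", ts_millis=1700000000000),
        beam.Row(event_id="b2", country="DE", ts_millis=1700000100000),
    ])
    rows | managed.Write(
        managed.ICEBERG,
        config={
            "table": "db.events",
            "catalog_name": "my_catalog",
            "catalog_properties": {
                "type": "hadoop",
                "warehouse": "gs://my-bucket/warehouse",
            },
            # Partition newly created tables by country and a 16-way hash of event_id.
            "partition_fields": ["country", "bucket(event_id, 16)"],
            "table_properties": {"format-version": "2"},
        },
    )
```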

ICEBERG_CDC Read

| Configuration | Type | Description |
| --- | --- | --- |
| table | str | Identifier of the Iceberg table. |
| catalog_name | str | Name of the catalog containing the table. |
| catalog_properties | map[str, str] | Properties used to set up the Iceberg catalog. |
| config_properties | map[str, str] | Properties passed to the Hadoop Configuration. |
| drop | list[str] | A subset of column names to exclude from reading. If null or empty, all columns will be read. |
| filter | str | SQL-like predicate to filter data at scan time. Example: "id > 5 AND status = 'ACTIVE'". Uses Apache Calcite syntax: https://calcite.apache.org/docs/reference.html |
| from_snapshot | int64 | Starts reading from this snapshot ID (inclusive). |
| from_timestamp | int64 | Starts reading from the first snapshot (inclusive) that was created after this timestamp (in milliseconds). |
| keep | list[str] | A subset of column names to read exclusively. If null or empty, all columns will be read. |
| poll_interval_seconds | int32 | The interval at which to poll for new snapshots. Defaults to 60 seconds. |
| starting_strategy | str | The source's starting strategy. Valid options are: "earliest" or "latest". Can be overridden by setting a starting snapshot or timestamp. Defaults to earliest for batch, and latest for streaming. |
| streaming | boolean | Enables streaming reads, where the source continuously polls for snapshots forever. |
| to_snapshot | int64 | Reads up to this snapshot ID (inclusive). |
| to_timestamp | int64 | Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds). |
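
A sketch of a streaming CDC read in Python. The table and catalog values are placeholders, and it assumes your SDK version exposes an `ICEBERG_CDC` identifier in the `managed` module:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import managed

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    changes = p | managed.Read(
        managed.ICEBERG_CDC,  # assumption: exposed by your SDK version
        config={
            "table": "db.events",
            "catalog_name": "my_catalog",
            "catalog_properties": {
                "type": "hadoop",
                "warehouse": "gs://my-bucket/warehouse",
            },
            "streaming": True,
            "starting_strategy": "latest",
            "poll_interval_seconds": 30,  # poll for new snapshots every 30s
        },
    )
    changes | beam.Map(print)
```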

BIGQUERY Read

| Configuration | Type | Description |
| --- | --- | --- |
| kms_key | str | Use this Cloud KMS key to encrypt your data. |
| query | str | The SQL query to be executed to read from the BigQuery table. |
| row_restriction | str | Read only rows that match this filter, which must be compatible with Google standard SQL. This is not supported when reading via query. |
| fields | list[str] | Read only the specified fields (columns) from a BigQuery table. Fields may not be returned in the order specified. If no value is specified, then all fields are returned. Example: "col1, col2, col3" |
| table | str | The fully-qualified name of the BigQuery table to read from. Format: [${PROJECT}:]${DATASET}.${TABLE} |
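
A minimal Python sketch of a BigQuery read using column projection and a row restriction; the project, dataset, table, and column names are placeholders:

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    rows = p | managed.Read(
        managed.BIGQUERY,
        config={
            "table": "my-project:my_dataset.events",   # [${PROJECT}:]${DATASET}.${TABLE}
            "fields": ["event_id", "status", "ts"],    # read only these columns
            "row_restriction": "status = 'ACTIVE'",    # standard SQL filter
        },
    )
    rows | beam.Map(print)
```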

BIGQUERY Write

| Configuration | Type | Description |
| --- | --- | --- |
| table | str | The BigQuery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE} |
| drop | list[str] | A list of field names to drop from the input record before writing. Is mutually exclusive with 'keep' and 'only'. |
| keep | list[str] | A list of field names to keep in the input record. All other fields are dropped before writing. Is mutually exclusive with 'drop' and 'only'. |
| kms_key | str | Use this Cloud KMS key to encrypt your data. |
| only | str | The name of a single record field that should be written. Is mutually exclusive with 'keep' and 'drop'. |
| triggering_frequency_seconds | int64 | Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds. |
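
A minimal Python sketch of a BigQuery write that drops a field before writing; the table identifier and record fields are placeholders:

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    rows = p | beam.Create([
        beam.Row(event_id="a1", status="ACTIVE", debug_info="x"),
        beam.Row(event_id="b2", status="CLOSED", debug_info="y"),
    ])
    rows | managed.Write(
        managed.BIGQUERY,
        config={
            "table": "my-project:my_dataset.events",
            "drop": ["debug_info"],  # strip this field before writing
        },
    )
```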

POSTGRES Read

| Configuration | Type | Description |
| --- | --- | --- |
| jdbc_url | str | Connection URL for the JDBC source. |
| connection_init_sql | list[str] | Sets the connection init SQL statements used by the Driver. Only MySQL and MariaDB support this. |
| connection_properties | str | Used to set connection properties passed to the JDBC driver not already defined as a standalone parameter (for example, username and password can be set using the parameters above). Format of the string must be "key1=value1;key2=value2;". |
| disable_auto_commit | boolean | Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false, while Postgres requires this to be set to true. |
| driver_class_name | str | Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver". |
| driver_jars | str | Comma-separated path(s) for the JDBC driver jar(s). This can be a local path or a GCS (gs://) path. |
| fetch_size | int32 | Overrides the size of the data that is fetched and loaded in memory per database call. Use it ONLY if the default value causes memory errors. |
| jdbc_type | str | Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql. |
| location | str | Name of the table to read from. |
| num_partitions | int32 | The number of partitions. |
| output_parallelization | boolean | Whether to reshuffle the resulting PCollection so results are distributed to all workers. |
| partition_column | str | Name of a column of numeric type that will be used for partitioning. |
| password | str | Password for the JDBC source. |
| read_query | str | SQL query used to query the JDBC source. |
| username | str | Username for the JDBC source. |
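
A hedged Python sketch of a partitioned Postgres read built from the fields above. The connection values are placeholders, and it assumes your SDK version accepts the "postgres" connector identifier through the `managed` module:

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    orders = p | managed.Read(
        "postgres",  # assumption: identifier accepted by your SDK version
        config={
            "jdbc_url": "jdbc:postgresql://db-host:5432/shop",
            "location": "orders",
            "username": "beam",
            "password": "change-me",
            "partition_column": "order_id",  # numeric column used to split the read
            "num_partitions": 10,
        },
    )
    orders | beam.Map(print)
```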

POSTGRES Write

| Configuration | Type | Description |
| --- | --- | --- |
| jdbc_url | str | Connection URL for the JDBC sink. |
| autosharding | boolean | If true, enables using a dynamically determined number of shards to write. |
| batch_size | int64 | n/a |
| connection_init_sql | list[str] | Sets the connection init SQL statements used by the Driver. Only MySQL and MariaDB support this. |
| connection_properties | str | Used to set connection properties passed to the JDBC driver not already defined as a standalone parameter (for example, username and password can be set using the parameters above). Format of the string must be "key1=value1;key2=value2;". |
| driver_class_name | str | Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver". |
| driver_jars | str | Comma-separated path(s) for the JDBC driver jar(s). This can be a local path or a GCS (gs://) path. |
| jdbc_type | str | Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql. |
| location | str | Name of the table to write to. |
| password | str | Password for the JDBC source. |
| username | str | Username for the JDBC source. |
| write_statement | str | SQL query used to insert records into the JDBC sink. |

MYSQL Read

| Configuration | Type | Description |
| --- | --- | --- |
| jdbc_url | str | Connection URL for the JDBC source. |
| connection_init_sql | list[str] | Sets the connection init SQL statements used by the Driver. Only MySQL and MariaDB support this. |
| connection_properties | str | Used to set connection properties passed to the JDBC driver not already defined as a standalone parameter (for example, username and password can be set using the parameters above). Format of the string must be "key1=value1;key2=value2;". |
| disable_auto_commit | boolean | Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false, while Postgres requires this to be set to true. |
| driver_class_name | str | Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver". |
| driver_jars | str | Comma-separated path(s) for the JDBC driver jar(s). This can be a local path or a GCS (gs://) path. |
| fetch_size | int32 | Overrides the size of the data that is fetched and loaded in memory per database call. Use it ONLY if the default value causes memory errors. |
| jdbc_type | str | Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql. |
| location | str | Name of the table to read from. |
| num_partitions | int32 | The number of partitions. |
| output_parallelization | boolean | Whether to reshuffle the resulting PCollection so results are distributed to all workers. |
| partition_column | str | Name of a column of numeric type that will be used for partitioning. |
| password | str | Password for the JDBC source. |
| read_query | str | SQL query used to query the JDBC source. |
| username | str | Username for the JDBC source. |

MYSQL Write

| Configuration | Type | Description |
| --- | --- | --- |
| jdbc_url | str | Connection URL for the JDBC sink. |
| autosharding | boolean | If true, enables using a dynamically determined number of shards to write. |
| batch_size | int64 | n/a |
| connection_init_sql | list[str] | Sets the connection init SQL statements used by the Driver. Only MySQL and MariaDB support this. |
| connection_properties | str | Used to set connection properties passed to the JDBC driver not already defined as a standalone parameter (for example, username and password can be set using the parameters above). Format of the string must be "key1=value1;key2=value2;". |
| driver_class_name | str | Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver". |
| driver_jars | str | Comma-separated path(s) for the JDBC driver jar(s). This can be a local path or a GCS (gs://) path. |
| jdbc_type | str | Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql. |
| location | str | Name of the table to write to. |
| password | str | Password for the JDBC source. |
| username | str | Username for the JDBC source. |
| write_statement | str | SQL query used to insert records into the JDBC sink. |

SQLSERVER Read

| Configuration | Type | Description |
| --- | --- | --- |
| jdbc_url | str | Connection URL for the JDBC source. |
| connection_init_sql | list[str] | Sets the connection init SQL statements used by the Driver. Only MySQL and MariaDB support this. |
| connection_properties | str | Used to set connection properties passed to the JDBC driver not already defined as a standalone parameter (for example, username and password can be set using the parameters above). Format of the string must be "key1=value1;key2=value2;". |
| disable_auto_commit | boolean | Whether to disable auto commit on read. Defaults to true if not provided. The need for this config varies depending on the database platform. Informix requires this to be set to false, while Postgres requires this to be set to true. |
| driver_class_name | str | Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver". |
| driver_jars | str | Comma-separated path(s) for the JDBC driver jar(s). This can be a local path or a GCS (gs://) path. |
| fetch_size | int32 | Overrides the size of the data that is fetched and loaded in memory per database call. Use it ONLY if the default value causes memory errors. |
| jdbc_type | str | Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql. |
| location | str | Name of the table to read from. |
| num_partitions | int32 | The number of partitions. |
| output_parallelization | boolean | Whether to reshuffle the resulting PCollection so results are distributed to all workers. |
| partition_column | str | Name of a column of numeric type that will be used for partitioning. |
| password | str | Password for the JDBC source. |
| read_query | str | SQL query used to query the JDBC source. |
| username | str | Username for the JDBC source. |

SQLSERVER Write

| Configuration | Type | Description |
| --- | --- | --- |
| jdbc_url | str | Connection URL for the JDBC sink. |
| autosharding | boolean | If true, enables using a dynamically determined number of shards to write. |
| batch_size | int64 | n/a |
| connection_init_sql | list[str] | Sets the connection init SQL statements used by the Driver. Only MySQL and MariaDB support this. |
| connection_properties | str | Used to set connection properties passed to the JDBC driver not already defined as a standalone parameter (for example, username and password can be set using the parameters above). Format of the string must be "key1=value1;key2=value2;". |
| driver_class_name | str | Name of a Java Driver class to use to connect to the JDBC source. For example, "com.mysql.jdbc.Driver". |
| driver_jars | str | Comma-separated path(s) for the JDBC driver jar(s). This can be a local path or a GCS (gs://) path. |
| jdbc_type | str | Type of JDBC source. When specified, an appropriate default Driver will be packaged with the transform. One of mysql, postgres, oracle, or mssql. |
| location | str | Name of the table to write to. |
| password | str | Password for the JDBC source. |
| username | str | Username for the JDBC source. |
| write_statement | str | SQL query used to insert records into the JDBC sink. |