Class JdbcIO

java.lang.Object
org.apache.beam.sdk.io.jdbc.JdbcIO

public class JdbcIO extends Object
IO to read and write data on JDBC.

Reading from JDBC datasource

JdbcIO source returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided JdbcIO.RowMapper.

To configure the JDBC source, you have to provide a JdbcIO.DataSourceConfiguration using
1. JdbcIO.DataSourceConfiguration.create(DataSource)(which must be Serializable);
2. or JdbcIO.DataSourceConfiguration.create(String, String)(driver class name and url). Optionally, JdbcIO.DataSourceConfiguration.withUsername(String) and JdbcIO.DataSourceConfiguration.withPassword(String) allows you to define username and password.

For example:


 pipeline.apply(JdbcIO.<KV<Integer, String>>read()
   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
          "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
   .withQuery("select id,name from Person")
   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
       return KV.of(resultSet.getInt(1), resultSet.getString(2));
     }
   })
 );
 

Note you should check with your database provider for the JDBC Driver and Connection Url that used to create the DataSourceConfiguration. For example, if you use Cloud SQL with postgres, the JDBC connection Url has this pattern with SocketFactory: "jdbc:postgresql://google/mydb?cloudSqlInstance=project:region:myinstanceinvalid input: '&' socketFactory=com.google.cloud.sql.postgres.SocketFactory". Check here for more details.

Query parameters can be configured using a user-provided JdbcIO.StatementPreparator. For example:


 pipeline.apply(JdbcIO.<KV<Integer, String>>read()
   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
       "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
       "username", "password"))
   .withQuery("select id,name from Person where name = ?")
   .withStatementPreparator(new JdbcIO.StatementPreparator() {
     public void setParameters(PreparedStatement preparedStatement) throws Exception {
       preparedStatement.setString(1, "Darwin");
     }
   })
   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
       return KV.of(resultSet.getInt(1), resultSet.getString(2));
     }
   })
 );
 

To customize the building of the DataSource we can provide a SerializableFunction. For example if you need to provide a PoolingDataSource from an existing JdbcIO.DataSourceConfiguration: you can use a JdbcIO.PoolableDataSourceProvider:


 pipeline.apply(JdbcIO.<KV<Integer, String>>read()
   .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
       JdbcIO.DataSourceConfiguration.create(
           "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
           "username", "password")))
    // ...
 );
 

By default, the provided function requests a DataSource per execution thread. In some circumstances this can quickly overwhelm the database by requesting too many connections. In that case you should look into sharing a single instance of a PoolingDataSource across all the execution threads. For example:


 private static class MyDataSourceProviderFn implementsSerializableFunction<Void, DataSource> {
   private static transient DataSource dataSource;

  @Override
   public synchronized DataSource apply(Void input) {
     if (dataSource == null) {
       dataSource = ... build data source ...
     }
     return dataSource;
   }
 }
 
 pipeline.apply(JdbcIO.<KV<Integer, String>>read()
   .withDataSourceProviderFn(new MyDataSourceProviderFn())
   // ...
 );
 

Parallel reading from a JDBC datasource

Beam supports partitioned reading of all data from a table. Automatic partitioning is supported for a few data types: Long, DateTime. To enable this, use readWithPartitions(TypeDescriptor). For other types, use readWithPartitions(JdbcReadWithPartitionsHelper) with custom JdbcReadWithPartitionsHelper.

The partitioning scheme depends on these parameters, which can be user-provided, or automatically inferred by Beam (for the supported types):

  • Upper bound
  • Lower bound
  • Number of partitions - when auto-inferred, the number of partitions defaults to the square root of the number of rows divided by 5 (i.e.: Math.floor(Math.sqrt(numRows) / 5)).

To trigger auto-inference of these parameters, the user just needs to not provide them. To infer them automatically, Beam runs either of these statements:

  • SELECT min(column), max(column), COUNT(*) from table when none of the parameters is passed to the transform.
  • SELECT min(column), max(column) from table when only number of partitions is provided, but not upper or lower bounds.

Should I use this transform? Consider using this transform in the following situations:

  • The partitioning column is indexed. This will help speed up the range queries
  • Use auto-inference if the queries for bound and partition inference are efficient to execute in your DBMS.
  • The distribution of data over the partitioning column is roughly uniform. Uniformity is not mandatory, but this transform will work best in that situation.

The following example shows usage of auto-inferred ranges, number of partitions, and schema


 pipeline.apply(JdbcIO.<Row>readWithPartitions()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
         "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
       .withUsername("username")
       .withPassword("password"))
  .withTable("Person")
  .withPartitionColumn("id")
  .withRowOutput()
 );
 

Instead of a full table you could also use a subquery in parentheses. The subquery can be specified using Table option instead and partition columns can be qualified using the subquery alias provided as part of Table. Note that a subquery may not perform as well with auto-inferred ranges and partitions, because it may not rely on indices to speed up the partitioning.


 pipeline.apply(JdbcIO.<KV<Integer, String>>readWithPartitions()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
         "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
       .withUsername("username")
       .withPassword("password"))
  .withTable("(select id, name from Person) as subq")
  .withPartitionColumn("id")
  .withLowerBound(0)
  .withUpperBound(1000)
  .withNumPartitions(5)
  .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
    public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
      return KV.of(resultSet.getInt(1), resultSet.getString(2));
    }
  })
 );
 

Writing to JDBC datasource

JDBC sink supports writing records into a database. It writes a PCollection to the database by converting each T into a PreparedStatement via a user-provided JdbcIO.PreparedStatementSetter.

Like the source, to configure the sink, you have to provide a JdbcIO.DataSourceConfiguration.


 pipeline
   .apply(...)
   .apply(JdbcIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        public void setParameters(KV<Integer, String> element, PreparedStatement query)
          throws SQLException {
          query.setInt(1, element.getKey());
          query.setString(2, element.getValue());
        }
      })
    );
 

NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple times for fault tolerance. Because of that, you should avoid using INSERT statements, since that risks duplicating records in the database, or failing due to primary key conflicts. Consider using MERGE ("upsert") statements supported by your database instead.

  • Method Details

    • read

      public static <T> JdbcIO.Read<T> read()
      Read data from a JDBC datasource.
      Type Parameters:
      T - Type of the data to be read.
    • readRows

      public static JdbcIO.ReadRows readRows()
      Read Beam Rows from a JDBC data source.
    • readAll

      public static <ParameterT, OutputT> JdbcIO.ReadAll<ParameterT,OutputT> readAll()
      Like read(), but executes multiple instances of the query substituting each element of a PCollection as query parameters.
      Type Parameters:
      ParameterT - Type of the data representing query parameters.
      OutputT - Type of the data to be read.
    • readWithPartitions

      public static <T, PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(TypeDescriptor<PartitionColumnT> partitioningColumnType)
      Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
      Type Parameters:
      T - Type of the data to be read.
      Parameters:
      partitioningColumnType - Type descriptor for the partition column.
    • readWithPartitions

      public static <T, PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper)
      Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
      Type Parameters:
      T - Type of the data to be read.
      Parameters:
      partitionsHelper - Custom helper for defining partitions.
    • readWithPartitions

      public static <T> JdbcIO.ReadWithPartitions<T,Long> readWithPartitions()
    • write

      public static <T> JdbcIO.Write<T> write()
      Write data to a JDBC datasource.
      Type Parameters:
      T - Type of the data to be written.
    • writeVoid

      public static <T> JdbcIO.WriteVoid<T> writeVoid()