Class JdbcIO
Reading from JDBC datasource
JdbcIO source returns a bounded collection of T as a PCollection<T>. T is the
type returned by the provided JdbcIO.RowMapper.
To configure the JDBC source, you have to provide a JdbcIO.DataSourceConfiguration using
1. JdbcIO.DataSourceConfiguration.create(DataSource) (which must be Serializable);
2. or JdbcIO.DataSourceConfiguration.create(String, String) (driver class name and url).
Optionally, JdbcIO.DataSourceConfiguration.withUsername(String) and JdbcIO.DataSourceConfiguration.withPassword(String) allow you to define the username and password.
For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withQuery("select id,name from Person")
.withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
Note that you should check with your database provider for the JDBC driver and connection URL that are used to create the DataSourceConfiguration. For example, if you use Cloud SQL with postgres, the JDBC connection URL has this pattern with SocketFactory: "jdbc:postgresql://google/mydb?cloudSqlInstance=project:region:myinstance&socketFactory=com.google.cloud.sql.postgres.SocketFactory". Check here for more details.
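As a minimal sketch of the connection URL pattern quoted above, the helper below assembles the string from a database name and an instance connection name. The class and method names are hypothetical and only illustrate the pattern; the resulting string would be passed as the url argument of JdbcIO.DataSourceConfiguration.create(String, String).

```java
final class CloudSqlPostgresUrl {
  // Socket factory class name taken from the connection URL pattern in the text.
  static final String SOCKET_FACTORY = "com.google.cloud.sql.postgres.SocketFactory";

  /** Builds the URL; both arguments are caller-supplied placeholders. */
  static String build(String database, String instanceConnectionName) {
    return "jdbc:postgresql://google/" + database
        + "?cloudSqlInstance=" + instanceConnectionName
        + "&socketFactory=" + SOCKET_FACTORY;
  }

  public static void main(String[] args) {
    // Uses the same placeholder values as the example URL in the text.
    System.out.println(build("mydb", "project:region:myinstance"));
  }
}
```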
Query parameters can be configured using a user-provided JdbcIO.StatementPreparator. For
example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
"username", "password"))
.withQuery("select id,name from Person where name = ?")
.withStatementPreparator(new JdbcIO.StatementPreparator() {
public void setParameters(PreparedStatement preparedStatement) throws Exception {
preparedStatement.setString(1, "Darwin");
}
})
.withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
To customize how the DataSource is built, you can provide a SerializableFunction. For example, if you need to provide a PoolingDataSource from an
existing JdbcIO.DataSourceConfiguration, you can use a JdbcIO.PoolableDataSourceProvider:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
.withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
"username", "password")))
// ...
);
By default, the provided function requests a DataSource per execution thread. In some
circumstances this can quickly overwhelm the database by requesting too many connections. In that
case you should look into sharing a single instance of a PoolingDataSource across all the
execution threads. For example:
private static class MyDataSourceProviderFn implements SerializableFunction<Void, DataSource> {
private static transient DataSource dataSource;
@Override
public synchronized DataSource apply(Void input) {
if (dataSource == null) {
dataSource = ... build data source ...
}
return dataSource;
}
}
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
.withDataSourceProviderFn(new MyDataSourceProviderFn())
// ...
);
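The sharing idea can be tried without Beam or a database. The sketch below is an illustration under stated assumptions: java.util.function.Function stands in for Beam's SerializableFunction<Void, DataSource>, a plain Object stands in for the pooled DataSource, and the class name and build counter are hypothetical. It locks on the class rather than on the instance, so that separately created (or deserialized) provider objects still coordinate on the one static field.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class SharedPoolProvider implements Function<Void, Object>, Serializable {
  private static final long serialVersionUID = 1L;

  // Counts how often the expensive build step runs (demonstration only).
  static final AtomicInteger BUILD_COUNT = new AtomicInteger();

  // One shared instance per JVM class loader, visible to every provider object.
  private static Object sharedPool;

  @Override
  public Object apply(Void input) {
    // Class-level lock: every provider object contends on the same monitor.
    synchronized (SharedPoolProvider.class) {
      if (sharedPool == null) {
        BUILD_COUNT.incrementAndGet();
        sharedPool = new Object(); // placeholder for building a real pooled DataSource
      }
      return sharedPool;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread[] workers = new Thread[4];
    for (int i = 0; i < workers.length; i++) {
      // Each worker uses its own provider object, as separate copies would.
      workers[i] = new Thread(() -> new SharedPoolProvider().apply(null));
      workers[i].start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    System.out.println("builds: " + BUILD_COUNT.get());
  }
}
```

Whichever thread arrives first builds the placeholder; all later calls, from any provider object, return that same instance.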
Parallel reading from a JDBC datasource
Beam supports partitioned reading of all data from a table. Automatic partitioning is
supported for a few data types: Long, DateTime. To enable this, use
readWithPartitions(TypeDescriptor). For other types, use readWithPartitions(JdbcReadWithPartitionsHelper) with a custom JdbcReadWithPartitionsHelper.
The partitioning scheme depends on these parameters, which can be user-provided, or automatically inferred by Beam (for the supported types):
- Upper bound
- Lower bound
- Number of partitions - when auto-inferred, the number of partitions defaults to the square
root of the number of rows divided by 5 (i.e.:
Math.floor(Math.sqrt(numRows) / 5)).
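The default partition count can be worked through by hand. The sketch below only evaluates the expression quoted above; the class and method names are hypothetical, and it makes no claim about how the transform treats edge cases such as very small tables, where the expression evaluates to 0.

```java
final class DefaultPartitionCount {
  /** Evaluates Math.floor(Math.sqrt(numRows) / 5) as quoted in the text. */
  static long forRowCount(long numRows) {
    return (long) Math.floor(Math.sqrt(numRows) / 5);
  }

  public static void main(String[] args) {
    // 10,000 rows: sqrt is 100, divided by 5 gives 20 partitions.
    System.out.println(forRowCount(10_000L));
    // 1,000,000 rows: sqrt is 1000, divided by 5 gives 200 partitions.
    System.out.println(forRowCount(1_000_000L));
  }
}
```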
To trigger auto-inference of these parameters, the user just needs to not provide them. To infer them automatically, Beam runs either of these statements:
- SELECT min(column), max(column), COUNT(*) from table when none of the parameters is passed to the transform.
- SELECT min(column), max(column) from table when only number of partitions is provided, but not upper or lower bounds.
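The choice between the two inference statements can be sketched as a small decision function. This is an illustration, not Beam's implementation: the class, the method, and the convention that null means "not provided" are all assumptions, and only the two cases described above are handled. Any other combination returns an empty Optional because the text does not describe it.

```java
import java.util.Optional;

final class BoundsInferenceQuery {
  /**
   * Returns the inference statement for the two documented cases.
   * A null argument means the user did not provide that parameter (assumed convention).
   */
  static Optional<String> forParameters(
      String table, String column, Long lowerBound, Long upperBound, Integer numPartitions) {
    boolean noBounds = lowerBound == null && upperBound == null;
    if (noBounds && numPartitions == null) {
      // Nothing provided: infer bounds and row count.
      return Optional.of(
          String.format("SELECT min(%1$s), max(%1$s), COUNT(*) from %2$s", column, table));
    }
    if (noBounds) {
      // Only the number of partitions provided: infer the bounds.
      return Optional.of(
          String.format("SELECT min(%1$s), max(%1$s) from %2$s", column, table));
    }
    // Other combinations are not covered by the description.
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(forParameters("Person", "id", null, null, null).orElse("(none)"));
    System.out.println(forParameters("Person", "id", null, null, 5).orElse("(none)"));
  }
}
```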
Should I use this transform? Consider using this transform in the following situations:
- The partitioning column is indexed. This will help speed up the range queries.
- Use auto-inference if the queries for bound and partition inference are efficient to execute in your DBMS.
- The distribution of data over the partitioning column is roughly uniform. Uniformity is not mandatory, but this transform will work best in that situation.
The following example shows usage of auto-inferred ranges, number of partitions, and schema:
pipeline.apply(JdbcIO.<Row>readWithPartitions()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withTable("Person")
.withPartitionColumn("id")
.withRowOutput()
);
Instead of a full table you can also use a subquery in parentheses. The subquery is specified through the Table option (withTable), and partition columns can be qualified using the subquery alias provided as part of Table. Note that a subquery may not perform as well with auto-inferred ranges and partitions, because it may not rely on indices to speed up the partitioning.
pipeline.apply(JdbcIO.<KV<Integer, String>>readWithPartitions()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withTable("(select id, name from Person) as subq")
.withPartitionColumn("id")
.withLowerBound(0)
.withUpperBound(1000)
.withNumPartitions(5)
.withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
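To make the effect of lower bound, upper bound, and number of partitions concrete, the sketch below splits a range into contiguous sub-ranges and renders a range predicate for each. It is one plausible scheme under stated assumptions, not necessarily the one Beam uses: the class and method names are hypothetical, and treating the upper bound as exclusive is a choice made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionRanges {
  /** Splits [lower, upper) into numPartitions ranges; each long[] is {startInclusive, endExclusive}. */
  static List<long[]> split(long lower, long upper, int numPartitions) {
    if (numPartitions <= 0 || upper <= lower) {
      throw new IllegalArgumentException("need numPartitions > 0 and upper > lower");
    }
    List<long[]> ranges = new ArrayList<>();
    long span = upper - lower;
    for (int i = 0; i < numPartitions; i++) {
      // Integer arithmetic keeps the ranges contiguous; very large spans could overflow.
      long start = lower + span * i / numPartitions;
      long end = lower + span * (i + 1) / numPartitions;
      ranges.add(new long[] {start, end});
    }
    return ranges;
  }

  /** Renders the range predicate one partition's query could use. */
  static String whereClause(String column, long[] range) {
    return column + " >= " + range[0] + " AND " + column + " < " + range[1];
  }

  public static void main(String[] args) {
    // Same values as the example in the text: bounds 0 and 1000, 5 partitions.
    for (long[] range : split(0, 1000, 5)) {
      System.out.println(whereClause("id", range));
    }
  }
}
```

With evenly distributed ids each range covers a similar number of rows, which is why the transform works best on roughly uniform partitioning columns.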
Writing to JDBC datasource
The JDBC sink supports writing records into a database. It writes a PCollection<T> to the
database by converting each T into a PreparedStatement via a user-provided JdbcIO.PreparedStatementSetter.
Like the source, to configure the sink, you have to provide a JdbcIO.DataSourceConfiguration.
pipeline
.apply(...)
.apply(JdbcIO.<KV<Integer, String>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withStatement("insert into Person values(?, ?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
public void setParameters(KV<Integer, String> element, PreparedStatement query)
throws SQLException {
query.setInt(1, element.getKey());
query.setString(2, element.getValue());
}
})
);
NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple
times for fault tolerance. Because of that, you should avoid using INSERT statements,
since that risks duplicating records in the database, or failing due to primary key conflicts.
Consider using MERGE ("upsert")
statements supported by your database instead.
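The difference between the two write styles under retries can be illustrated without a database. In the sketch below an in-memory map stands in for a table with a primary key; the class and its methods are hypothetical. Replaying an upsert leaves one row, while replaying a plain insert hits a key conflict. The actual MERGE or upsert syntax depends on your database and is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

final class RetrySafeWrites {
  // Stand-in for a table keyed by primary key id.
  private final Map<Integer, String> table = new HashMap<>();

  /** Behaves like a plain INSERT: fails when the key already exists. */
  void insert(int id, String name) {
    if (table.containsKey(id)) {
      throw new IllegalStateException("primary key conflict for id " + id);
    }
    table.put(id, name);
  }

  /** Behaves like an upsert: inserts or overwrites, so replays are harmless. */
  void upsert(int id, String name) {
    table.put(id, name);
  }

  int rowCount() {
    return table.size();
  }

  public static void main(String[] args) {
    RetrySafeWrites person = new RetrySafeWrites();
    person.upsert(1, "Darwin");
    person.upsert(1, "Darwin"); // simulated retry of the same element
    System.out.println("rows after replayed upsert: " + person.rowCount());
    try {
      person.insert(1, "Darwin"); // simulated retry with a plain insert
    } catch (IllegalStateException e) {
      System.out.println("replayed insert failed: " + e.getMessage());
    }
  }
}
```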
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
- static class: A POJO describing a DataSource, either providing directly a DataSource or all properties allowing to create a DataSource.
- static class: Wraps a JdbcIO.DataSourceConfiguration to provide a DataSource.
- static class: This is the default Predicate we use to detect DeadLock.
- static class: Wraps a JdbcIO.DataSourceConfiguration to provide a PoolingDataSource.
- static interface: An interface used by the JdbcIO JdbcIO.ReadAll and JdbcIO.Write to set the parameters of the PreparedStatement used to setParameters into the database.
- static class: Implementation of read().
- static class: Implementation of readAll().
- static class: Implementation of readRows().
- static class
- static class: Builder used to help with retry configuration for JdbcIO.
- static interface: An interface used to control if we retry the statements when a SQLException occurs.
- static interface: An interface used by JdbcIO.Read for converting each row of the ResultSet into an element of the resulting PCollection.
- static interface: An interface used by the JdbcIO Write to set the parameters of the PreparedStatement used to setParameters into the database.
- static class: This class is used as the default return value of write().
- static class: A PTransform to write to a JDBC datasource.
- static class JdbcIO.WriteWithResults<T,V extends JdbcWriteResult>: A PTransform to write to a JDBC datasource.
Method Summary
Methods (Modifier and Type, Method, Description):
- static <T> JdbcIO.Read<T> read(): Read data from a JDBC datasource.
- static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT, OutputT> readAll(): Like read(), but executes multiple instances of the query substituting each element of a PCollection as query parameters.
- static JdbcIO.ReadRows readRows(): Read Beam Rows from a JDBC data source.
- static <T> JdbcIO.ReadWithPartitions<T, Long>
- static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T, PartitionColumnT> readWithPartitions(JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper): Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
- static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T, PartitionColumnT> readWithPartitions(TypeDescriptor<PartitionColumnT> partitioningColumnType): Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
- static <T> JdbcIO.Write<T> write(): Write data to a JDBC datasource.
- static <T> JdbcIO.WriteVoid<T>
Method Details
read
Read data from a JDBC datasource.
Type Parameters:
T - Type of the data to be read.
readRows
Read Beam Rows from a JDBC data source.
readAll
Like read(), but executes multiple instances of the query substituting each element of a PCollection as query parameters.
The substitution is configured via JdbcIO.ReadAll.withParameterSetter(org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter<ParameterT>). Substitutions allowed by the JDBC API's PreparedStatement are supported. In particular, this does not support parameterizing the table name to read from a different table for each input element.
Type Parameters:
ParameterT - Type of the data representing query parameters.
OutputT - Type of the data to be read.
readWithPartitions
public static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(TypeDescriptor<PartitionColumnT> partitioningColumnType)
Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
Type Parameters:
T - Type of the data to be read.
Parameters:
partitioningColumnType - Type descriptor for the partition column.
readWithPartitions
public static <T,PartitionColumnT> JdbcIO.ReadWithPartitions<T,PartitionColumnT> readWithPartitions(JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper)
Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
Type Parameters:
T - Type of the data to be read.
Parameters:
partitionsHelper - Custom helper for defining partitions.
readWithPartitions
write
Write data to a JDBC datasource.
Type Parameters:
T - Type of the data to be written.
writeVoid