@Experimental(value=SOURCE_SINK) public class JdbcIO extends java.lang.Object
JdbcIO source returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided JdbcIO.RowMapper.
To configure the JDBC source, you have to provide a JdbcIO.DataSourceConfiguration using either:
1. JdbcIO.DataSourceConfiguration.create(DataSource) (the DataSource must be Serializable); or
2. JdbcIO.DataSourceConfiguration.create(String, String) (driver class name and URL).
Optionally, JdbcIO.DataSourceConfiguration.withUsername(String) and JdbcIO.DataSourceConfiguration.withPassword(String) allow you to define the username and password.
For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id,name from Person")
    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
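The RowMapper is applied once per row of the query's ResultSet. The dependency-free sketch below imitates that loop so it can run without Beam or a JDBC driver. `RowMapper` here is a hypothetical local interface with the same `mapRow(ResultSet)` shape as JdbcIO.RowMapper, and `fakeResultSet` and `mapRows` are hypothetical helpers (a java.lang.reflect.Proxy stands in for a real driver's ResultSet). It illustrates the idea under those assumptions; it is not JdbcIO's actual read implementation.

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RowMapperSketch {
  // Hypothetical local mirror of JdbcIO.RowMapper<T>: converts the current row into one element.
  interface RowMapper<T> {
    T mapRow(ResultSet resultSet) throws Exception;
  }

  // Fake forward-only ResultSet over in-memory rows, built with a dynamic proxy.
  // Only next(), getInt(int) and getString(int) are supported; column indices are 1-based.
  static ResultSet fakeResultSet(List<Object[]> rows) {
    int[] cursor = {-1};
    return (ResultSet) Proxy.newProxyInstance(
        RowMapperSketch.class.getClassLoader(),
        new Class<?>[] {ResultSet.class},
        (proxy, method, args) -> {
          switch (method.getName()) {
            case "next":
              cursor[0]++;
              return cursor[0] < rows.size();
            case "getInt":
            case "getString":
              return rows.get(cursor[0])[(Integer) args[0] - 1];
            default:
              throw new UnsupportedOperationException(method.getName());
          }
        });
  }

  // Applies the mapper to every row, as a JDBC read loop would.
  static <T> List<T> mapRows(ResultSet resultSet, RowMapper<T> mapper) {
    List<T> elements = new ArrayList<>();
    try {
      while (resultSet.next()) {
        elements.add(mapper.mapRow(resultSet));
      }
    } catch (Exception e) {
      throw new IllegalStateException("Failed to map a row", e);
    }
    return elements;
  }

  public static void main(String[] args) {
    ResultSet resultSet =
        fakeResultSet(List.of(new Object[] {1, "Darwin"}, new Object[] {2, "Curie"}));
    // Column 1 becomes the key, column 2 the value.
    RowMapper<Map.Entry<Integer, String>> mapper =
        rs -> new SimpleEntry<>(rs.getInt(1), rs.getString(2));
    System.out.println(mapRows(resultSet, mapper)); // [1=Darwin, 2=Curie]
  }
}
```

JDBC column indices start at 1, which is why the mapper reads columns 1 and 2 for `select id,name`.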
Query parameters can be configured using a user-provided JdbcIO.StatementPreparator. For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id,name from Person where name = ?")
    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
    .withStatementPreparator(new JdbcIO.StatementPreparator() {
      public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setString(1, "Darwin");
      }
    })
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
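StatementPreparator.setParameters binds values to the query's `?` placeholders before it runs; JDBC parameter indices are 1-based, so the first `?` is index 1. The sketch below records those bindings with a fake PreparedStatement (a dynamic proxy) so it runs without a database. `StatementPreparator`, `recordingStatement` and `bind` are hypothetical local names for illustration, not JdbcIO classes.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.util.Map;
import java.util.TreeMap;

public class StatementPreparatorSketch {
  // Hypothetical local mirror of JdbcIO.StatementPreparator.
  interface StatementPreparator {
    void setParameters(PreparedStatement preparedStatement) throws Exception;
  }

  // Fake PreparedStatement that records setXxx(parameterIndex, value) calls instead of executing SQL.
  static PreparedStatement recordingStatement(Map<Integer, Object> bound) {
    return (PreparedStatement) Proxy.newProxyInstance(
        StatementPreparatorSketch.class.getClassLoader(),
        new Class<?>[] {PreparedStatement.class},
        (proxy, method, args) -> {
          if (method.getName().startsWith("set")
              && args != null && args.length == 2 && args[0] instanceof Integer) {
            bound.put((Integer) args[0], args[1]);
            return null;
          }
          throw new UnsupportedOperationException(method.getName());
        });
  }

  // Runs the preparator against the fake statement and returns parameter index -> bound value.
  static Map<Integer, Object> bind(StatementPreparator preparator) {
    Map<Integer, Object> bound = new TreeMap<>();
    try {
      preparator.setParameters(recordingStatement(bound));
    } catch (Exception e) {
      throw new IllegalStateException("Failed to bind parameters", e);
    }
    return bound;
  }

  public static void main(String[] args) {
    // "select id,name from Person where name = ?": the only '?' is parameter index 1.
    System.out.println(bind(ps -> ps.setString(1, "Darwin"))); // {1=Darwin}
  }
}
```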
To customize how the DataSource is built, you can provide a SerializableFunction. For example, if you need to provide a PoolingDataSource from an existing JdbcIO.DataSourceConfiguration, you can use a JdbcIO.PoolableDataSourceProvider:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
        JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
            .withUsername("username")
            .withPassword("password")))
    // ...
);
By default, the provided function requests a DataSource per execution thread. In some circumstances, this can quickly overwhelm the database with too many connections. In that case, consider sharing a single PoolingDataSource instance across all execution threads. For example:
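private static class MyDataSourceProviderFn implements SerializableFunction<Void, DataSource> {
  // Static, so a single instance is shared by every copy of this function in the JVM.
  private static transient DataSource dataSource;

  @Override
  public DataSource apply(Void input) {
    // Lock on the class, not the instance: the cached field is static.
    synchronized (MyDataSourceProviderFn.class) {
      if (dataSource == null) {
        // For example, build the pool once from a DataSourceConfiguration.
        dataSource = JdbcIO.PoolableDataSourceProvider.of(
                JdbcIO.DataSourceConfiguration.create(
                        "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
                    .withUsername("username")
                    .withPassword("password"))
            .apply(null);
      }
      return dataSource;
    }
  }
}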
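The sharing approach described above relies on a static field that is initialized once under a lock. The sketch below demonstrates that pattern without Beam or a JDBC driver: however many threads ask, exactly one DataSource is built. `buildDataSource` is a hypothetical factory that returns an inert DataSource proxy instead of a real PoolingDataSource, and `BUILD_COUNT` exists only to make the single construction observable.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;

public class SharedDataSourceSketch {
  // Counts how many DataSource instances were built (stands in for opening a pool).
  static final AtomicInteger BUILD_COUNT = new AtomicInteger();

  // One instance per JVM, shared by all threads and all provider instances.
  private static DataSource dataSource;

  // Hypothetical factory: returns a do-nothing DataSource proxy instead of a real pool.
  static DataSource buildDataSource() {
    BUILD_COUNT.incrementAndGet();
    return (DataSource) Proxy.newProxyInstance(
        SharedDataSourceSketch.class.getClassLoader(),
        new Class<?>[] {DataSource.class},
        (proxy, method, args) -> {
          throw new UnsupportedOperationException(method.getName());
        });
  }

  // Lazily builds the shared instance; the class-level lock guards the static field.
  static DataSource get() {
    synchronized (SharedDataSourceSketch.class) {
      if (dataSource == null) {
        dataSource = buildDataSource();
      }
      return dataSource;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    try {
      List<Future<DataSource>> results = new ArrayList<>();
      for (int i = 0; i < 100; i++) {
        results.add(pool.submit(SharedDataSourceSketch::get));
      }
      DataSource first = results.get(0).get();
      for (Future<DataSource> result : results) {
        if (result.get() != first) {
          throw new AssertionError("threads received different DataSource instances");
        }
      }
    } finally {
      pool.shutdown();
    }
    System.out.println("instances built: " + BUILD_COUNT.get()); // instances built: 1
  }
}
```

Locking on the class rather than on `this` matters because the cached field is static: if a runner deserializes several copies of the provider function, each copy is a separate instance, and an instance-level lock would not stop two of them from building a pool concurrently.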
Modifier and Type | Class | Description |
---|---|---|
static class | JdbcIO.DataSourceConfiguration | A POJO describing a DataSource, either providing a DataSource directly or all the properties needed to create one. |
static class | JdbcIO.DataSourceProviderFromDataSourceConfiguration | Wraps a JdbcIO.DataSourceConfiguration to provide a DataSource. |
static class | JdbcIO.DefaultRetryStrategy | The default predicate used to detect deadlocks. |
static class | JdbcIO.PoolableDataSourceProvider | Wraps a JdbcIO.DataSourceConfiguration to provide a PoolingDataSource. |
static interface | JdbcIO.PreparedStatementSetter<T> | An interface used by JdbcIO.Write to set the parameters of the PreparedStatement used to write into the database. |
static class | JdbcIO.Read<T> | Implementation of read(). |
static class | JdbcIO.ReadAll<ParameterT,OutputT> | Implementation of readAll(). |
static class | JdbcIO.ReadRows | Implementation of readRows(). |
static class | JdbcIO.RetryConfiguration | Builder used to help with retry configuration for JdbcIO. |
static interface | JdbcIO.RetryStrategy | An interface used to control whether a statement is retried when a SQLException occurs. |
static interface | JdbcIO.RowMapper<T> | An interface used by JdbcIO.Read for converting each row of the ResultSet into an element of the resulting PCollection. |
static interface | JdbcIO.StatementPreparator | An interface used by JdbcIO.Read to set the parameters of the PreparedStatement used to query the database. |
static class | JdbcIO.Write<T> | The default return value of write(). |
static class | JdbcIO.WriteVoid<T> | A PTransform to write to a JDBC datasource. |
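JdbcIO.RetryStrategy and JdbcIO.DefaultRetryStrategy describe a predicate over SQLException that decides whether a failed statement is retried. The sketch below is a minimal illustration, assuming the default predicate treats SQLSTATE `40001` (serialization failure, which MySQL also reports for deadlocks) as retryable. `RetryStrategy`, `SqlAction` and `executeWithRetry` are hypothetical local names, and the loop omits the delay between attempts that a retry configuration would normally add.

```java
import java.sql.SQLException;

public class RetryStrategySketch {
  // Hypothetical local mirror of JdbcIO.RetryStrategy: should this failure be retried?
  interface RetryStrategy {
    boolean apply(SQLException e);
  }

  // Assumed default: retry only SQLSTATE 40001 (serialization failure; MySQL uses it for deadlocks).
  static final RetryStrategy DEFAULT = e -> "40001".equals(e.getSQLState());

  // Hypothetical unit of work, e.g. executing one batch of writes.
  interface SqlAction {
    void run() throws SQLException;
  }

  // Runs the action, retrying while the strategy says the error is retryable and attempts remain.
  // Returns the number of attempts used. No backoff delay is applied between attempts.
  static int executeWithRetry(SqlAction action, RetryStrategy strategy, int maxAttempts)
      throws SQLException {
    for (int attempt = 1; ; attempt++) {
      try {
        action.run();
        return attempt;
      } catch (SQLException e) {
        if (attempt >= maxAttempts || !strategy.apply(e)) {
          throw e;
        }
      }
    }
  }

  public static void main(String[] args) throws SQLException {
    int[] failuresLeft = {2};
    int attempts = executeWithRetry(() -> {
      if (failuresLeft[0]-- > 0) {
        // Simulated MySQL deadlock: vendor code 1213, SQLSTATE 40001.
        throw new SQLException("Deadlock found when trying to get lock", "40001", 1213);
      }
    }, DEFAULT, 5);
    System.out.println("succeeded after " + attempts + " attempts"); // succeeded after 3 attempts
  }
}
```

Errors the strategy rejects (for example a syntax error, SQLSTATE `42000`) are rethrown on the first attempt, so only transient conflicts consume retries.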
Modifier and Type | Method | Description |
---|---|---|
static <T> JdbcIO.Read<T> | read() | Read data from a JDBC datasource. |
static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> | readAll() | Like read(), but executes multiple instances of the query, substituting each element of a PCollection as query parameters. |
static JdbcIO.ReadRows | readRows() | Read Beam Rows from a JDBC data source. |
static <T> JdbcIO.Write<T> | write() | Write data to a JDBC datasource. |
static <T> JdbcIO.WriteVoid<T> | writeVoid() | |
public static <T> JdbcIO.Read<T> read()
Read data from a JDBC datasource.
Type Parameters: T - Type of the data to be read.

@Experimental(value=SCHEMAS) public static JdbcIO.ReadRows readRows()
Read Beam Rows from a JDBC data source.

public static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> readAll()
Like read(), but executes multiple instances of the query, substituting each element of a PCollection as query parameters.
Type Parameters: ParameterT - Type of the data representing query parameters. OutputT - Type of the data to be read.

public static <T> JdbcIO.Write<T> write()
Write data to a JDBC datasource.
Type Parameters: T - Type of the data to be written.

public static <T> JdbcIO.WriteVoid<T> writeVoid()
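readAll() runs the same query once for each parameter element and combines the rows. Conceptually that is a flat-map from parameters to result rows, which the sketch below shows over an in-memory table. `PERSON_TABLE`, `queryIdsByName` and `readAll` are hypothetical names standing in for the database, one execution of `select id,name from Person where name = ?`, and the transform respectively. Unlike this sequential loop, a Beam pipeline processes the parameter PCollection in parallel, so no output order should be assumed there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ReadAllSketch {
  // Hypothetical in-memory Person table: {id, name}.
  static final List<String[]> PERSON_TABLE = List.of(
      new String[] {"1", "Darwin"}, new String[] {"2", "Curie"}, new String[] {"3", "Darwin"});

  // Stand-in for one execution of the parameterized query: returns ids whose name matches.
  static List<Integer> queryIdsByName(String name) {
    List<Integer> ids = new ArrayList<>();
    for (String[] row : PERSON_TABLE) {
      if (row[1].equals(name)) {
        ids.add(Integer.parseInt(row[0]));
      }
    }
    return ids;
  }

  // readAll()-style semantics: run the query once per parameter element and concatenate the results.
  static <P, O> List<O> readAll(List<P> parameters, Function<P, List<O>> query) {
    List<O> output = new ArrayList<>();
    for (P parameter : parameters) {
      output.addAll(query.apply(parameter));
    }
    return output;
  }

  public static void main(String[] args) {
    List<Integer> ids =
        readAll(List.of("Darwin", "Curie", "Newton"), ReadAllSketch::queryIdsByName);
    System.out.println(ids); // [1, 3, 2]
  }
}
```

A parameter with no matching rows ("Newton" here) simply contributes nothing to the output.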