@Experimental(value=SOURCE_SINK) public class JdbcIO extends java.lang.Object
The JdbcIO source returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided JdbcIO.RowMapper.
To configure the JDBC source, you have to provide a JdbcIO.DataSourceConfiguration using either:
1. JdbcIO.DataSourceConfiguration.create(DataSource) (the DataSource must be Serializable); or
2. JdbcIO.DataSourceConfiguration.create(String, String) (driver class name and URL).
Optionally, JdbcIO.DataSourceConfiguration.withUsername(String) and JdbcIO.DataSourceConfiguration.withPassword(String) allow you to define the username and password.
For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
      .withUsername("username")
      .withPassword("password"))
  .withQuery("select id,name from Person")
  .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
  .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
    public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
      return KV.of(resultSet.getInt(1), resultSet.getString(2));
    }
  })
);
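Because a DataSource passed to JdbcIO.DataSourceConfiguration.create(DataSource) must be Serializable, it can help to check that before building the pipeline. The following is a minimal sketch using only the JDK; `SerializableCheck` and `isJavaSerializable` are hypothetical names, not part of Beam, and the check only covers standard Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/** Hypothetical helper (not part of Beam): checks that an object survives Java serialization. */
final class SerializableCheck {
  private SerializableCheck() {}

  /** Returns true if the candidate can be written with standard Java serialization. */
  static boolean isJavaSerializable(Object candidate) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(candidate);
      return true;
    } catch (IOException e) {
      // NotSerializableException (a subclass of IOException) is thrown when the
      // candidate, or any object reachable from it, is not serializable.
      return false;
    }
  }
}
```

Calling `SerializableCheck.isJavaSerializable(myDataSource)` before handing the DataSource to the configuration surfaces the problem early rather than at pipeline submission.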
Query parameters can be configured using a user-provided JdbcIO.StatementPreparator. For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
  .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
      .withUsername("username")
      .withPassword("password"))
  .withQuery("select id,name from Person where name = ?")
  .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
  .withStatementPreparator(new JdbcIO.StatementPreparator() {
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
      preparedStatement.setString(1, "Darwin");
    }
  })
  .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
    public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
      return KV.of(resultSet.getInt(1), resultSet.getString(2));
    }
  })
);
To customize how the DataSource is built, you can provide a SerializableFunction. For example, if you need a PoolingDataSource built from an existing JdbcIO.DataSourceConfiguration, you can use a JdbcIO.PoolableDataSourceProvider:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
  .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
      JdbcIO.DataSourceConfiguration.create(
          "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password")))
  // ...
);
By default, the provided function instantiates a DataSource per execution thread. In some
circumstances, such as DataSources that have a pool of connections, this can quickly overwhelm
the database by requesting too many connections. In that case you should make the DataSource a
static singleton so it gets instantiated only once per JVM.
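The static-singleton advice can be sketched as follows. This is an illustration under assumptions, not Beam's own implementation: `SingletonDataSourceProvider` is a hypothetical class, it implements `java.util.function.Function` as a stand-in for Beam's SerializableFunction so the sketch compiles with the JDK alone, and `buildDataSource()` returns a placeholder proxy where real code would construct a pooled DataSource.

```java
import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.sql.DataSource;

/** Hypothetical provider that builds its DataSource at most once per JVM. */
final class SingletonDataSourceProvider implements Function<Void, DataSource>, Serializable {
  private static final long serialVersionUID = 1L;

  // Static, so it is shared by every deserialized copy of this function in the
  // JVM and is never serialized along with the function itself.
  private static volatile DataSource instance;

  // Counts real constructions; only here to make the behaviour observable.
  static final AtomicInteger BUILD_COUNT = new AtomicInteger();

  @Override
  public DataSource apply(Void ignored) {
    DataSource local = instance;
    if (local == null) {
      // Double-checked locking: only the first caller builds the DataSource.
      synchronized (SingletonDataSourceProvider.class) {
        local = instance;
        if (local == null) {
          local = buildDataSource();
          instance = local;
        }
      }
    }
    return local;
  }

  // Placeholder: real code would create and configure a pooled DataSource here.
  private static DataSource buildDataSource() {
    BUILD_COUNT.incrementAndGet();
    return (DataSource) Proxy.newProxyInstance(
        SingletonDataSourceProvider.class.getClassLoader(),
        new Class<?>[] {DataSource.class},
        (proxy, method, args) -> {
          throw new UnsupportedOperationException(method.getName());
        });
  }
}
```

However many copies of the function the runner creates across threads, each call to `apply` returns the same DataSource, so the connection pool is created once per JVM.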
The JDBC sink supports writing records into a database. It writes a PCollection<T> to the database by converting each T into a PreparedStatement via a user-provided JdbcIO.PreparedStatementSetter.
Like the source, to configure the sink, you have to provide a JdbcIO.DataSourceConfiguration.
pipeline
  .apply(...)
  .apply(JdbcIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        public void setParameters(KV<Integer, String> element, PreparedStatement query)
          throws SQLException {
          query.setInt(1, element.getKey());
          query.setString(2, element.getValue());
        }
      })
    );
NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple times for fault tolerance. Because of that, you should avoid using INSERT statements, since that risks duplicating records in the database or failing due to primary key conflicts. Consider using MERGE ("upsert") statements supported by your database instead.
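To make the retry hazard concrete, here is a small JDK-only simulation rather than a real database write. Everything in it is illustrative: `RetryIdempotenceSketch` is a hypothetical class, the table is modelled with in-memory collections, and `UPSERT_SQL` is one possible MySQL-flavoured statement that assumes `id` is the primary key of Person; other databases use different upsert syntax.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of replaying the same elements after a transient failure. */
final class RetryIdempotenceSketch {
  private RetryIdempotenceSketch() {}

  // Assumption: MySQL syntax, with id as the primary key of Person.
  static final String UPSERT_SQL =
      "insert into Person (id, name) values (?, ?) "
          + "on duplicate key update name = values(name)";

  /** Plain INSERT into a table without a unique key: every replay appends rows again. */
  static int rowsAfterPlainInserts(int[] ids, String[] names, int attempts) {
    List<String> table = new ArrayList<>();
    for (int attempt = 0; attempt < attempts; attempt++) {
      for (int i = 0; i < ids.length; i++) {
        table.add(ids[i] + ":" + names[i]);
      }
    }
    return table.size();
  }

  /** Upsert keyed by id: a replay overwrites the same rows instead of adding new ones. */
  static int rowsAfterUpserts(int[] ids, String[] names, int attempts) {
    Map<Integer, String> table = new LinkedHashMap<>();
    for (int attempt = 0; attempt < attempts; attempt++) {
      for (int i = 0; i < ids.length; i++) {
        table.put(ids[i], names[i]);
      }
    }
    return table.size();
  }
}
```

With two elements written twice, the plain-insert model ends with four rows while the upsert model still holds two, which is the property that makes a statement like `UPSERT_SQL` safe to pass to withStatement when writes may be retried.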
Modifier and Type | Class and Description |
---|---|
static class | JdbcIO.DataSourceConfiguration: A POJO describing a DataSource, either by providing a DataSource directly or by providing all the properties needed to create one. |
static class | JdbcIO.DefaultRetryStrategy: The default Predicate used to detect deadlocks. |
static class | JdbcIO.PoolableDataSourceProvider: Wraps a JdbcIO.DataSourceConfiguration to provide a PoolingDataSource. |
static interface | JdbcIO.PreparedStatementSetter<T>: An interface used by JdbcIO.Write to set the parameters of the PreparedStatement used to write to the database. |
static class | JdbcIO.Read<T>: Implementation of read(). |
static class | JdbcIO.ReadAll<ParameterT,OutputT>: Implementation of readAll(). |
static interface | JdbcIO.RetryStrategy: An interface used to control whether statements are retried when a SQLException occurs. |
static interface | JdbcIO.RowMapper<T>: An interface used by JdbcIO.Read for converting each row of the ResultSet into an element of the resulting PCollection. |
static interface | JdbcIO.StatementPreparator: An interface used by JdbcIO.Read to set the parameters of the PreparedStatement used to query the database. |
static class | JdbcIO.Write<T>: This class is used as the default return value of write(). |
static class | JdbcIO.WriteVoid<T>: A PTransform to write to a JDBC datasource. |
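As an illustration of the kind of check a JdbcIO.RetryStrategy can perform, the sketch below decides whether a SQLException looks like a deadlock. It is not the code of JdbcIO.DefaultRetryStrategy: `DeadlockRetrySketch` and `shouldRetry` are hypothetical names, and the chosen SQLSTATE values (40001, the standard serialization-failure state that MySQL also reports for deadlocks, and 40P01, PostgreSQL's deadlock_detected) are assumptions about which errors are worth retrying.

```java
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical retry predicate based on the SQLSTATE of the failure. */
final class DeadlockRetrySketch {
  private DeadlockRetrySketch() {}

  // Assumption: these SQLSTATE values indicate a transient deadlock-style failure.
  private static final Set<String> RETRYABLE_SQL_STATES =
      new HashSet<>(Arrays.asList("40001", "40P01"));

  /** Returns true if the statement that raised the exception should be retried. */
  static boolean shouldRetry(SQLException e) {
    String sqlState = e.getSQLState();
    // getSQLState() may be null when the driver supplies no state.
    return sqlState != null && RETRYABLE_SQL_STATES.contains(sqlState);
  }
}
```

Retrying only on a narrow set of states keeps permanent errors, such as syntax errors or constraint violations, from being retried pointlessly.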
Modifier and Type | Method and Description |
---|---|
static <T> JdbcIO.Read<T> | read(): Read data from a JDBC datasource. |
static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> | readAll(): Like read(), but executes multiple instances of the query, substituting each element of a PCollection as query parameters. |
static <T> JdbcIO.Write<T> | write(): Write data to a JDBC datasource. |
static <T> JdbcIO.WriteVoid<T> | writeVoid() |
public static <T> JdbcIO.Read<T> read()
T - Type of the data to be read.
public static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> readAll()
Like read(), but executes multiple instances of the query, substituting each element of a PCollection as query parameters.
ParameterT - Type of the data representing query parameters.
OutputT - Type of the data to be read.
public static <T> JdbcIO.Write<T> write()
T - Type of the data to be written.
public static <T> JdbcIO.WriteVoid<T> writeVoid()