@Experimental(value=SOURCE_SINK) public class JdbcIO extends java.lang.Object
JdbcIO source returns a bounded collection of T
as a PCollection<T>
. T is the
type returned by the provided JdbcIO.RowMapper
.
To configure the JDBC source, you have to provide a JdbcIO.DataSourceConfiguration
using
1. JdbcIO.DataSourceConfiguration.create(DataSource)
(which must be Serializable
);
2. or JdbcIO.DataSourceConfiguration.create(String, String)
(driver class name and url).
Optionally, JdbcIO.DataSourceConfiguration.withUsername(String)
and
JdbcIO.DataSourceConfiguration.withPassword(String)
allows you to define username and password.
For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withQuery("select id,name from Person")
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
.withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
Query parameters can be configured using a user-provided JdbcIO.StatementPreparator
.
For example:
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
"username", "password"))
.withQuery("select id,name from Person where name = ?")
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
.withStatementPreparator(new JdbcIO.StatementPreparator() {
public void setParameters(PreparedStatement preparedStatement) throws Exception {
preparedStatement.setString(1, "Darwin");
}
})
.withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
JDBC sink supports writing records into a database. It writes a PCollection
to the
database by converting each T into a PreparedStatement
via a user-provided JdbcIO.PreparedStatementSetter
.
Like the source, to configure the sink, you have to provide a JdbcIO.DataSourceConfiguration
.
pipeline
.apply(...)
.apply(JdbcIO.<KV<Integer, String>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withStatement("insert into Person values(?, ?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
public void setParameters(KV<Integer, String> element, PreparedStatement query)
throws SQLException {
query.setInt(1, kv.getKey());
query.setString(2, kv.getValue());
}
})
);
NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple
times for fault tolerance. Because of that, you should avoid using INSERT
statements,
since that risks duplicating records in the database, or failing due to primary key conflicts.
Consider using MERGE ("upsert")
statements supported by your database instead.
Modifier and Type | Class and Description |
---|---|
static class |
JdbcIO.DataSourceConfiguration
A POJO describing a
DataSource , either providing directly a DataSource or all
properties allowing to create a DataSource . |
static interface |
JdbcIO.PreparedStatementSetter<T>
An interface used by the JdbcIO Write to set the parameters of the
PreparedStatement
used to setParameters into the database. |
static class |
JdbcIO.Read<T>
A
PTransform to read data from a JDBC datasource. |
static interface |
JdbcIO.RowMapper<T>
An interface used by
JdbcIO.Read for converting each row of the ResultSet into
an element of the resulting PCollection . |
static interface |
JdbcIO.StatementPreparator
An interface used by the JdbcIO Write to set the parameters of the
PreparedStatement
used to setParameters into the database. |
static class |
JdbcIO.Write<T>
A
PTransform to write to a JDBC datasource. |
Modifier and Type | Method and Description |
---|---|
static <T> JdbcIO.Read<T> |
read()
Read data from a JDBC datasource.
|
static <T> JdbcIO.Write<T> |
write()
Write data to a JDBC datasource.
|
public static <T> JdbcIO.Read<T> read()
T
- Type of the data to be read.public static <T> JdbcIO.Write<T> write()
T
- Type of the data to be written.