@Experimental(value=SOURCE_SINK) public class JdbcIO extends java.lang.Object
The JdbcIO source returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided JdbcIO.RowMapper.
To configure the JDBC source, you have to provide a JdbcIO.DataSourceConfiguration using either:
1. JdbcIO.DataSourceConfiguration.create(DataSource) (the DataSource must be Serializable); or
2. JdbcIO.DataSourceConfiguration.create(String, String) (driver class name and URL).
Optionally, JdbcIO.DataSourceConfiguration.withUsername(String) and JdbcIO.DataSourceConfiguration.withPassword(String) allow you to define a username and password.
For example:
    pipeline.apply(JdbcIO.<KV<Integer, String>>read()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withQuery("select id,name from Person")
      .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
          return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
      })
    );
Query parameters can be configured using a user-provided JdbcIO.StatementPreparator. For example:
    pipeline.apply(JdbcIO.<KV<Integer, String>>read()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withQuery("select id,name from Person where name = ?")
      .withStatementPreparator(new JdbcIO.StatementPreparator() {
        public void setParameters(PreparedStatement preparedStatement) throws Exception {
          preparedStatement.setString(1, "Darwin");
        }
      })
      .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
          return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
      })
    );
To customize how the DataSource is built, you can provide a SerializableFunction. For example, if you need a PoolingDataSource built from an existing JdbcIO.DataSourceConfiguration, you can use a JdbcIO.PoolableDataSourceProvider:
    pipeline.apply(JdbcIO.<KV<Integer, String>>read()
      .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
          JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
              .withUsername("username")
              .withPassword("password")))
      // ...
    );
By default, the provided function requests a DataSource per execution thread. In some circumstances this can quickly overwhelm the database by requesting too many connections. In that case you should look into sharing a single instance of a PoolingDataSource across all the execution threads. For example:
    private static class MyDataSourceProviderFn implements SerializableFunction<Void, DataSource> {
      // Shared by every instance in the JVM; static fields are never serialized.
      private static DataSource dataSource;
      @Override
      public DataSource apply(Void input) {
        // Lock on the class rather than the instance, because the field is static.
        synchronized (MyDataSourceProviderFn.class) {
          if (dataSource == null) {
            dataSource = ... build data source ...
          }
          return dataSource;
        }
      }
    }
    pipeline.apply(JdbcIO.<KV<Integer, String>>read()
      .withDataSourceProviderFn(new MyDataSourceProviderFn())
      // ...
    );
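The sharing pattern above can be sketched with the JDK alone. The holder below builds its instance lazily and guards it with a `static synchronized` accessor, so all callers in the JVM contend on the same class-level lock. `SharedDataSourceHolder` and its `Supplier` argument are hypothetical names introduced for illustration and are not part of JdbcIO; the proxy in `main` is only a stand-in for a real pooled `DataSource`.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.sql.DataSource;

/** Hypothetical holder for a JVM-wide DataSource; not part of JdbcIO. */
final class SharedDataSourceHolder {
  private static DataSource instance;

  private SharedDataSourceHolder() {}

  /** Returns the shared instance, invoking the builder only if none exists yet. */
  static synchronized DataSource getOrCreate(Supplier<DataSource> builder) {
    if (instance == null) {
      instance = builder.get();
    }
    return instance;
  }

  public static void main(String[] args) {
    AtomicInteger builds = new AtomicInteger();
    Supplier<DataSource> builder = () -> {
      builds.incrementAndGet();
      // Stand-in for a real pooled DataSource: a proxy whose methods all return null.
      return (DataSource) Proxy.newProxyInstance(
          DataSource.class.getClassLoader(),
          new Class<?>[] {DataSource.class},
          (proxy, method, methodArgs) -> null);
    };
    DataSource first = getOrCreate(builder);
    DataSource second = getOrCreate(builder);
    System.out.println("same instance: " + (first == second) + ", builds: " + builds.get());
  }
}
```

A SerializableFunction passed to withDataSourceProviderFn could delegate to such a holder from its apply method, so that every execution thread in a worker JVM receives the same pool.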
To read all data from a table in parallel with partitioning, use JdbcIO.ReadWithPartitions:
    pipeline.apply(JdbcIO.<KV<Integer, String>>readWithPartitions()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withTable("Person")
      .withPartitionColumn("id")
      .withLowerBound(0)
      .withUpperBound(1000)
      .withNumPartitions(5)
      .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
          return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
      })
    );
Instead of a full table, you can also use a subquery in parentheses. Pass the subquery, with an alias, through the table option; the partition column can then be qualified using that alias:
    pipeline.apply(JdbcIO.<KV<Integer, String>>readWithPartitions()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withTable("(select id, name from Person) as subq")
      .withPartitionColumn("id")
      .withLowerBound(0)
      .withUpperBound(1000)
      .withNumPartitions(5)
      .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
          return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
      })
    );
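To make the partitioning parameters more concrete, the sketch below shows one way a lower bound, an upper bound, and a partition count could be turned into contiguous ranges on the partition column, each usable as a WHERE predicate. It is an illustration only: `PartitionRanges` and its methods are hypothetical, the upper bound is assumed to be exclusive, and JdbcIO's own range computation may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that illustrates range partitioning; not part of JdbcIO. */
final class PartitionRanges {
  private PartitionRanges() {}

  /**
   * Splits [lower, upper) into at most numPartitions contiguous half-open
   * ranges, each returned as a two-element array {start, end}.
   */
  static List<long[]> split(long lower, long upper, int numPartitions) {
    if (numPartitions <= 0 || upper <= lower) {
      throw new IllegalArgumentException("need numPartitions > 0 and upper > lower");
    }
    long span = upper - lower;
    // Ceiling division, so the ranges always cover the whole span.
    long stride = (span + numPartitions - 1) / numPartitions;
    List<long[]> ranges = new ArrayList<>();
    for (long start = lower; start < upper; start += stride) {
      ranges.add(new long[] {start, Math.min(start + stride, upper)});
    }
    return ranges;
  }

  /** Renders one range as a predicate on the given partition column. */
  static String whereClause(String column, long[] range) {
    return column + " >= " + range[0] + " AND " + column + " < " + range[1];
  }

  public static void main(String[] args) {
    // Same values as the example above: bounds 0 and 1000, 5 partitions.
    for (long[] range : split(0, 1000, 5)) {
      System.out.println(whereClause("id", range));
    }
  }
}
```

With the values from the example, each of the five ranges spans 200 ids, so each parallel query would scan roughly one fifth of the table if the ids are evenly distributed.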
The JDBC sink supports writing records into a database. It writes a PCollection<T> to the database by converting each T into a PreparedStatement via a user-provided JdbcIO.PreparedStatementSetter.
Like the source, to configure the sink, you have to provide a JdbcIO.DataSourceConfiguration.
    pipeline
      .apply(...)
      .apply(JdbcIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
              "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
            .withUsername("username")
            .withPassword("password"))
        .withStatement("insert into Person values(?, ?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
          public void setParameters(KV<Integer, String> element, PreparedStatement query)
              throws SQLException {
            query.setInt(1, element.getKey());
            query.setString(2, element.getValue());
          }
        })
      );
NB: In case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple times for fault tolerance. Because of that, you should avoid using INSERT statements, since that risks duplicating records in the database, or failing due to primary key conflicts. Consider using MERGE ("upsert") statements supported by your database instead.
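As a rough illustration of the upsert advice, the sketch below lists two widely used upsert forms as Java string constants that could be passed to withStatement, together with a small in-memory simulation of why replaying an upsert is harmless while replaying a plain append is not. The column names `id` and `name`, the assumption that `id` is the primary key, and the class `UpsertSketch` are illustrative assumptions; check your database's documentation for the exact syntax it supports.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of idempotent writes; not part of JdbcIO. */
final class UpsertSketch {
  private UpsertSketch() {}

  // MySQL form; assumes id is the primary key of Person.
  static final String MYSQL_UPSERT =
      "INSERT INTO Person (id, name) VALUES (?, ?) "
          + "ON DUPLICATE KEY UPDATE name = VALUES(name)";

  // PostgreSQL form; assumes a unique constraint on id.
  static final String POSTGRES_UPSERT =
      "INSERT INTO Person (id, name) VALUES (?, ?) "
          + "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name";

  /** Simulates a keyed table: retried writes overwrite the same row. */
  static int rowsAfterUpserts(int[] ids, String[] names, int attempts) {
    Map<Integer, String> table = new HashMap<>();
    for (int attempt = 0; attempt < attempts; attempt++) {
      for (int i = 0; i < ids.length; i++) {
        table.put(ids[i], names[i]);
      }
    }
    return table.size();
  }

  /** Simulates a table without a key: every retried write adds a row. */
  static int rowsAfterAppends(int[] ids, String[] names, int attempts) {
    List<String> table = new ArrayList<>();
    for (int attempt = 0; attempt < attempts; attempt++) {
      for (int i = 0; i < ids.length; i++) {
        table.add(ids[i] + "," + names[i]);
      }
    }
    return table.size();
  }

  public static void main(String[] args) {
    int[] ids = {1, 2};
    String[] names = {"Darwin", "Lovelace"};
    // Two attempts model a bundle that the runner executed twice.
    System.out.println("upserts: " + rowsAfterUpserts(ids, names, 2));
    System.out.println("appends: " + rowsAfterAppends(ids, names, 2));
  }
}
```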
Modifier and Type | Class and Description |
---|---|
static class | JdbcIO.DataSourceConfiguration: A POJO describing a DataSource, by providing either a DataSource directly or all the properties needed to create one. |
static class | JdbcIO.DataSourceProviderFromDataSourceConfiguration: Wraps a JdbcIO.DataSourceConfiguration to provide a DataSource. |
static class | JdbcIO.DefaultRetryStrategy: The default Predicate used to detect deadlocks. |
static class | JdbcIO.PoolableDataSourceProvider: Wraps a JdbcIO.DataSourceConfiguration to provide a PoolingDataSource. |
static interface | JdbcIO.PreparedStatementSetter<T>: An interface used by JdbcIO.Write to set the parameters of the PreparedStatement used to write into the database. |
static class | JdbcIO.Read<T>: Implementation of read(). |
static class | JdbcIO.ReadAll<ParameterT,OutputT>: Implementation of readAll(). |
static class | JdbcIO.ReadRows: Implementation of readRows(). |
static class | JdbcIO.ReadWithPartitions<T>: Implementation of readWithPartitions(). |
static class | JdbcIO.RetryConfiguration: Builder used to help with retry configuration for JdbcIO. |
static interface | JdbcIO.RetryStrategy: An interface used to control whether statements are retried when a SQLException occurs. |
static interface | JdbcIO.RowMapper<T>: An interface used by JdbcIO.Read for converting each row of the ResultSet into an element of the resulting PCollection. |
static interface | JdbcIO.StatementPreparator: An interface used by JdbcIO.Read to set the parameters of the PreparedStatement used to query the database. |
static class | JdbcIO.Write<T>: This class is used as the default return value of write(). |
static class | JdbcIO.WriteVoid<T>: A PTransform to write to a JDBC datasource. |
static class | JdbcIO.WriteWithResults<T,V extends JdbcWriteResult>: A PTransform to write to a JDBC datasource. |
Modifier and Type | Method and Description |
---|---|
static <T> JdbcIO.Read<T> | read(): Read data from a JDBC datasource. |
static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> | readAll(): Like read(), but executes multiple instances of the query substituting each element of a PCollection as query parameters. |
static JdbcIO.ReadRows | readRows(): Read Beam Rows from a JDBC data source. |
static <T> JdbcIO.ReadWithPartitions<T> | readWithPartitions(): Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges. |
static <T> JdbcIO.Write<T> | write(): Write data to a JDBC datasource. |
static <T> JdbcIO.WriteVoid<T> | writeVoid() |
public static <T> JdbcIO.Read<T> read()
Read data from a JDBC datasource.
T - Type of the data to be read.
@Experimental(value=SCHEMAS) public static JdbcIO.ReadRows readRows()
Read Beam Rows from a JDBC data source.
public static <ParameterT,OutputT> JdbcIO.ReadAll<ParameterT,OutputT> readAll()
Like read(), but executes multiple instances of the query substituting each element of a PCollection as query parameters.
ParameterT - Type of the data representing query parameters.
OutputT - Type of the data to be read.
public static <T> JdbcIO.ReadWithPartitions<T> readWithPartitions()
Like readAll(), but executes multiple instances of the query on the same table (subquery) using ranges.
T - Type of the data to be read.
public static <T> JdbcIO.Write<T> write()
Write data to a JDBC datasource.
T - Type of the data to be written.
public static <T> JdbcIO.WriteVoid<T> writeVoid()