public class SingleStoreIO
extends java.lang.Object
SingleStoreIO source returns a bounded collection of T
as a PCollection<T>
. T
is the type returned by the provided SingleStoreIO.RowMapper
.
To configure the SingleStoreDB source, you have to provide a SingleStoreIO.DataSourceConfiguration
using SingleStoreIO.DataSourceConfiguration.create(String)
(endpoint). Optionally, SingleStoreIO.DataSourceConfiguration.withUsername(String)
and SingleStoreIO.DataSourceConfiguration.withPassword(String)
allows you to define username and password.
For example:
pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
.withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
.withUsername("username")
.withPassword("password"))
.withQuery("select id, name from Person")
.withRowMapper(new RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
Query parameters can be configured using a user-provided SingleStoreIO.StatementPreparator
. For
example:
pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
.withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306"))
.withQuery("select id,name from Person where name = ?")
.withStatementPreparator(new StatementPreparator() {
public void setParameters(PreparedStatement preparedStatement) throws Exception {
preparedStatement.setString(1, "Darwin");
}
})
.withRowMapper(new RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
SingleStoreIO supports partitioned reading of all data from a table. To enable this, use
readWithPartitions()
. This way of data reading is preferred because of the
performance reasons.
The following example shows usage of readWithPartitions()
pipeline.apply(SingleStoreIO.<Row>readWithPartitions()
.withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
.withUsername("username")
.withPassword("password"))
.withTable("Person")
.withRowMapper(new RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
SingleStoreIO supports writing records into a database. It writes a PCollection
to the
database by converting data to CSV and sending it to the database with LOAD DATA query.
Like the source, to configure the sink, you have to provide a SingleStoreIO.DataSourceConfiguration
.
{@code pipeline .apply(...) .apply(SingleStoreIO.>write() .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306") .withUsername("username") .withPassword("password")) .withStatement("insert into Person values(?, ?)") .withUserDataMapper(new UserDataMapper >() {
Modifier and Type | Class and Description |
---|---|
static class |
SingleStoreIO.DataSourceConfiguration
A POJO describing a SingleStoreDB
DataSource by providing all properties needed to
create it. |
static class |
SingleStoreIO.Read<T>
A
PTransform for reading data from SingleStoreDB. |
static class |
SingleStoreIO.ReadWithPartitions<T>
A
PTransform for reading data from SingleStoreDB. |
static interface |
SingleStoreIO.RowMapper<T>
An interface used by
SingleStoreIO.Read and SingleStoreIO.ReadWithPartitions for converting each row of the
ResultSet into an element of the resulting PCollection . |
static interface |
SingleStoreIO.StatementPreparator
An interface used by the SingleStoreIO
SingleStoreIO.Read to set the parameters of the PreparedStatement . |
static interface |
SingleStoreIO.UserDataMapper<T>
An interface used by the SingleStoreIO
SingleStoreIO.Write to map a data from each element of PCollection to a List of Strings. |
static class |
SingleStoreIO.Write<T>
A
PTransform for writing data to SingleStoreDB. |
Constructor and Description |
---|
SingleStoreIO() |
Modifier and Type | Method and Description |
---|---|
static <T> SingleStoreIO.Read<T> |
read()
Read data from a SingleStoreDB datasource.
|
static <T> SingleStoreIO.ReadWithPartitions<T> |
readWithPartitions()
Like
read() , but executes multiple instances of the query on the same table for each
database partition. |
static <T> SingleStoreIO.Write<T> |
write()
Write data to a SingleStoreDB datasource.
|
public static <T> SingleStoreIO.Read<T> read()
T
- Type of the data to be read.public static <T> SingleStoreIO.ReadWithPartitions<T> readWithPartitions()
read()
, but executes multiple instances of the query on the same table for each
database partition.T
- Type of the data to be read.public static <T> SingleStoreIO.Write<T> write()
T
- Type of the data to be written.