Class SingleStoreIO
Reading from SingleStoreDB datasource
SingleStoreIO source returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided SingleStoreIO.RowMapper.
To configure the SingleStoreDB source, you have to provide a SingleStoreIO.DataSourceConfiguration using SingleStoreIO.DataSourceConfiguration.create(String), passing the endpoint. Optionally, SingleStoreIO.DataSourceConfiguration.withUsername(String) and SingleStoreIO.DataSourceConfiguration.withPassword(String) allow you to define the username and password.
For example:
pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id, name from Person")
    .withRowMapper(new RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
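An anonymous RowMapper works, but a named class can be reused across reads and checked without a database. The following is a minimal sketch, not part of the SingleStoreIO API: PersonRowMapper is a hypothetical name, and java.util.Map.Entry stands in for Beam's KV so that the block compiles with only the JDK. In a pipeline the class would presumably implement SingleStoreIO.RowMapper<KV<Integer, String>> and return KV.of(...).

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical named mapper (assumption: not a class shipped with Beam).
// Map.Entry is a JDK-only stand-in for Beam's KV<Integer, String>.
class PersonRowMapper {
  // Reads the two columns selected by "select id, name from Person".
  Map.Entry<Integer, String> mapRow(ResultSet resultSet) throws SQLException {
    int id = resultSet.getInt(1);          // column 1: id
    String name = resultSet.getString(2);  // column 2: name
    return new SimpleImmutableEntry<>(id, name);
  }
}
```

Because the mapping logic only touches java.sql.ResultSet, it can be exercised with a stub ResultSet in a unit test before it is wired into withRowMapper(...).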
Query parameters can be configured using a user-provided SingleStoreIO.StatementPreparator. For example:
pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306"))
    .withQuery("select id,name from Person where name = ?")
    .withStatementPreparator(new StatementPreparator() {
      public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setString(1, "Darwin");
      }
    })
    .withRowMapper(new RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
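The example above hardcodes the filter value inside the anonymous class. As a hedged sketch, the same binding can live in a small named class that takes the value as a constructor argument. NameFilterParameters is a hypothetical name; to keep the block compilable with only the JDK it does not implement SingleStoreIO.StatementPreparator, which it presumably would in a pipeline. Making it Serializable is an assumption, based on Beam shipping user-provided functions to workers.

```java
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical named preparator (assumption: not a class shipped with Beam).
class NameFilterParameters implements Serializable {
  private final String name;

  NameFilterParameters(String name) {
    this.name = name;
  }

  // Binds the single "?" placeholder of:
  //   select id,name from Person where name = ?
  void setParameters(PreparedStatement preparedStatement) throws SQLException {
    preparedStatement.setString(1, name);
  }
}
```

Constructed as new NameFilterParameters("Darwin"), this performs the same binding as the anonymous class while keeping the value configurable.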
Parallel reading from a SingleStoreDB datasource
SingleStoreIO supports partitioned reading of all data from a table. To enable this, use readWithPartitions(). This way of reading data is preferred for performance reasons.
The following example shows usage of readWithPartitions():
pipeline.apply(SingleStoreIO.<KV<Integer, String>>readWithPartitions()
    .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
        .withUsername("username")
        .withPassword("password"))
    .withTable("Person")
    .withRowMapper(new RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
Writing to SingleStoreDB datasource
SingleStoreIO supports writing records into a database. It writes a PCollection to the database by converting data to CSV and sending it to the database with a LOAD DATA query.
Like the source, to configure the sink, you have to provide a SingleStoreIO.DataSourceConfiguration.
pipeline
    .apply(...)
    .apply(SingleStoreIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
            .withUsername("username")
            .withPassword("password"))
        .withTable("Person")
        .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
          @Override
          public List<String> mapRow(KV<Integer, String> element) {
            List<String> res = new ArrayList<>();
            res.add(element.getKey().toString());
            res.add(element.getValue());
            return res;
          }
        })
);
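The mapper's job is to turn one element into one string per column. A minimal JDK-only sketch of that conversion follows; PersonColumns is a hypothetical name, and java.util.Map.Entry stands in for Beam's KV. In a pipeline this logic would sit inside a SingleStoreIO.UserDataMapper<KV<Integer, String>>, as in the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (assumption: not a class shipped with Beam).
// Map.Entry is a JDK-only stand-in for Beam's KV<Integer, String>.
class PersonColumns {
  // Returns one string per column, in the order (id, name).
  static List<String> mapRow(Map.Entry<Integer, String> element) {
    List<String> res = new ArrayList<>();
    res.add(element.getKey().toString());
    res.add(element.getValue());
    return res;
  }
}
```

For the element (1, "Darwin") this yields the list ["1", "Darwin"]. How null keys or values should be encoded is not covered by this page, so the sketch assumes both are non-null.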
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | SingleStoreIO.DataSourceConfiguration | A POJO describing a SingleStoreDB DataSource by providing all properties needed to create it.
static class | SingleStoreIO.Read<T> | A PTransform for reading data from SingleStoreDB.
static class | SingleStoreIO.ReadWithPartitions<T> | A PTransform for reading data from SingleStoreDB.
static interface | SingleStoreIO.RowMapper<T> | An interface used by SingleStoreIO.Read and SingleStoreIO.ReadWithPartitions for converting each row of the ResultSet into an element of the resulting PCollection.
static interface | SingleStoreIO.RowMapperWithCoder<T> | A RowMapper that provides a Coder for the resulting PCollection.
static interface | SingleStoreIO.RowMapperWithInit<T> | A RowMapper that requires initialization.
static interface | SingleStoreIO.StatementPreparator | An interface used by SingleStoreIO.Read to set the parameters of the PreparedStatement.
static interface | SingleStoreIO.UserDataMapper<T> | An interface used by SingleStoreIO.Write to map data from each element of the PCollection to a List of Strings.
static class | SingleStoreIO.Write<T> | A PTransform for writing data to SingleStoreDB.
Constructor Summary
Constructors
SingleStoreIO()
Method Summary
Modifier and Type | Method | Description
static <T> SingleStoreIO.Read<T> | read() | Read data from a SingleStoreDB datasource.
static SingleStoreIO.Read<Row> | readRows() | Read Beam Rows from a SingleStoreDB datasource.
static <T> SingleStoreIO.ReadWithPartitions<T> | readWithPartitions() | Like read(), but executes multiple instances of the query on the same table for each database partition.
static SingleStoreIO.ReadWithPartitions<Row> | readWithPartitionsRows() | Like readRows(), but executes multiple instances of the query on the same table for each database partition.
static <T> SingleStoreIO.Write<T> | write() | Write data to a SingleStoreDB datasource.
static SingleStoreIO.Write<Row> | writeRows() | Write Beam Rows to a SingleStoreDB datasource.
Constructor Details
SingleStoreIO
public SingleStoreIO()
Method Details
read
Read data from a SingleStoreDB datasource.
Type Parameters:
T - Type of the data to be read.
readRows
Read Beam Rows from a SingleStoreDB datasource.
readWithPartitions
Like read(), but executes multiple instances of the query on the same table for each database partition.
Type Parameters:
T - Type of the data to be read.
readWithPartitionsRows
Like readRows(), but executes multiple instances of the query on the same table for each database partition.
write
Write data to a SingleStoreDB datasource.
Type Parameters:
T - Type of the data to be written.
writeRows
Write Beam Rows to a SingleStoreDB datasource.