Class SingleStoreIO
Reading from SingleStoreDB datasource
The SingleStoreIO source returns a bounded collection of T as a PCollection<T>. T
is the type returned by the provided SingleStoreIO.RowMapper.
To configure the SingleStoreDB source, provide a SingleStoreIO.DataSourceConfiguration
created with SingleStoreIO.DataSourceConfiguration.create(String), passing the endpoint. Optionally, SingleStoreIO.DataSourceConfiguration.withUsername(String) and SingleStoreIO.DataSourceConfiguration.withPassword(String) let you set the username and password.
For example:
pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id, name from Person")
    .withRowMapper(new RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        // JDBC columns are 1-indexed: 1 = id, 2 = name.
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
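The mapping logic of a row mapper can be exercised without a database. The following is a minimal sketch under stated assumptions, not Beam code: the nested `RowMapper` interface is a local stand-in that mirrors the single `mapRow(ResultSet)` method used in the example, `Map.Entry` stands in for Beam's `KV`, and `stubRow` and `mapStub` are hypothetical helpers that fake a one-row `ResultSet` with `java.lang.reflect.Proxy`.

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class RowMapperSketch {
  // Local stand-in (assumption): mirrors the mapRow(ResultSet) method from the example.
  public interface RowMapper<T> {
    T mapRow(ResultSet resultSet) throws Exception;
  }

  // Same column mapping as the example: column 1 is the id, column 2 is the name.
  public static final RowMapper<Map.Entry<Integer, String>> PERSON_MAPPER =
      rs -> new SimpleEntry<>(rs.getInt(1), rs.getString(2));

  // Hypothetical helper: a ResultSet stub that answers only getInt(1) and getString(2).
  public static ResultSet stubRow(int id, String name) {
    return (ResultSet) Proxy.newProxyInstance(
        RowMapperSketch.class.getClassLoader(),
        new Class<?>[] {ResultSet.class},
        (proxy, method, args) -> {
          if (method.getName().equals("getInt") && Integer.valueOf(1).equals(args[0])) {
            return id;
          }
          if (method.getName().equals("getString") && Integer.valueOf(2).equals(args[0])) {
            return name;
          }
          throw new UnsupportedOperationException(method.getName());
        });
  }

  // Runs the mapper against the stubbed row and wraps checked exceptions.
  public static Map.Entry<Integer, String> mapStub(int id, String name) {
    try {
      return PERSON_MAPPER.mapRow(stubRow(id, name));
    } catch (Exception e) {
      throw new IllegalStateException("mapping failed", e);
    }
  }

  public static void main(String[] args) {
    Map.Entry<Integer, String> person = mapStub(7, "Darwin");
    System.out.println(person.getKey() + " -> " + person.getValue());
  }
}
```

Keeping the mapper free of connection handling is what makes this kind of check possible: it only depends on the column positions selected by the query.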
Query parameters can be configured using a user-provided SingleStoreIO.StatementPreparator. For
example:
pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306"))
    .withQuery("select id, name from Person where name = ?")
    .withStatementPreparator(new StatementPreparator() {
      public void setParameters(PreparedStatement preparedStatement) throws Exception {
        // Bind the first (and only) placeholder in the query.
        preparedStatement.setString(1, "Darwin");
      }
    })
    .withRowMapper(new RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
Parallel reading from a SingleStoreDB datasource
SingleStoreIO supports partitioned reading of all data from a table. To enable it, use
readWithPartitions(). This is the preferred way to read data, for performance reasons.
The following example shows the usage of readWithPartitions():
pipeline.apply(SingleStoreIO.<KV<Integer, String>>readWithPartitions()
    .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
        .withUsername("username")
        .withPassword("password"))
    .withTable("Person")
    .withRowMapper(new RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);
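Conceptually, a partitioned read issues one read per database partition and combines the results. The sketch below illustrates only that idea, with in-memory lists standing in for database partitions. `PartitionedReadSketch` and `readAll` are hypothetical names, and this is not how SingleStoreIO or a Beam runner is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionedReadSketch {
  // Reads every partition on its own worker thread, then concatenates the rows
  // in partition order. Each inner list plays the role of one database partition.
  public static List<String> readAll(List<List<String>> partitions) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
    try {
      List<Future<List<String>>> pending = new ArrayList<>();
      for (List<String> partition : partitions) {
        // Stand-in for "run the query against this one partition".
        Callable<List<String>> task = () -> new ArrayList<>(partition);
        pending.add(pool.submit(task));
      }
      List<String> rows = new ArrayList<>();
      for (Future<List<String>> result : pending) {
        rows.addAll(result.get());
      }
      return rows;
    } catch (Exception e) {
      throw new IllegalStateException("partition read failed", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(
        readAll(List.of(List.of("1,Darwin"), List.of("2,Curie", "3,Noether"))));
  }
}
```

The per-partition tasks are independent of one another, which is the property that lets the reads run concurrently.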
Writing to SingleStoreDB datasource
SingleStoreIO supports writing records into a database. It writes a PCollection to the
database by converting the data to CSV and sending it to the database with a LOAD DATA query.
As with the source, you configure the sink by providing a SingleStoreIO.DataSourceConfiguration.
For example:
pipeline
    .apply(...)
    .apply(SingleStoreIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
            .withUsername("username")
            .withPassword("password"))
        // Target table; rows are sent as CSV through LOAD DATA.
        .withTable("Person")
        .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
          @Override
          public List<String> mapRow(KV<Integer, String> element) {
            // One string per column, in column order.
            List<String> res = new ArrayList<>();
            res.add(element.getKey().toString());
            res.add(element.getValue());
            return res;
          }
        })
    );
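To make the CSV step concrete, the sketch below turns a list of column strings, as returned by a user data mapper, into one CSV line. It is an illustration under assumptions, not the SingleStoreIO implementation: the exact CSV dialect the sink emits is not described here, so quoting every field and doubling embedded quotes is an assumed convention, and `CsvRowSketch`, `mapRow`, and `toCsvLine` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvRowSketch {
  // Mirrors the mapper from the example: one string per column, in column order.
  public static List<String> mapRow(int id, String name) {
    List<String> res = new ArrayList<>();
    res.add(Integer.toString(id));
    res.add(name);
    return res;
  }

  // Assumed dialect: every field is double-quoted and embedded quotes are doubled.
  public static String toCsvLine(List<String> fields) {
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < fields.size(); i++) {
      if (i > 0) {
        line.append(',');
      }
      line.append('"').append(fields.get(i).replace("\"", "\"\"")).append('"');
    }
    return line.append('\n').toString();
  }

  public static void main(String[] args) {
    System.out.print(toCsvLine(mapRow(1, "Darwin")));
  }
}
```

Because the mapper returns plain strings, any formatting of numbers, dates, or nulls has to be decided inside the mapper before the row is serialized.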
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | SingleStoreIO.DataSourceConfiguration | A POJO describing a SingleStoreDB DataSource by providing all properties needed to create it.
static class | SingleStoreIO.Read | A PTransform for reading data from SingleStoreDB.
static class | SingleStoreIO.ReadWithPartitions | A PTransform for reading data from SingleStoreDB.
static interface | SingleStoreIO.RowMapper | An interface used by SingleStoreIO.Read and SingleStoreIO.ReadWithPartitions for converting each row of the ResultSet into an element of the resulting PCollection.
static interface | SingleStoreIO.RowMapperWithCoder | A RowMapper that provides a Coder for the resulting PCollection.
static interface | SingleStoreIO.RowMapperWithInit | A RowMapper that requires initialization.
static interface | SingleStoreIO.StatementPreparator | An interface used by SingleStoreIO.Read to set the parameters of the PreparedStatement.
static interface | SingleStoreIO.UserDataMapper | An interface used by SingleStoreIO.Write to map the data from each element of a PCollection to a List of Strings.
static class | SingleStoreIO.Write | A PTransform for writing data to SingleStoreDB.
Constructor Summary
Constructors
SingleStoreIO()
Method Summary
Modifier and Type | Method | Description
static <T> SingleStoreIO.Read<T> | read() | Read data from a SingleStoreDB datasource.
static SingleStoreIO.Read<Row> | readRows() | Read Beam Rows from a SingleStoreDB datasource.
static <T> SingleStoreIO.ReadWithPartitions<T> | readWithPartitions() | Like read(), but executes multiple instances of the query on the same table for each database partition.
static SingleStoreIO.ReadWithPartitions<Row> | readWithPartitionsRows() | Like readRows(), but executes multiple instances of the query on the same table for each database partition.
static <T> SingleStoreIO.Write<T> | write() | Write data to a SingleStoreDB datasource.
static SingleStoreIO.Write<Row> | writeRows() | Write Beam Rows to a SingleStoreDB datasource.
Constructor Details
SingleStoreIO
public SingleStoreIO()
Method Details
read
Read data from a SingleStoreDB datasource.
Type Parameters:
T - Type of the data to be read.
readRows
Read Beam Rows from a SingleStoreDB datasource.
readWithPartitions
Like read(), but executes multiple instances of the query on the same table for each database partition.
Type Parameters:
T - Type of the data to be read.
readWithPartitionsRows
Like readRows(), but executes multiple instances of the query on the same table for each database partition.
write
Write data to a SingleStoreDB datasource.
Type Parameters:
T - Type of the data to be written.
writeRows
Write Beam Rows to a SingleStoreDB datasource.