Class SingleStoreIO

java.lang.Object
org.apache.beam.sdk.io.singlestore.SingleStoreIO

public class SingleStoreIO extends Object
IO to read data from and write data to SingleStoreDB.

Reading from SingleStoreDB datasource

The SingleStoreIO source returns a bounded collection of T as a PCollection<T>. T is the type returned by the provided SingleStoreIO.RowMapper.

To configure the SingleStoreDB source, you have to provide a SingleStoreIO.DataSourceConfiguration created with SingleStoreIO.DataSourceConfiguration.create(String), passing the database endpoint. Optionally, SingleStoreIO.DataSourceConfiguration.withUsername(String) and SingleStoreIO.DataSourceConfiguration.withPassword(String) allow you to set the username and password.

For example:


 pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
   .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
        .withUsername("username")
        .withPassword("password"))
   .withQuery("select id, name from Person")
   .withRowMapper(new RowMapper<KV<Integer, String>>() {
     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
       return KV.of(resultSet.getInt(1), resultSet.getString(2));
     }
   })
 );
 

Query parameters can be configured using a user-provided SingleStoreIO.StatementPreparator. For example:


 pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
   .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306"))
   .withQuery("select id,name from Person where name = ?")
   .withStatementPreparator(new StatementPreparator() {
     public void setParameters(PreparedStatement preparedStatement) throws Exception {
       preparedStatement.setString(1, "Darwin");
     }
   })
   .withRowMapper(new RowMapper<KV<Integer, String>>() {
     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
       return KV.of(resultSet.getInt(1), resultSet.getString(2));
     }
   })
 );
 

Parallel reading from a SingleStoreDB datasource

SingleStoreIO supports partitioned reading of all data from a table. To enable this, use readWithPartitions(). This is the preferred way to read data because it performs better: instead of a single query, a separate instance of the query runs against each database partition.

The following example shows how to use readWithPartitions():


 pipeline.apply(SingleStoreIO.<KV<Integer, String>>readWithPartitions()
  .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
       .withUsername("username")
       .withPassword("password"))
  .withTable("Person")
  .withRowMapper(new RowMapper<KV<Integer, String>>() {
    public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
      return KV.of(resultSet.getInt(1), resultSet.getString(2));
    }
  })
 );
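The partitioned read can be pictured with a small, stdlib-only model. This is not SingleStoreIO code: the nested lists standing in for database partitions, the predicate standing in for the query, and the `readAllPartitions` helper are all hypothetical. The sketch only illustrates the idea of running the same query once per partition and combining the results; in a real pipeline the Beam runner, not a local thread pool, decides how that work is distributed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical model of a partitioned read: each inner list plays the role of
// one database partition, and the predicate plays the role of the query.
final class PartitionedReadSketch {

  // Runs the same "query" against every partition concurrently and
  // concatenates the per-partition results in partition order.
  static <T> List<T> readAllPartitions(List<List<T>> partitions, Predicate<T> query) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
    try {
      List<Future<List<T>>> pending = new ArrayList<>();
      for (List<T> partition : partitions) {
        // One task per partition stands in for one query instance per partition.
        pending.add(pool.submit(() -> {
          List<T> matches = new ArrayList<>();
          for (T row : partition) {
            if (query.test(row)) {
              matches.add(row);
            }
          }
          return matches;
        }));
      }
      List<T> result = new ArrayList<>();
      for (Future<List<T>> future : pending) {
        try {
          result.addAll(future.get()); // blocks until that partition finishes
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt status
          throw new IllegalStateException("interrupted while reading partitions", e);
        } catch (ExecutionException e) {
          throw new IllegalStateException("partition read failed", e.getCause());
        }
      }
      return result;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<List<Integer>> partitions = List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6));
    // Keep only even ids; prints [2, 4, 6]
    System.out.println(readAllPartitions(partitions, id -> id % 2 == 0));
  }
}
```

Because every partition is queried independently, the total work splits naturally into as many parallel units as there are partitions.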
 

Writing to SingleStoreDB datasource

SingleStoreIO supports writing records into a database. It writes a PCollection to the database by converting the data to CSV and sending it to the database with a LOAD DATA query.

As with the source, to configure the sink you have to provide a SingleStoreIO.DataSourceConfiguration.


 pipeline
   .apply(...)
   .apply(SingleStoreIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(DataSourceConfiguration.create("hostname:3306")
          .withUsername("username")
          .withPassword("password"))
      .withTable("Person")
      .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
        @Override
        public List<String> mapRow(KV<Integer, String> element) {
          List<String> res = new ArrayList<>();
          res.add(element.getKey().toString());
          res.add(element.getValue());
          return res;
        }
      })
    );
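To make the LOAD DATA path more concrete, here is a hedged, stdlib-only sketch of how the `List<String>` returned by a UserDataMapper could be turned into one line of load data. The exact delimiter and escaping that SingleStoreIO uses are not specified here, so the sketch assumes MySQL-style LOAD DATA defaults (tab-separated fields, newline-terminated lines, backslash as the escape character, `\N` for NULL); `LoadDataEncodingSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical encoder; assumes MySQL-style LOAD DATA defaults, not the
// connector's actual format.
final class LoadDataEncodingSketch {

  // Escapes one field so that separators inside the value are not
  // mistaken for field or line boundaries.
  static String escapeField(String value) {
    if (value == null) {
      return "\\N"; // NULL marker under the default backslash escape character
    }
    StringBuilder out = new StringBuilder(value.length());
    for (char c : value.toCharArray()) {
      switch (c) {
        case '\\': out.append("\\\\"); break; // escape the escape character itself
        case '\t': out.append("\\t"); break;  // tab would otherwise split the field
        case '\n': out.append("\\n"); break;  // newline would otherwise end the row
        case '\r': out.append("\\r"); break;
        default: out.append(c);
      }
    }
    return out.toString();
  }

  // Joins the mapper's output into one tab-separated, newline-terminated line.
  static String encodeRow(List<String> fields) {
    List<String> escaped = new ArrayList<>(fields.size());
    for (String field : fields) {
      escaped.add(escapeField(field));
    }
    return String.join("\t", escaped) + "\n";
  }

  public static void main(String[] args) {
    // A KV<Integer, String> mapped to ["1", "Darwin\tC."] by a UserDataMapper.
    System.out.print(encodeRow(Arrays.asList("1", "Darwin\tC.")));
  }
}
```

Escaping matters because a raw tab or newline inside a value such as a name would otherwise be read as a field or row boundary.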
 
  • Constructor Details

    • SingleStoreIO

      public SingleStoreIO()
  • Method Details

    • read

      public static <T> SingleStoreIO.Read<T> read()
      Read data from a SingleStoreDB datasource.
      Type Parameters:
      T - Type of the data to be read.
    • readRows

      public static SingleStoreIO.Read<Row> readRows()
      Read Beam Rows from a SingleStoreDB datasource.
    • readWithPartitions

      public static <T> SingleStoreIO.ReadWithPartitions<T> readWithPartitions()
      Like read(), but executes a separate instance of the query on the same table for each database partition.
      Type Parameters:
      T - Type of the data to be read.
    • readWithPartitionsRows

      public static SingleStoreIO.ReadWithPartitions<Row> readWithPartitionsRows()
      Like readRows(), but executes a separate instance of the query on the same table for each database partition.
    • write

      public static <T> SingleStoreIO.Write<T> write()
      Write data to a SingleStoreDB datasource.
      Type Parameters:
      T - Type of the data to be written.
    • writeRows

      public static SingleStoreIO.Write<Row> writeRows()
      Write Beam Rows to a SingleStoreDB datasource.