Built-in I/O Transforms

SingleStoreDB I/O

Pipeline options and general information about using and running SingleStoreDB I/O.

Before you start

To use SingleStoreDB I/O, add the Maven artifact dependency to your pom.xml file.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.61.0</version>
</dependency>

Authentication

A DataSourceConfiguration is required to configure the SingleStoreIO connection properties.

Create the DataSource configuration:

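// Assigned to dc so that it can be passed to .withDataSourceConfiguration(dc).
SingleStoreIO.DataSourceConfiguration dc = SingleStoreIO.DataSourceConfiguration
    .create("myHost:3306")
    .withDatabase("db")
    .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE")
    .withPassword("password")
    .withUsername("admin");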

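In this example, .create(...) takes the endpoint in host:port form, .withDatabase(...) sets the database name, .withConnectionProperties(...) takes a semicolon-separated list of name=value connection parameters, and .withUsername(...) and .withPassword(...) supply the credentials.

Note - .withDatabase(...) is required for .readWithPartitions().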
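
The connection properties string in the example above is a semicolon-separated list of name=value pairs. If the properties are kept in a map, a small helper can assemble that string. The following is a minimal sketch; ConnectionPropertiesExample and its methods are hypothetical names used for illustration and are not part of SingleStoreIO.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of SingleStoreIO.
class ConnectionPropertiesExample {
    // Joins the entries as name=value pairs separated by ';', in map iteration order.
    static String format(Map<String, String> properties) {
        StringJoiner joined = new StringJoiner(";");
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            joined.add(entry.getKey() + "=" + entry.getValue());
        }
        return joined.toString();
    }

    // Builds the same property string that the DataSourceConfiguration example uses.
    static String example() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connectTimeout", "30000");
        properties.put("useServerPrepStmts", "FALSE");
        return format(properties);
    }
}
```

The returned string can then be passed to .withConnectionProperties(...).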

Reading from SingleStoreDB

SingleStoreIO can read from SingleStoreDB tables. It supports two types of reading: sequential data reading with .read() and parallel data reading with .readWithPartitions().

In many cases, parallel data reading is preferred over sequential data reading for performance reasons.

Sequential data reading

The basic .read() operation usage is as follows:

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withStatementPreparator(statementPreparator)
        .withOutputParallelization(true)
        .withRowMapper(mapper)
);

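Here, .withDataSourceConfiguration(...) takes the DataSourceConfiguration object, .withTable(...) or .withQuery(...) selects the data to read, .withStatementPreparator(...) takes a StatementPreparator object, .withOutputParallelization(...) takes a boolean that indicates whether the result should be parallelized, and .withRowMapper(...) takes a RowMapper object.

Note - either .withTable(...) or .withQuery(...) is required.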

Parallel data reading

The basic .readWithPartitions() operation usage is as follows:

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withRowMapper(mapper)
);

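Here, .withDataSourceConfiguration(...) takes the DataSourceConfiguration object, .withTable(...) or .withQuery(...) selects the data to read, and .withRowMapper(...) takes a RowMapper object.

Note - either .withTable(...) or .withQuery(...) is required.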

StatementPreparator

The StatementPreparator is used by read() to set the parameters of the PreparedStatement. For example:

public static class MyStatementPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        // Bind the value 10 to the first parameter (JDBC parameter indexes start at 1).
        preparedStatement.setInt(1, 10);
    }
}
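
To see which parameters a preparator binds without connecting to a database, the same method body can be run against a recording stand-in for PreparedStatement. The following is a minimal sketch that uses java.lang.reflect.Proxy from the JDK; PreparatorBindingCheck and recordBindings are hypothetical names, and the query mentioned in the comment is an assumed example rather than one taken from this page.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical check, not part of SingleStoreIO.
class PreparatorBindingCheck {
    // Same body as setParameters in the preparator above. With an assumed query such as
    // "SELECT * FROM MY_TABLE WHERE id > ?", index 1 refers to the single '?' placeholder.
    static void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }

    // Runs setParameters against a proxy that records each two-argument setXxx(index, value) call.
    static Map<Integer, Object> recordBindings() {
        Map<Integer, Object> bindings = new LinkedHashMap<>();
        PreparedStatement recorder = (PreparedStatement) Proxy.newProxyInstance(
            PreparedStatement.class.getClassLoader(),
            new Class<?>[] {PreparedStatement.class},
            (proxy, method, args) -> {
                boolean isSetter = method.getName().startsWith("set")
                    && args != null && args.length == 2 && args[0] instanceof Integer;
                if (isSetter) {
                    bindings.put((Integer) args[0], args[1]);
                }
                return null; // the recorded setters return void, so null is a valid result
            });
        try {
            setParameters(recorder);
        } catch (Exception e) {
            throw new IllegalStateException("setParameters failed", e);
        }
        return bindings;
    }
}
```

Calling PreparatorBindingCheck.recordBindings() returns a map from parameter index to bound value, which makes it straightforward to confirm that every placeholder in the query receives a value.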

RowMapper

The RowMapper is used by read() and readWithPartitions() for converting each row of the ResultSet into an element of the resulting PCollection. For example:

public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        // Column indexes are 1-based: column 1 is read as an int, column 2 as a String.
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}
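
The RowMapper example above uses a MyRow type that is not defined on this page. The following is a minimal sketch of such a value class, assuming two columns (an integer id and a string name). The field names, the boxed Integer id, and the choice to implement Serializable are assumptions made for illustration; they are not prescribed by SingleStoreIO.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical value class for rows with an integer id and a string name.
final class MyRow implements Serializable {
    private final Integer id;
    private final String name;

    private MyRow(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    // Factory method matching the MyRow.create(...) call in the mapper above.
    public static MyRow create(Integer id, String name) {
        return new MyRow(id, name);
    }

    public Integer id() {
        return id;
    }

    public String name() {
        return name;
    }

    // Value semantics: two rows are equal when both fields are equal.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof MyRow)) {
            return false;
        }
        MyRow that = (MyRow) other;
        return Objects.equals(id, that.id) && Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name);
    }

    @Override
    public String toString() {
        return "MyRow{id=" + id + ", name=" + name + "}";
    }
}
```

Serializable is assumed here so that the elements can be encoded with Java serialization; a different coder may suit your pipeline better.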

Writing to SingleStoreDB tables

SingleStoreIO can also write to SingleStoreDB tables. The write transform sends your PCollection to a SingleStoreDB database and returns the number of rows written by each batch of elements.

The basic .write() operation usage is as follows:

data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE")
        .withUserDataMapper(mapper)
        .withBatchSize(100000)
);

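Here, .withDataSourceConfiguration(...) takes the DataSourceConfiguration object, .withTable(...) names the table to write to, .withUserDataMapper(...) takes a UserDataMapper object, and .withBatchSize(...) sets the number of rows written per batch.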

UserDataMapper

The UserDataMapper is required to map each element of a PCollection to a list of String values before the write() operation saves the data. For example:

public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}