Class SqlTransform

java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PInput,PCollection<Row>>
org.apache.beam.sdk.extensions.sql.SqlTransform
All Implemented Interfaces:
Serializable, HasDisplayData

public abstract class SqlTransform extends PTransform<PInput,PCollection<Row>>
SqlTransform is the DSL interface of Beam SQL. It translates a SQL query into a PTransform, so developers can use standard SQL queries in a Beam pipeline.
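As a minimal sketch of a query applied as a PTransform, the program below assumes that the Beam SQL extension and the direct runner are on the classpath. The class name, the schema, and the field names (fruit, qty) are hypothetical and exist only for illustration. When SqlTransform is applied to a single PCollection<Row>, the query refers to that input as PCOLLECTION.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SimpleSqlQuery {
  // Hypothetical input schema: a fruit name and a quantity.
  static final Schema SCHEMA =
      Schema.builder().addStringField("fruit").addInt32Field("qty").build();

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // In-memory rows; withRowSchema attaches the schema that SqlTransform requires.
    PCollection<Row> input =
        p.apply(
            Create.of(
                    Row.withSchema(SCHEMA).addValues("apple", 1).build(),
                    Row.withSchema(SCHEMA).addValues("pear", 5).build())
                .withRowSchema(SCHEMA));

    // The single input PCollection is addressed as PCOLLECTION in the query.
    PCollection<Row> filtered =
        input.apply(SqlTransform.query("SELECT fruit, qty FROM PCOLLECTION WHERE qty > 2"));

    // filtered could now be written to any sink; it is left unconsumed in this sketch.
    p.run().waitUntilFinish();
  }
}
```

The output is an ordinary PCollection<Row>, so it can be chained into further transforms like any other PCollection.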

Beam SQL DSL usage:

A typical pipeline with Beam SQL DSL is:


 PipelineOptions options = PipelineOptionsFactory.create();
 Pipeline p = Pipeline.create(options);

 // create input tables from TextIO
 PCollection<Row> inputTableA = p.apply(TextIO.read().from("/my/input/patha")).apply(...);
 PCollection<Row> inputTableB = p.apply(TextIO.read().from("/my/input/pathb")).apply(...);

 // run a simple query; a single input PCollection is referred to as PCOLLECTION.
 // MY_FUNC is a user-defined function class that implements BeamSqlUdf.
 String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
 PCollection<Row> outputTableA = inputTableA.apply(
    SqlTransform
        .query(sql1)
        .registerUdf("MY_FUNC", MY_FUNC.class));

 //run a JOIN with one table from TextIO, and one table from another query
 PCollection<Row> outputTableB =
     PCollectionTuple
     .of(new TupleTag<>("TABLE_O_A"), outputTableA)
     .and(new TupleTag<>("TABLE_B"), inputTableB)
         .apply(SqlTransform.query("select * from TABLE_O_A JOIN TABLE_B ON ..."));

 //output the final result with TextIO
 outputTableB.apply(...).apply(TextIO.write().to("/my/output/path"));

 p.run().waitUntilFinish();
 
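The pipeline above refers to a MY_FUNC class without showing it. Below is a minimal sketch of such a scalar UDF, assuming the BeamSqlUdf convention: the class implements BeamSqlUdf and exposes a public static method named eval, and it is bound to a SQL name with SqlTransform.registerUdf. The cubing logic, the class names, and the schema are hypothetical stand-ins for the elided MY_FUNC.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class UdfSketch {
  // Hypothetical stand-in for MY_FUNC: Beam SQL invokes the public static eval method.
  public static class MyFunc implements BeamSqlUdf {
    public static Integer eval(Integer input) {
      return input * input * input;
    }
  }

  // Hypothetical schema matching the c1, c2 columns used in the query.
  static final Schema SCHEMA =
      Schema.builder().addInt32Field("c1").addStringField("c2").build();

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<Row> inputTableA =
        p.apply(
            Create.of(
                    Row.withSchema(SCHEMA).addValues(2, "x").build(),
                    Row.withSchema(SCHEMA).addValues(3, "y").build())
                .withRowSchema(SCHEMA));

    // registerUdf binds the SQL function name MY_FUNC to the class above.
    PCollection<Row> outputTableA =
        inputTableA.apply(
            SqlTransform.query("SELECT MY_FUNC(c1) AS cubed, c2 FROM PCOLLECTION")
                .registerUdf("MY_FUNC", MyFunc.class));

    p.run().waitUntilFinish();
  }
}
```

Because eval is a plain static method, it can also be unit-tested directly without running a pipeline.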

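The JOIN step can be tried without TextIO. In the sketch below, which assumes in-memory inputs built with Create, each TupleTag id in the PCollectionTuple serves as a table name inside the query. The schemas, the column names, and the join condition are hypothetical.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

public class JoinSketch {
  // Hypothetical schemas: orders reference customers by id.
  static final Schema CUSTOMERS =
      Schema.builder().addInt32Field("id").addStringField("cname").build();
  static final Schema ORDERS =
      Schema.builder().addInt32Field("customer_id").addInt32Field("amount").build();

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Distinct step names keep the two Create transforms uniquely labeled.
    PCollection<Row> customers =
        p.apply("Customers",
            Create.of(Row.withSchema(CUSTOMERS).addValues(1, "ann").build())
                .withRowSchema(CUSTOMERS));
    PCollection<Row> orders =
        p.apply("Orders",
            Create.of(Row.withSchema(ORDERS).addValues(1, 30).build())
                .withRowSchema(ORDERS));

    // Each tag id (TABLE_A, TABLE_B) becomes a table name visible to the query.
    PCollection<Row> joined =
        PCollectionTuple.of(new TupleTag<>("TABLE_A"), customers)
            .and(new TupleTag<>("TABLE_B"), orders)
            .apply(
                SqlTransform.query(
                    "SELECT TABLE_A.cname, TABLE_B.amount "
                        + "FROM TABLE_A JOIN TABLE_B ON TABLE_A.id = TABLE_B.customer_id"));

    p.run().waitUntilFinish();
  }
}
```

An explicit ON clause gives the join its equality condition. Selecting named columns, rather than using *, avoids clashing field names in the output row.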
A typical pipeline with Beam SQL DDL and DSL is:


 PipelineOptions options = PipelineOptionsFactory.create();
 Pipeline p = Pipeline.create(options);

 String sql1 = "INSERT INTO pubsub_sink SELECT * FROM pubsub_source";

 String ddlSource = "CREATE EXTERNAL TABLE pubsub_source(" +
     "attributes MAP<VARCHAR, VARCHAR>, payload ROW<name VARCHAR, size INTEGER>) " +
     "TYPE pubsub LOCATION 'projects/myproject/topics/topic1'";

 String ddlSink = "CREATE EXTERNAL TABLE pubsub_sink(" +
     "attributes MAP<VARCHAR, VARCHAR>, payload ROW<name VARCHAR, size INTEGER>) " +
     "TYPE pubsub LOCATION 'projects/myproject/topics/mytopic'";

 p.apply(SqlTransform.query(sql1).withDdlString(ddlSource).withDdlString(ddlSink));

 p.run().waitUntilFinish();
 
See Also: