Class SqlTransform
- All Implemented Interfaces:
Serializable, HasDisplayData

SqlTransform is the DSL interface of Beam SQL. It translates a SQL query into a PTransform, so developers can use standard SQL queries in a Beam pipeline.
Beam SQL DSL usage:
A typical pipeline with Beam SQL DSL is:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
//create table from TextIO;
PCollection<Row> inputTableA = p.apply(TextIO.read().from("/my/input/patha")).apply(...);
PCollection<Row> inputTableB = p.apply(TextIO.read().from("/my/input/pathb")).apply(...);
//run a simple query, and register the output as a table in BeamSql;
String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
PCollection<Row> outputTableA = inputTableA.apply(
    SqlTransform
        .query(sql1)
        .registerUdf("MY_FUNC", MY_FUNC.class));
//run a JOIN with one table from TextIO, and one table from another query
PCollection<Row> outputTableB =
PCollectionTuple
.of(new TupleTag<>("TABLE_O_A"), outputTableA)
.and(new TupleTag<>("TABLE_B"), inputTableB)
.apply(SqlTransform.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
//output the final result with TextIO
outputTableB.apply(...).apply(TextIO.write().to("/my/output/path"));
p.run().waitUntilFinish();
A typical pipeline with Beam SQL DDL and DSL is:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
String sql1 = "INSERT INTO pubsub_sink SELECT * FROM pubsub_source";
String ddlSource = "CREATE EXTERNAL TABLE pubsub_source(" +
    "attributes MAP<VARCHAR, VARCHAR>, payload ROW<name VARCHAR, size INTEGER>) " +
    "TYPE pubsub LOCATION 'projects/myproject/topics/topic1'";
String ddlSink = "CREATE EXTERNAL TABLE pubsub_sink(" +
    "attributes MAP<VARCHAR, VARCHAR>, payload ROW<name VARCHAR, size INTEGER>) " +
    "TYPE pubsub LOCATION 'projects/myproject/topics/mytopic'";
p.apply(SqlTransform.query(sql1).withDdlString(ddlSource).withDdlString(ddlSink));
p.run().waitUntilFinish();
- See Also:
Serialized Form
Field Summary
Fields
static final String PCOLLECTION_NAME
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Constructor Summary
Constructors
SqlTransform()
-
Method Summary
PCollection<Row> expand(PInput input)
    Override this method to specify how this PTransform should be expanded on the given InputT.
static SqlTransform query(String queryString)
    Returns a SqlTransform representing an equivalent execution plan.
SqlTransform registerUdaf(String functionName, Combine.CombineFn combineFn)
    Register a Combine.CombineFn as a UDAF used in this query.
SqlTransform registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz)
    Register a UDF used in this query.
SqlTransform registerUdf(String functionName, SerializableFunction sfn)
    Register a SerializableFunction as a UDF used in this query.
SqlTransform withAutoLoading(boolean autoLoading)
SqlTransform withDdlString(String ddlString)
SqlTransform withDefaultTableProvider(String name, TableProvider tableProvider)
SqlTransform withErrorsTransformer(PTransform<PCollection<Row>, ? extends POutput> errorsTransformer)
SqlTransform withNamedParameters(Map<String, ?> parameters)
SqlTransform withPositionalParameters(List<?> parameters)
SqlTransform withQueryPlannerClass(Class<? extends QueryPlanner> clazz)
SqlTransform withTableProvider(String name, TableProvider tableProvider)
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
-
Field Details
-
PCOLLECTION_NAME
public static final String PCOLLECTION_NAME
The name ("PCOLLECTION") under which a single input PCollection is registered; see query(String).
- See Also:
Constant Field Values
-
Constructor Details
-
SqlTransform
public SqlTransform()
-
-
Method Details
-
expand
public PCollection<Row> expand(PInput input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.

NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PInput, PCollection<Row>>
-
query
public static SqlTransform query(String queryString)
Returns a SqlTransform representing an equivalent execution plan.

The SqlTransform can be applied to a PCollection or PCollectionTuple representing all the input tables.

The PTransform outputs a PCollection of Row.

If the PTransform is applied to a PCollection, it gets registered with the name PCOLLECTION.

If the PTransform is applied to a PCollectionTuple, TupleTag.getId() is used as the corresponding PCollection's name.

- If the sql query only uses a subset of tables from the upstream PCollectionTuple, this is valid;
- If the sql query references a table not included in the upstream PCollectionTuple, an IllegalStateException is thrown during query validation;
- Always, tables from the upstream PCollectionTuple are only valid in the scope of the current query call.

Any available implementation of QueryPlanner can be used as the query planner in SqlTransform. An implementation can be specified globally for the entire pipeline with BeamSqlPipelineOptions.getPlannerName(). The global planner can be overridden per-transform with withQueryPlannerClass(Class).
-
withTableProvider
public SqlTransform withTableProvider(String name, TableProvider tableProvider)
-
withDefaultTableProvider
public SqlTransform withDefaultTableProvider(String name, TableProvider tableProvider)
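For illustration, a sketch of plugging in a table provider; TextTableProvider is assumed here as a representative TableProvider implementation, and the text.my_table qualified name is an assumption about how tables from a named provider are referenced:

// Hypothetical: make tables resolved by a TextTableProvider visible to the query.
PCollection<Row> result =
    p.apply(
        SqlTransform.query("SELECT * FROM text.my_table")
            .withTableProvider("text", new TextTableProvider()));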
-
withQueryPlannerClass
public SqlTransform withQueryPlannerClass(Class<? extends QueryPlanner> clazz)
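As a sketch of the per-transform planner override described under query(String), assuming the ZetaSQL planner module (which provides ZetaSQLQueryPlanner) is on the classpath and input is a PCollection<Row>:

PCollection<Row> result =
    input.apply(
        SqlTransform.query("SELECT f_int FROM PCOLLECTION")
            .withQueryPlannerClass(ZetaSQLQueryPlanner.class));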
-
withNamedParameters
public SqlTransform withNamedParameters(Map<String, ?> parameters)
-
withPositionalParameters
public SqlTransform withPositionalParameters(List<?> parameters)
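A sketch of both parameter styles, assuming the configured query planner supports query parameters; the @min_size named-placeholder syntax follows ZetaSQL conventions, and the field names are hypothetical:

// Named parameters, bound by name:
Map<String, Object> named = new HashMap<>();
named.put("min_size", 10);
PCollection<Row> byName =
    input.apply(
        SqlTransform.query("SELECT * FROM PCOLLECTION WHERE f_size >= @min_size")
            .withNamedParameters(named));

// Positional parameters, bound in the order of the ? placeholders:
PCollection<Row> byPosition =
    input.apply(
        SqlTransform.query("SELECT * FROM PCOLLECTION WHERE f_size >= ?")
            .withPositionalParameters(Arrays.asList(10)));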
-
withDdlString
public SqlTransform withDdlString(String ddlString)
-
withAutoLoading
public SqlTransform withAutoLoading(boolean autoLoading)
-
registerUdf
public SqlTransform registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz)
Register a UDF used in this query. Refer to BeamSqlUdf for more about how to implement a UDF in BeamSql.
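For illustration, a minimal UDF sketch; per BeamSqlUdf, the implementation is a class exposing a static eval method (the class, function, and column names below are hypothetical):

// A UDF via BeamSqlUdf: Beam SQL invokes the static eval method.
public static class CubicInteger implements BeamSqlUdf {
  public static Integer eval(Integer input) {
    return input * input * input;
  }
}

// Hypothetical usage: CUBIC becomes callable inside the query string.
PCollection<Row> result =
    input.apply(
        SqlTransform.query("SELECT CUBIC(f_int) FROM PCOLLECTION")
            .registerUdf("CUBIC", CubicInteger.class));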
-
registerUdf
public SqlTransform registerUdf(String functionName, SerializableFunction sfn)
Register a SerializableFunction as a UDF used in this query. Note: the SerializableFunction must have a constructor without arguments.
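A sketch of the SerializableFunction flavor; a named class is used instead of a capturing lambda because of the no-argument-constructor note above (names are hypothetical):

// A UDF as a SerializableFunction with an implicit no-arg constructor.
public static class IncrementFn implements SerializableFunction<Integer, Integer> {
  @Override
  public Integer apply(Integer input) {
    return input + 1;
  }
}

// Hypothetical usage:
PCollection<Row> result =
    input.apply(
        SqlTransform.query("SELECT INC(f_int) FROM PCOLLECTION")
            .registerUdf("INC", new IncrementFn()));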
-
registerUdaf
public SqlTransform registerUdaf(String functionName, Combine.CombineFn combineFn)
Register a Combine.CombineFn as a UDAF used in this query.
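A sketch of a UDAF; SquareSum (a sum-of-squares aggregation) and the column names are hypothetical:

// A UDAF as a Combine.CombineFn: accumulates the sum of squares of its inputs.
public static class SquareSum extends Combine.CombineFn<Integer, Integer, Integer> {
  @Override
  public Integer createAccumulator() {
    return 0;
  }

  @Override
  public Integer addInput(Integer accumulator, Integer input) {
    return accumulator + input * input;
  }

  @Override
  public Integer mergeAccumulators(Iterable<Integer> accumulators) {
    int merged = 0;
    for (Integer a : accumulators) {
      merged += a;
    }
    return merged;
  }

  @Override
  public Integer extractOutput(Integer accumulator) {
    return accumulator;
  }
}

// Hypothetical usage:
PCollection<Row> result =
    input.apply(
        SqlTransform.query("SELECT f_group, SQUARE_SUM(f_int) FROM PCOLLECTION GROUP BY f_group")
            .registerUdaf("SQUARE_SUM", new SquareSum()));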
-
withErrorsTransformer
public SqlTransform withErrorsTransformer(PTransform<PCollection<Row>, ? extends POutput> errorsTransformer)
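A sketch of the parameter's shape: a dead-letter style transformer that serializes error rows to text. The output path and error-row handling are assumptions here, not a documented contract:

// Hypothetical dead-letter sink for rows that failed SQL processing.
PTransform<PCollection<Row>, PDone> deadLetter =
    new PTransform<PCollection<Row>, PDone>() {
      @Override
      public PDone expand(PCollection<Row> errors) {
        return errors
            .apply(MapElements.into(TypeDescriptors.strings()).via(Row::toString))
            .apply(TextIO.write().to("/my/errors/path"));
      }
    };

PCollection<Row> result =
    input.apply(
        SqlTransform.query("SELECT f_int FROM PCOLLECTION")
            .withErrorsTransformer(deadLetter));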
-