apache_beam.transforms.sql module

Package for SqlTransform and related classes.

class apache_beam.transforms.sql.SqlTransform(query, dialect=None, expansion_service=None)[source]

Bases: ExternalTransform

A transform that can translate a SQL query into PTransforms.

Input PCollections must have a schema. Currently, there are two ways to define a schema for a PCollection:

  1. Register a typing.NamedTuple type to use RowCoder, and specify it as the output type. For example:

    Purchase = typing.NamedTuple('Purchase',
                                 [('item_name', unicode), ('price', float)])
    coders.registry.register_coder(Purchase, coders.RowCoder)
    with Pipeline() as p:
      purchases = (p | beam.io...
                     | beam.Map(..).with_output_types(Purchase))
    
  2. Produce beam.Row instances. Note this option will fail if Beam is unable to infer data types for any of the fields. For example:

    with Pipeline() as p:
      purchases = (p | beam.io...
                     | beam.Map(lambda x: beam.Row(item_name=unicode(..),
                                                   price=float(..))))
    

Similarly, the output of SqlTransform is a PCollection with a schema. The columns produced by the query can be accessed as attributes. For example:

purchases | SqlTransform("""
              SELECT item_name, COUNT(*) AS `count`
              FROM PCOLLECTION GROUP BY item_name""")
          | beam.Map(lambda row: "We've sold %d %ss!" % (row.count,
                                                         row.item_name))

Additional examples can be found in apache_beam.examples.wordcount_xlang_sql, apache_beam.examples.sql_taxi, and apache_beam.transforms.sql_test.

For more details about Beam SQL in general, see the Java transform, and the documentation.

Creates a SqlTransform which will be expanded to Java’s SqlTransform. (See class docs). :param query: The SQL query. :param dialect: (optional) The dialect, e.g. use ‘zetasql’ for ZetaSQL. :param expansion_service: (optional) The URL of the expansion service to use

URN = 'beam:external:java:sql:v1'