apache_beam.transforms.external_transform_provider module

class apache_beam.transforms.external_transform_provider.ExternalTransform(expansion_service=None, **kwargs)[source]

Bases: PTransform

Template for a wrapper class of an external SchemaTransform

This is a superclass for dynamically generated SchemaTransform wrappers and is not meant to be manually instantiated.

Experimental; no backwards compatibility guarantees.

default_expansion_service = None
identifier: str = ''
configuration_schema: Dict[str, ParamInfo] = {}
expand(input)[source]
class apache_beam.transforms.external_transform_provider.ExternalTransformProvider(expansion_services, urn_pattern='^beam:schematransform:org.apache.beam:([\\w-]+):(\\w+)$')[source]

Bases: object

Dynamically discovers Schema-aware external transforms from a given list of expansion services and provides them as ready-to-use PTransforms.

An ExternalTransform subclass is generated for each external transform and is named based on what can be inferred from its URN (see the urn_pattern parameter).
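To illustrate how a class name could be inferred from a URN, here is a minimal sketch that uses only the standard library. It assumes the name is built by converting each group captured by urn_pattern from snake_case to UpperCamelCase and leaving out a trailing v1 version. The helpers to_upper_camel_case and infer_name are hypothetical names for this sketch, not part of the Beam API, and the real inference logic may differ.

```python
import re

# Default pattern copied from the ExternalTransformProvider signature.
STANDARD_URN_PATTERN = (
    r'^beam:schematransform:org.apache.beam:([\w-]+):(\w+)$')


def to_upper_camel_case(snake):
    # Hypothetical helper: 'jdbc_write' -> 'JdbcWrite'.
    return ''.join(part.capitalize() for part in snake.split('_'))


def infer_name(urn, pattern=STANDARD_URN_PATTERN):
    # Hypothetical helper; returns None when the URN does not match.
    match = re.match(pattern, urn)
    if match is None:
        return None
    parts = [to_upper_camel_case(group) for group in match.groups()]
    # Assumption: the first version ('v1') is left out of the name.
    if len(parts) > 1 and parts[-1].lower() == 'v1':
        parts = parts[:-1]
    return ''.join(parts)


jdbc_name = infer_name(
    'beam:schematransform:org.apache.beam:jdbc_write:v1')
other_name = infer_name('some:other:urn')
```

Under these assumptions, the URNs listed in this section map to the class names shown next to them, and a URN that does not match the pattern yields no name.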

These classes are generated when ExternalTransformProvider is initialized. You can give it an expansion service address that is already up and running:

>>> from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
>>> provider = ExternalTransformProvider("localhost:12345")

Or you can give it the path to an expansion service Jar file:

>>> from apache_beam.transforms.external import JavaJarExpansionService
>>> provider = ExternalTransformProvider(JavaJarExpansionService(
...     "path/to/expansion-service.jar"))

Or you can give it the gradle target of a standard Beam expansion service:

>>> from apache_beam.transforms.external import BeamJarExpansionService
>>> provider = ExternalTransformProvider(BeamJarExpansionService(
...     "sdks:java:io:google-cloud-platform:expansion-service:shadowJar"))

Note that you can provide a list of these services:

>>> provider = ExternalTransformProvider([
...     "localhost:12345",
...     JavaJarExpansionService("path/to/expansion-service.jar"),
...     BeamJarExpansionService(
...         "sdks:java:io:google-cloud-platform:expansion-service:shadowJar")])

get_available() returns the names and URNs of the transforms available in the provided expansion service(s):

>>> provider.get_available()
[('JdbcWrite', 'beam:schematransform:org.apache.beam:jdbc_write:v1'),
('BigtableRead', 'beam:schematransform:org.apache.beam:bigtable_read:v1'),
...]
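Because the result is a plain list of (name, URN) tuples, it can be searched with ordinary Python. The sketch below works on sample data copied from the output above rather than on a live provider, so the entries are illustrative only.

```python
# Sample data copied from the get_available() output shown above.
available = [
    ('JdbcWrite',
     'beam:schematransform:org.apache.beam:jdbc_write:v1'),
    ('BigtableRead',
     'beam:schematransform:org.apache.beam:bigtable_read:v1'),
]

# Map each inferred class name to its URN for quick lookup.
urn_by_name = dict(available)

# Keep only the names that look like read transforms.
read_names = [name for name, _ in available if name.endswith('Read')]
```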

You can retrieve a transform with get(), get_urn(), or by directly accessing it as an attribute. The following lines all do the same thing:

>>> provider.get('BigqueryStorageRead')
>>> provider.get_urn(
...     'beam:schematransform:org.apache.beam:bigquery_storage_read:v1')
>>> provider.BigqueryStorageRead

You can inspect the transform’s documentation for more details. The following returns the documentation provided by the underlying SchemaTransform. If no such documentation is provided, this will be empty.

>>> import inspect
>>> inspect.getdoc(provider.BigqueryStorageRead)

Similarly, you can inspect the transform’s signature to know more about its parameters, including their names, types, and any documentation that the underlying SchemaTransform may provide:

>>> inspect.signature(provider.BigqueryStorageRead)
(query: 'typing.Union[str, NoneType]: The SQL query to be executed to...',
row_restriction: 'typing.Union[str, NoneType]: Read only rows that match...',
selected_fields: 'typing.Union[typing.Sequence[str], NoneType]: Read ...',
table_spec: 'typing.Union[str, NoneType]: The fully-qualified name of ...')
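As a rough illustration of how a dynamically generated class can expose documentation and a signature to inspect, the following standard-library sketch builds a class with type() and attaches an explicit __signature__. The Wrapper base class and the make_wrapper factory are hypothetical stand-ins; this is not necessarily how ExternalTransformProvider generates its wrappers.

```python
import inspect


class Wrapper:
    # Stand-in base class for this sketch; not Beam's ExternalTransform.
    def __init__(self, **kwargs):
        self.kwargs = kwargs


def make_wrapper(name, doc, fields):
    # Hypothetical factory: `fields` maps parameter names to annotations.
    params = [
        inspect.Parameter(
            field,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            annotation=annotation)
        for field, annotation in fields.items()
    ]
    cls = type(name, (Wrapper,), {'__doc__': doc})
    # inspect.signature() honours an explicit __signature__ attribute.
    cls.__signature__ = inspect.Signature(params)
    return cls


Read = make_wrapper(
    'Read',
    'Reads rows.',
    {'query': 'str: the SQL query', 'table': 'str: the table'})
signature = inspect.signature(Read)
```

Here inspect.getdoc(Read) returns the supplied description, and signature.parameters lists query and table with their string annotations, which mirrors the shape of the output above.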

The retrieved external transform can be used as a normal PTransform like so:

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (p
    | 'Read from BigQuery' >> provider.BigqueryStorageRead(
            query=query,
            row_restriction=restriction)
    | 'Some processing' >> beam.Map(...))
get_available() → List[Tuple[str, str]][source]

Get a list of available ExternalTransform names and identifiers

get_all() → Dict[str, ExternalTransform][source]

Get all ExternalTransforms

get(name) → ExternalTransform[source]

Get an ExternalTransform by its inferred class name

get_urn(identifier) → ExternalTransform[source]

Get an ExternalTransform by its SchemaTransform identifier