apache_beam.yaml.yaml_provider module

This module defines Providers usable from YAML. A Provider is a specification for where to find and how to invoke services that vend implementations of various PTransforms.

class apache_beam.yaml.yaml_provider.Provider[source]

Bases: object

Maps transform type names and args to concrete PTransform instances.

available() → bool[source]

Returns whether this provider is available to use in this environment.

cache_artifacts() → Optional[Iterable[str]][source]
provided_transforms() → Iterable[str][source]

Returns a list of transform type names this provider can handle.

config_schema(type)[source]
description(type)[source]
requires_inputs(typ: str, args: Mapping[str, Any]) → bool[source]

Returns whether this transform requires inputs.

Specifically, if this returns True and inputs are not provided, then an error will be thrown.

This is best-effort, primarily for better and earlier error messages.
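The early check described above can be sketched in plain Python. This is a minimal illustration only: `ToyProvider` and `check_inputs` are hypothetical names that are not part of apache_beam, and the rule used here (a fixed set of transform types that need no inputs) is an assumption.

```python
from typing import Any, Mapping, Sequence


class ToyProvider:
    """Hypothetical stand-in for a provider; not part of apache_beam."""

    def __init__(self, no_input_transforms: Sequence[str] = ()):
        # Assumption: transforms listed here (e.g. sources) need no inputs.
        self._no_input_transforms = set(no_input_transforms)

    def requires_inputs(self, typ: str, args: Mapping[str, Any]) -> bool:
        return typ not in self._no_input_transforms


def check_inputs(provider, typ, args, inputs) -> None:
    """Raises early if a transform that needs inputs was given none."""
    if provider.requires_inputs(typ, args) and not inputs:
        raise ValueError(f"Missing inputs for transform of type {typ!r}")


toy = ToyProvider(no_input_transforms=["Create"])
check_inputs(toy, "Create", {}, inputs=[])  # passes: no inputs needed
```

Because the check is best-effort, a provider that cannot tell may simply answer conservatively; the sketch only shows where the error would surface.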

create_transform(typ: str, args: Mapping[str, Any], yaml_create_transform: Callable[[Mapping[str, Any], Iterable[apache_beam.pvalue.PCollection]], apache_beam.transforms.ptransform.PTransform]) → apache_beam.transforms.ptransform.PTransform[source]

Creates a PTransform instance for the given transform type and arguments.
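To make the mapping from type names and args to instances concrete, here is a minimal standalone sketch. `DictBackedProvider` and `make_scaler` are hypothetical names, a plain callable stands in for a PTransform, and the `yaml_create_transform` argument of the real signature is omitted for brevity.

```python
from typing import Any, Callable, Iterable, Mapping


class DictBackedProvider:
    """Hypothetical provider-like class backed by a dict of factories."""

    def __init__(self, factories: Mapping[str, Callable[..., Any]]):
        self._factories = dict(factories)

    def provided_transforms(self) -> Iterable[str]:
        # The type names this provider can handle.
        return self._factories.keys()

    def create_transform(self, typ: str, args: Mapping[str, Any]) -> Any:
        if typ not in self._factories:
            raise KeyError(f"Unknown transform type: {typ!r}")
        # The args mapping is passed to the factory as keyword arguments.
        return self._factories[typ](**args)


def make_scaler(factor):
    """Stand-in for a transform constructor; returns a plain callable."""
    return lambda x: x * factor


provider = DictBackedProvider({"Scale": make_scaler})
scale = provider.create_transform("Scale", {"factor": 3})
```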

underlying_provider()[source]

If this provider is simply a proxy to another provider, return the provider that should actually be used for affinity checking.

affinity(other: apache_beam.yaml.yaml_provider.Provider)[source]

Returns a value approximating how good it would be for this provider to be used immediately following a transform from the other provider (e.g. to encourage fusion).
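One way such a score could be used is to pick, among several candidate providers, the one with the highest affinity to the provider of the preceding transform. The sketch below is an illustration under stated assumptions: `LanguageProvider`, `pick_provider`, and the scoring rule (same language scores higher) are all hypothetical and do not reflect how Beam computes affinity.

```python
class LanguageProvider:
    """Hypothetical provider tagged with an implementation language."""

    def __init__(self, name: str, language: str):
        self.name = name
        self.language = language

    def affinity(self, other: "LanguageProvider") -> int:
        # Assumed rule: transforms in the same language fuse more readily.
        return 10 if self.language == other.language else 0


def pick_provider(candidates, previous):
    """Returns the candidate with the highest affinity to `previous`."""
    return max(candidates, key=lambda p: p.affinity(previous))


previous = LanguageProvider("upstream", "java")
candidates = [
    LanguageProvider("py_impl", "python"),
    LanguageProvider("java_impl", "java"),
]
chosen = pick_provider(candidates, previous)
```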

apache_beam.yaml.yaml_provider.as_provider(name, provider_or_constructor)[source]
apache_beam.yaml.yaml_provider.as_provider_list(name, lst)[source]
class apache_beam.yaml.yaml_provider.ExternalProvider(urns, service)[source]

Bases: apache_beam.yaml.yaml_provider.Provider

A Provider implemented via the cross language transform service.

provided_transforms()[source]
schema_transforms()[source]
config_schema(type)[source]
description(type)[source]
requires_inputs(typ, args)[source]
create_transform(type, args, yaml_create_transform)[source]
create_external_transform(urn, args)[source]
classmethod provider_from_spec(spec)[source]
classmethod register_provider_type(type_name)[source]
apache_beam.yaml.yaml_provider.java_jar(urns, jar: str)[source]
apache_beam.yaml.yaml_provider.maven_jar(urns, *, artifact_id, group_id, version, repository='https://repo.maven.apache.org/maven2', classifier=None, appendix=None)[source]
apache_beam.yaml.yaml_provider.beam_jar(urns, *, gradle_target, appendix=None, version='2.55.0', artifact_id=None)[source]
apache_beam.yaml.yaml_provider.docker(urns, **config)[source]
class apache_beam.yaml.yaml_provider.RemoteProvider(urns, address: str)[source]

Bases: apache_beam.yaml.yaml_provider.ExternalProvider

available()[source]
cache_artifacts()[source]
class apache_beam.yaml.yaml_provider.ExternalJavaProvider(urns, jar_provider)[source]

Bases: apache_beam.yaml.yaml_provider.ExternalProvider

available()[source]
cache_artifacts()[source]
apache_beam.yaml.yaml_provider.python(urns, packages=())[source]
class apache_beam.yaml.yaml_provider.ExternalPythonProvider(urns, packages)[source]

Bases: apache_beam.yaml.yaml_provider.ExternalProvider

available()[source]
cache_artifacts()[source]
create_external_transform(urn, args)[source]
apache_beam.yaml.yaml_provider.fix_pycallable()[source]
class apache_beam.yaml.yaml_provider.InlineProvider(transform_factories, no_input_transforms=())[source]

Bases: apache_beam.yaml.yaml_provider.Provider

available()[source]
cache_artifacts()[source]
provided_transforms()[source]
config_schema(typ)[source]
description(typ)[source]
get_docs(typ)[source]
create_transform(type, args, yaml_create_transform)[source]
to_json()[source]
requires_inputs(typ, args)[source]
class apache_beam.yaml.yaml_provider.MetaInlineProvider(transform_factories, no_input_transforms=())[source]

Bases: apache_beam.yaml.yaml_provider.InlineProvider

create_transform(type, args, yaml_create_transform)[source]
class apache_beam.yaml.yaml_provider.SqlBackedProvider(transforms: Mapping[str, Callable[[...], apache_beam.transforms.ptransform.PTransform]], sql_provider: Optional[apache_beam.yaml.yaml_provider.Provider] = None)[source]

Bases: apache_beam.yaml.yaml_provider.Provider

sql_provider()[source]
provided_transforms()[source]
available()[source]
cache_artifacts()[source]
underlying_provider()[source]
to_json()[source]
create_transform(typ: str, args: Mapping[str, Any], yaml_create_transform: Any) → apache_beam.transforms.ptransform.PTransform[source]
apache_beam.yaml.yaml_provider.element_to_rows(e)[source]
apache_beam.yaml.yaml_provider.dicts_to_rows(o)[source]
class apache_beam.yaml.yaml_provider.YamlProviders[source]

Bases: object

static create(elements: Iterable[Any], reshuffle: Optional[bool] = True)[source]

Creates a collection containing a specified set of elements.

This transform always produces schema’d data. For example:

type: Create
config:
  elements: [1, 2, 3]

will result in an output with three elements with a schema of Row(element=int) whereas YAML/JSON-style mappings will be interpreted directly as Beam rows, e.g.:

type: Create
config:
  elements:
     - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
     - {first: 1, second: {str: "bar", values: [4, 5, 6]}}

will result in a schema of the form (int, Row(string, List[int])).

This can also be expressed as YAML:

type: Create
config:
  elements:
    - first: 0
      second:
        str: "foo"
        values: [1, 2, 3]
    - first: 1
      second:
        str: "bar"
        values: [4, 5, 6]
Parameters:
  • elements – The set of elements that should belong to the PCollection. YAML/JSON-style mappings will be interpreted as Beam rows. Primitives will be mapped to rows with a single “element” field.
  • reshuffle – (optional) Whether to introduce a reshuffle (to possibly redistribute the work) if there is more than one element in the collection. Defaults to True.
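The element-mapping rule in the parameter description can be illustrated without Beam. In this minimal sketch plain dicts stand in for Beam rows, and `to_row_like` is a hypothetical helper, not the module's own conversion function.

```python
from typing import Any, Dict


def to_row_like(e: Any) -> Dict[str, Any]:
    """Mappings are kept as-is; primitives get a single "element" field."""
    if isinstance(e, dict):
        return e
    return {"element": e}


rows = [
    to_row_like(e)
    for e in [1, "a", {"first": 0, "second": {"str": "foo"}}]
]
```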
static fully_qualified_named_transform(constructor: str, args: Optional[Iterable[Any]] = (), kwargs: Optional[Mapping[str, Any]] = {})[source]

A Python PTransform identified by fully qualified name.

This allows one to import, construct, and apply any Beam Python transform. This can be useful for using transforms that have not yet been exposed via a YAML interface. Note, however, that conversion may be required if this transform does not accept or produce Beam Rows.

For example:

type: PyTransform
config:
   constructor: apache_beam.pkg.mod.SomeClass
   args: [1, 'foo']
   kwargs:
     baz: 3

can be used to access the transform apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3).

Parameters:
  • constructor – Fully qualified name of a callable used to construct the transform. Often this is a class such as apache_beam.pkg.mod.SomeClass but it can also be a function or any other callable that returns a PTransform.
  • args – A list of parameters to pass to the callable as positional arguments.
  • kwargs – A mapping of parameter names to values to pass to the callable as keyword arguments.
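Resolving a callable from its fully qualified name and applying `args` and `kwargs` can be sketched with the standard library. This is a minimal illustration, not Beam's implementation: `construct_by_name` is a hypothetical helper, `datetime.timedelta` is used only as a stand-in for a transform class, and nested attributes such as `module.Class.method` are not handled.

```python
import importlib
from typing import Any, Iterable, Mapping, Optional


def construct_by_name(
    constructor: str,
    args: Iterable[Any] = (),
    kwargs: Optional[Mapping[str, Any]] = None,
) -> Any:
    """Imports `pkg.mod.Name` and calls it with the given arguments."""
    module_name, _, attr = constructor.rpartition(".")
    if not module_name:
        raise ValueError(f"Not a fully qualified name: {constructor!r}")
    module = importlib.import_module(module_name)
    factory = getattr(module, attr)
    return factory(*args, **(kwargs or {}))


# Equivalent to datetime.timedelta(1, hours=12).
delta = construct_by_name("datetime.timedelta", args=[1], kwargs={"hours": 12})
```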
class Flatten[source]

Bases: apache_beam.transforms.ptransform.PTransform

Flattens multiple PCollections into a single PCollection.

The elements of the resulting PCollection will be the (disjoint) union of all the elements of all the inputs.

Note that in YAML, transforms can always take a list of inputs, which will be implicitly flattened.

expand(pcolls)[source]
class WindowInto(windowing)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A window transform assigning windows to each element of a PCollection.

The assigned windows will affect all downstream aggregating operations, which will aggregate by window as well as by key.

See [the Beam documentation on windowing](https://beam.apache.org/documentation/programming-guide/#windowing) for more details.

Sizes, offsets, periods and gaps (where applicable) must be defined using a time unit suffix ‘ms’, ‘s’, ‘m’, ‘h’ or ‘d’ for milliseconds, seconds, minutes, hours or days, respectively. If a time unit is not specified, it will default to ‘s’.

For example:

windowing:
   type: fixed
   size: 30s
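The unit-suffix rule for sizes, offsets, periods and gaps can be sketched as a small parser. This is an assumption-laden illustration: `parse_duration_seconds` is a hypothetical helper and Beam's own parsing may accept or reject inputs differently.

```python
import re

# Milliseconds per supported unit suffix.
_MILLIS_PER_UNIT = {
    "ms": 1,
    "s": 1000,
    "m": 60_000,
    "h": 3_600_000,
    "d": 86_400_000,
}
_DURATION_RE = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*(ms|s|m|h|d)?\s*$")


def parse_duration_seconds(value) -> float:
    """Parses values such as '30s' or '500ms'; a bare number means seconds."""
    match = _DURATION_RE.match(str(value))
    if match is None:
        raise ValueError(f"Invalid duration: {value!r}")
    number, unit = match.groups()
    return float(number) * _MILLIS_PER_UNIT[unit or "s"] / 1000
```

With this helper, `parse_duration_seconds("30s")` and `parse_duration_seconds(30)` both yield 30 seconds, matching the default described above.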

Note that any YAML transform can have a [windowing parameter](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md#windowing), which is applied to its inputs (if any) or to its outputs (if there are no inputs). Explicit WindowInto operations are therefore not typically needed.

Parameters:
  • windowing – the type and parameters of the windowing to perform
expand(pcoll)[source]
static log_for_testing(level: Optional[str] = 'INFO', prefix: Optional[str] = '')[source]

Logs each element of its input PCollection.

The output of this transform is a copy of its input for ease of use in chain-style pipelines.

Parameters:
  • level – one of ERROR, INFO, or DEBUG, mapped to a corresponding language-specific logging level
  • prefix – an optional identifier that will get prepended to the element being logged
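The log-and-pass-through behavior can be sketched over a plain iterable. This is a minimal stand-in: `log_and_pass` is a hypothetical generator that uses Python's `logging` module rather than a Beam transform, with the level names taken from the parameter description.

```python
import logging
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

# Level names from the parameter description, mapped to Python log levels.
_LEVELS = {
    "ERROR": logging.ERROR,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
}


def log_and_pass(
    elements: Iterable[T], level: str = "INFO", prefix: str = ""
) -> Iterator[T]:
    """Logs each element with the prefix, then yields it unchanged."""
    log_level = _LEVELS[level.upper()]
    for element in elements:
        logging.log(log_level, "%s%s", prefix, element)
        yield element
```

Since every element is yielded unchanged, the generator can sit between two steps of a chain without altering the data that flows through it.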
static create_builtin_provider()[source]
class apache_beam.yaml.yaml_provider.TranslatingProvider(transforms: Mapping[str, Callable[[...], apache_beam.transforms.ptransform.PTransform]], underlying_provider: apache_beam.yaml.yaml_provider.Provider)[source]

Bases: apache_beam.yaml.yaml_provider.Provider

provided_transforms()[source]
available()[source]
cache_artifacts()[source]
underlying_provider()[source]
to_json()[source]
create_transform(typ: str, config: Mapping[str, Any], yaml_create_transform: Any) → apache_beam.transforms.ptransform.PTransform[source]
apache_beam.yaml.yaml_provider.create_java_builtin_provider()[source]

Exposes built-in transforms from Java as well as Python to maximize opportunities for fusion.

This provider holds those transforms that require pre-processing of the configs. For those Java transforms that can consume the user-provided configs directly (or only need a simple renaming of parameters), a direct or renaming provider is the simpler choice.

class apache_beam.yaml.yaml_provider.PypiExpansionService(packages, base_python='/home/runner/work/beam/beam/beam/sdks/python/target/.tox/py38-docs/bin/python')[source]

Bases: object

Expands transforms by fully qualified name in a virtual environment with the given dependencies.

VENV_CACHE = '/home/runner/.apache_beam/cache/venvs'
class apache_beam.yaml.yaml_provider.RenamingProvider(transforms, mappings, underlying_provider, defaults=None)[source]

Bases: apache_beam.yaml.yaml_provider.Provider

static expand_mappings(mappings)[source]
available() → bool[source]
provided_transforms() → Iterable[str][source]
config_schema(type)[source]
description(typ)[source]
requires_inputs(typ, args)[source]
create_transform(typ: str, args: Mapping[str, Any], yaml_create_transform: Callable[[Mapping[str, Any], Iterable[apache_beam.pvalue.PCollection]], apache_beam.transforms.ptransform.PTransform]) → apache_beam.transforms.ptransform.PTransform[source]

Creates a PTransform instance for the given transform type and arguments.

underlying_provider()[source]
cache_artifacts()[source]
apache_beam.yaml.yaml_provider.parse_providers(provider_specs)[source]
apache_beam.yaml.yaml_provider.merge_providers(*provider_sets)[source]
apache_beam.yaml.yaml_provider.standard_providers()[source]