#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Ptransform overrides for DataflowRunner."""
# pytype: skip-file
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.pipeline import PTransformOverride


class CreatePTransformOverride(PTransformOverride):
  """A ``PTransformOverride`` for ``Create`` in streaming mode."""

  def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import Create
from apache_beam.runners.dataflow.internal import apiclient
if isinstance(applied_ptransform.transform, Create):
return not apiclient._use_fnapi(
applied_ptransform.outputs[None].pipeline._options)
else:
return False

  def get_replacement_transform_for_applied_ptransform(
self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import PTransform
ptransform = applied_ptransform.transform
# Return a wrapper rather than ptransform.as_read() directly to
# ensure backwards compatibility of the pipeline structure.
class LegacyCreate(PTransform):
def expand(self, pbegin):
return pbegin | ptransform.as_read()
return LegacyCreate().with_output_types(ptransform.get_output_type())
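

# Illustrative sketch (not part of the original module): overrides such as
# CreatePTransformOverride are applied by handing them to
# ``Pipeline.replace_all``, which walks the pipeline graph, calls ``matches``
# on each node, and swaps in the replacement transform. The helper name and
# the ``pipeline`` argument below are hypothetical placeholders.
def _example_apply_create_override(pipeline):
  pipeline.replace_all([CreatePTransformOverride()])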


class ReadPTransformOverride(PTransformOverride):
  """A ``PTransformOverride`` for ``Read(BoundedSource)``"""

  def matches(self, applied_ptransform):
from apache_beam.io import Read
from apache_beam.io.iobase import BoundedSource
# Only overrides Read(BoundedSource) transform
if (isinstance(applied_ptransform.transform, Read) and
not getattr(applied_ptransform.transform, 'override', False)):
if isinstance(applied_ptransform.transform.source, BoundedSource):
return True
return False

  def get_replacement_transform_for_applied_ptransform(
self, applied_ptransform):
from apache_beam import pvalue
from apache_beam.io import iobase
transform = applied_ptransform.transform
class Read(iobase.Read):
override = True
def expand(self, pbegin):
return pvalue.PCollection(
self.pipeline, is_bounded=self.source.is_bounded())
return Read(transform.source).with_output_types(
transform.get_type_hints().simple_output_type('Read'))


class JrhReadPTransformOverride(PTransformOverride):
  """A ``PTransformOverride`` for ``Read(BoundedSource)``"""

  def matches(self, applied_ptransform):
from apache_beam.io import Read
from apache_beam.io.iobase import BoundedSource
return (
isinstance(applied_ptransform.transform, Read) and
isinstance(applied_ptransform.transform.source, BoundedSource))

  def get_replacement_transform_for_applied_ptransform(
self, applied_ptransform):
from apache_beam.io import Read
from apache_beam.transforms import core
from apache_beam.transforms import util
# Make this a local to narrow what's captured in the closure.
source = applied_ptransform.transform.source
class JrhRead(core.PTransform):
def expand(self, pbegin):
return (
pbegin
| core.Impulse()
| 'Split' >> core.FlatMap(
lambda _: source.split(
Read.get_desired_chunk_size(source.estimate_size())))
| util.Reshuffle()
| 'ReadSplits' >> core.FlatMap(
lambda split: split.source.read(
split.source.get_range_tracker(
split.start_position, split.stop_position))))
return JrhRead().with_output_types(
applied_ptransform.transform.get_type_hints().simple_output_type(
'Read'))
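

# Illustrative sketch (not part of the original module): the BoundedSource
# protocol that the two FlatMaps in JrhRead rely on. ``split`` yields
# SourceBundles whose nested sources are read through their own range
# trackers; ``source`` below is a hypothetical BoundedSource instance and the
# bundle size is arbitrary.
def _example_read_bounded_source(source):
  records = []
  for split in source.split(desired_bundle_size=1 << 20):
    tracker = split.source.get_range_tracker(
        split.start_position, split.stop_position)
    records.extend(split.source.read(tracker))
  return records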


class CombineValuesPTransformOverride(PTransformOverride):
  """A ``PTransformOverride`` for ``CombineValues``.

  The DataflowRunner expects the CombineValues PTransform to act as a
  primitive, so this override replaces it with a primitive.
  """

  def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import CombineValues
if isinstance(applied_ptransform.transform, CombineValues):
self.transform = applied_ptransform.transform
return True
return False

  def get_replacement_transform(self, ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import PTransform
from apache_beam.pvalue import PCollection
# The DataflowRunner still needs access to the CombineValues members to
# generate a V1B3 proto representation, so we remember the transform from
# the matches method and forward it here.
class CombineValuesReplacement(PTransform):
def __init__(self, transform):
self.transform = transform
def expand(self, pcoll):
return PCollection.from_(pcoll)
return CombineValuesReplacement(self.transform)
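

# Illustrative sketch (not part of the original module): the minimal contract
# that every override in this file implements. ``matches`` inspects an
# AppliedPTransform node and the replacement hook returns the transform to
# substitute; the class name and label below are hypothetical.
class _ExamplePassthroughOverride(PTransformOverride):
  def matches(self, applied_ptransform):
    return applied_ptransform.full_label == 'StepToReplace'

  def get_replacement_transform_for_applied_ptransform(
      self, applied_ptransform):
    # Returning the original transform makes this override a no-op.
    return applied_ptransform.transform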


class NativeReadPTransformOverride(PTransformOverride):
  """A ``PTransformOverride`` for ``Read`` using native sources.

  The DataflowRunner expects a Read PTransform that uses a native source to
  act as a primitive, so this override replaces such a Read with a primitive.
  """

  def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.io import Read
    # Consider the native Read to be a primitive for Dataflow by replacing it.
return (
isinstance(applied_ptransform.transform, Read) and
not getattr(applied_ptransform.transform, 'override', False) and
hasattr(applied_ptransform.transform.source, 'format'))

  def get_replacement_transform(self, ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import pvalue
from apache_beam.io import iobase
# This is purposely subclassed from the Read transform to take advantage of
# the existing windowing, typing, and display data.
class Read(iobase.Read):
override = True
def expand(self, pbegin):
return pvalue.PCollection.from_(pbegin)
    # Use the source's coder type hint as this replacement's output type.
    # Otherwise, the typing information is not properly forwarded to the
    # DataflowRunner, which will then choose the incorrect coder for this
    # transform.
return Read(ptransform.source).with_output_types(
ptransform.source.coder.to_type_hint())
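

# Illustrative sketch (not part of the original module): how a source coder's
# type hint, used above via ``with_output_types``, maps a coder to a Python
# type. ``BytesCoder`` is only an example; real native sources supply their
# own coder.
def _example_coder_type_hint():
  from apache_beam.coders import BytesCoder
  return BytesCoder().to_type_hint()  # bytes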


class WriteToBigQueryPTransformOverride(PTransformOverride):
def __init__(self, pipeline, options):
super().__init__()
self.options = options
self.outputs = []
self._check_bq_outputs(pipeline)
def _check_bq_outputs(self, pipeline):
"""Checks that there are no consumers if the transform will be replaced.
The WriteToBigQuery replacement is the native BigQuerySink which has an
output of a PDone. The original transform, however, returns a dict. The user
may be inadvertantly using the dict output which will have no side-effects
or fail pipeline construction esoterically. This checks the outputs and
gives a user-friendsly error.
"""
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position, unused-import
from apache_beam.pipeline import PipelineVisitor
from apache_beam.io import WriteToBigQuery
    # First, retrieve all the outputs from all the WriteToBigQuery transforms
    # that will be replaced. Later, we will use these to make sure nothing
    # consumes them.
class GetWriteToBqOutputsVisitor(PipelineVisitor):
def __init__(self, matches):
self.matches = matches
self.outputs = set()
def enter_composite_transform(self, transform_node):
# Only add outputs that are going to be replaced.
if self.matches(transform_node):
self.outputs.update(set(transform_node.outputs.values()))
outputs_visitor = GetWriteToBqOutputsVisitor(self.matches)
pipeline.visit(outputs_visitor)
    # Finally, verify that there are no consumers of the previously found
    # outputs.
class VerifyWriteToBqOutputsVisitor(PipelineVisitor):
def __init__(self, outputs):
self.outputs = outputs
def enter_composite_transform(self, transform_node):
self.visit_transform(transform_node)
def visit_transform(self, transform_node):
# Internal consumers of the outputs we're overriding are expected.
# We only error out on non-internal consumers.
if ('BigQueryBatchFileLoads' not in transform_node.full_label and
[o for o in self.outputs if o in transform_node.inputs]):
raise ValueError(
'WriteToBigQuery was being replaced with the native '
'BigQuerySink, but the transform "{}" has an input which will be '
'replaced with a PDone. To fix, please remove all transforms '
'that read from any WriteToBigQuery transforms.'.format(
transform_node.full_label))
pipeline.visit(VerifyWriteToBqOutputsVisitor(outputs_visitor.outputs))

  def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import io
transform = applied_ptransform.transform
if (not isinstance(transform, io.WriteToBigQuery) or
getattr(transform, 'override', False)):
return False
experiments = self.options.view_as(DebugOptions).experiments or []
if 'use_legacy_bq_sink' not in experiments:
return False
if transform.schema == io.gcp.bigquery.SCHEMA_AUTODETECT:
raise RuntimeError(
'Schema auto-detection is not supported on the native sink')
# The replacement is only valid for Batch.
standard_options = self.options.view_as(StandardOptions)
if standard_options.streaming:
if transform.write_disposition == io.BigQueryDisposition.WRITE_TRUNCATE:
raise RuntimeError('Can not use write truncation mode in streaming')
return False
self.outputs = list(applied_ptransform.outputs.keys())
return True

  def get_replacement_transform(self, ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import io
class WriteToBigQuery(io.WriteToBigQuery):
override = True
def __init__(self, transform, outputs):
self.transform = transform
self.outputs = outputs
def __getattr__(self, name):
"""Returns the given attribute from the parent.
This allows this transform to act like a WriteToBigQuery transform
without having to construct a new WriteToBigQuery transform.
"""
return self.transform.__getattribute__(name)
def expand(self, pcoll):
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
import json
schema = None
if self.schema:
schema = parse_table_schema_from_json(json.dumps(self.schema))
out = pcoll | io.Write(
io.BigQuerySink(
self.table_reference.tableId,
self.table_reference.datasetId,
self.table_reference.projectId,
schema,
self.create_disposition,
self.write_disposition,
kms_key=self.kms_key))
        # WriteToBigQuery can have different outputs depending on whether the
        # pipeline is batch or streaming. The output keys were retrieved from
        # the node in matches() and are reproduced here for consistency.
return {key: out for key in self.outputs}
return WriteToBigQuery(ptransform, self.outputs)
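

# Illustrative sketch (not part of the original module): the option setup
# under which the override above takes effect, per the checks in ``matches``
# (batch mode plus the ``use_legacy_bq_sink`` experiment). Streaming is left
# unset because the replacement is batch-only; the pipeline construction
# itself is elided and the helper name is hypothetical.
def _example_legacy_bq_sink_options():
  from apache_beam.options.pipeline_options import PipelineOptions
  options = PipelineOptions()
  options.view_as(DebugOptions).add_experiment('use_legacy_bq_sink')
  return options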


class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride):
  """A ``PTransformOverride`` for ``GroupIntoBatches.WithShardedKey``.

  This override simply returns the original transform but additionally records
  the output PCollection in order to append required step properties during
  graph translation.
  """

  def __init__(self, dataflow_runner, options):
self.dataflow_runner = dataflow_runner
self.options = options

  def matches(self, applied_ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import util
transform = applied_ptransform.transform
if not isinstance(transform, util.GroupIntoBatches.WithShardedKey):
return False
# The replacement is only valid for portable Streaming Engine jobs with
# runner v2.
standard_options = self.options.view_as(StandardOptions)
if not standard_options.streaming:
return False
google_cloud_options = self.options.view_as(GoogleCloudOptions)
if not google_cloud_options.enable_streaming_engine:
raise ValueError(
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for non-Streaming-Engine jobs. In order to use '
'runner determined sharding, please use '
'--streaming --enable_streaming_engine --experiments=use_runner_v2')
from apache_beam.runners.dataflow.internal import apiclient
if not apiclient._use_unified_worker(self.options):
raise ValueError(
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2. In order to use '
'runner determined sharding, please use '
'--streaming --enable_streaming_engine --experiments=use_runner_v2')
self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform)
return True

  def get_replacement_transform_for_applied_ptransform(self, ptransform):
return ptransform.transform
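

# Illustrative sketch (not part of the original module): the flag combination
# that the override above requires, taken from its own error messages
# (streaming plus Streaming Engine plus Runner V2). The pipeline and runner
# wiring are elided; the helper name is hypothetical.
def _example_sharded_group_into_batches_options():
  from apache_beam.options.pipeline_options import PipelineOptions
  return PipelineOptions([
      '--streaming',
      '--enable_streaming_engine',
      '--experiments=use_runner_v2',
  ])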