#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cython: profile=True
"""Worker operations executor.
For internal use only; no backwards-compatibility guarantees.
"""
import sys
import traceback
from apache_beam.internal import util
from apache_beam.metrics.execution import ScopedMetricsContainer
from apache_beam.pvalue import TaggedOutput
from apache_beam.transforms import core
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue
class LoggingContext(object):
    """For internal use only; no backwards-compatibility guarantees.

    Default logging context whose enter/exit hooks do nothing.
    """

    def enter(self):
        """Hook called before user code runs; a no-op in this base class."""
        return None

    def exit(self):
        """Hook called after user code runs; a no-op in this base class."""
        return None
class Receiver(object):
    """For internal use only; no backwards-compatibility guarantees.

    An object that consumes a WindowedValue.

    This class can be efficiently used to pass values between the
    sdk and worker harnesses. Concrete receivers override ``receive``.
    """

    def receive(self, windowed_value):
        """Consumes ``windowed_value``; must be overridden by subclasses."""
        raise NotImplementedError()
class DoFnMethodWrapper(object):
    """For internal use only; no backwards-compatibility guarantees.

    Represents a method of a DoFn object."""

    def __init__(self, do_fn, method_name):
        """Initializes a ``DoFnMethodWrapper``.

        Args:
          do_fn: A DoFn object that contains the method.
          method_name: name of the method as a string.
        """
        # get_function_arguments returns a 4-tuple; only the argument names
        # (first item) and the default values (last item) are kept.
        arg_names, _unused_varargs, _unused_kwargs, arg_defaults = (
            do_fn.get_function_arguments(method_name))
        self.method_value = getattr(do_fn, method_name)
        self.args = arg_names
        # A missing/empty defaults value is normalized to an empty list.
        self.defaults = arg_defaults or []
class DoFnSignature(object):
    """Represents the signature of a given ``DoFn`` object.

    Signature of a ``DoFn`` provides a view of the properties of a given
    ``DoFn``. Among other things, this will give an extensible way for
    (1) accessing the structure of the ``DoFn`` including methods and method
    parameters, (2) identifying features that a given ``DoFn`` supports, for
    example, whether a given ``DoFn`` is a Splittable ``DoFn``
    (https://s.apache.org/splittable-do-fn), and (3) validating a ``DoFn``
    based on the feature set offered by it.
    """

    def __init__(self, do_fn):
        # We add a property here for all methods defined by Beam DoFn features.
        assert isinstance(do_fn, core.DoFn), '%r is not a DoFn' % (do_fn,)
        self.do_fn = do_fn
        self.process_method = DoFnMethodWrapper(do_fn, 'process')
        self.start_bundle_method = DoFnMethodWrapper(do_fn, 'start_bundle')
        self.finish_bundle_method = DoFnMethodWrapper(do_fn, 'finish_bundle')
        self._validate()

    def _validate(self):
        """Validates process(), start_bundle() and finish_bundle()."""
        self._validate_process()
        self._validate_bundle_method(self.start_bundle_method)
        self._validate_bundle_method(self.finish_bundle_method)

    def _validate_process(self):
        """Validate that none of the DoFnParameters are repeated in process().

        Raises:
          AssertionError: if any entry of ``core.DoFn.DoFnParams`` occurs more
            than once among process()'s default argument values.
        """
        for param in core.DoFn.DoFnParams:
            assert self.process_method.defaults.count(param) <= 1, (
                'DoFn parameter %r is used more than once in process()'
                % (param,))

    def _validate_bundle_method(self, method_wrapper):
        """Validate that none of the DoFnParameters are used in the method.

        Args:
          method_wrapper: a DoFnMethodWrapper for start_bundle or finish_bundle.

        Raises:
          AssertionError: if any entry of ``core.DoFn.DoFnParams`` appears
            among the method's default argument values.
        """
        for param in core.DoFn.DoFnParams:
            # The message is only built when the assertion fails.
            assert param not in method_wrapper.defaults, (
                'DoFn parameter %r is not allowed in %s' % (
                    param,
                    getattr(method_wrapper.method_value, '__name__',
                            method_wrapper.method_value)))
class DoFnInvoker(object):
    """An abstraction that can be used to execute DoFn methods.

    A DoFnInvoker describes a particular way for invoking methods of a DoFn
    represented by a given DoFnSignature."""

    def __init__(self, output_processor, signature):
        self.output_processor = output_processor
        self.signature = signature

    @staticmethod
    def create_invoker(
            output_processor,
            signature, context, side_inputs, input_args, input_kwargs):
        """Creates a new DoFnInvoker based on given arguments.

        A ``SimpleInvoker`` is returned when there are no side inputs, no
        extra args/kwargs and process() declares no default argument values;
        otherwise a ``PerWindowInvoker`` is returned.

        Args:
          output_processor: object whose ``process_outputs``,
            ``start_bundle_outputs`` and ``finish_bundle_outputs`` methods
            receive the results of the corresponding DoFn method calls.
          signature: a DoFnSignature for the DoFn being invoked.
          context: Context to be used when invoking the DoFn (deprecated).
          side_inputs: side inputs to be used when invoking the process method.
          input_args: arguments to be used when invoking the process method.
          input_kwargs: kwargs to be used when invoking the process method.
        """
        default_arg_values = signature.process_method.defaults
        use_simple_invoker = (
            not side_inputs and not input_args and not input_kwargs and
            not default_arg_values)
        if use_simple_invoker:
            return SimpleInvoker(output_processor, signature)
        else:
            return PerWindowInvoker(
                output_processor,
                signature, context, side_inputs, input_args, input_kwargs)

    def invoke_process(self, windowed_value):
        """Invokes the DoFn.process() function.

        Args:
          windowed_value: a WindowedValue object that gives the element for
            which process() method should be invoked along with the window
            the element belongs to.
        """
        raise NotImplementedError

    def invoke_start_bundle(self):
        """Invokes the DoFn.start_bundle() method.

        Its return value is handed to
        ``output_processor.start_bundle_outputs``.
        """
        self.output_processor.start_bundle_outputs(
            self.signature.start_bundle_method.method_value())

    def invoke_finish_bundle(self):
        """Invokes the DoFn.finish_bundle() method.

        Its return value is handed to
        ``output_processor.finish_bundle_outputs``.
        """
        self.output_processor.finish_bundle_outputs(
            self.signature.finish_bundle_method.method_value())
class SimpleInvoker(DoFnInvoker):
    """An invoker that processes elements ignoring windowing information."""

    def __init__(self, output_processor, signature):
        super(SimpleInvoker, self).__init__(output_processor, signature)
        # Stored directly so invoke_process can call it without going
        # through the signature object.
        self.process_method = signature.process_method.method_value

    def invoke_process(self, windowed_value):
        """Calls process() with just the element value and forwards results."""
        element = windowed_value.value
        outputs = self.process_method(element)
        self.output_processor.process_outputs(windowed_value, outputs)
class PerWindowInvoker(DoFnInvoker):
    """An invoker that processes elements considering windowing information.

    The positional argument list for process() is assembled once in
    ``__init__``. Slots for the element, window and timestamp are
    placeholders that are overwritten on every call in
    ``_invoke_per_window``.
    """

    def __init__(self, output_processor, signature, context,
                 side_inputs, input_args, input_kwargs):
        super(PerWindowInvoker, self).__init__(output_processor, signature)
        self.side_inputs = side_inputs
        self.context = context
        self.process_method = signature.process_method.method_value
        default_arg_values = signature.process_method.defaults
        # Side inputs must be looked up per window if any of them is not
        # globally windowed, or if process() asks for the window itself.
        self.has_windowed_inputs = (
            not all(si.is_globally_windowed() for si in side_inputs) or
            (core.DoFn.WindowParam in default_arg_values))

        # Try to prepare all the arguments that can just be filled in
        # without any additional work in the process function.
        # Also cache all the placeholders needed in the process function.

        # Fill in side inputs if they are globally windowed.
        global_window = GlobalWindow()

        input_args = input_args if input_args else []
        input_kwargs = input_kwargs if input_kwargs else {}

        if not self.has_windowed_inputs:
            input_args, input_kwargs = util.insert_values_in_args(
                input_args, input_kwargs,
                [si[global_window] for si in side_inputs])

        # Normalize to a list: callers may pass a tuple, and the code below
        # concatenates slices of input_args with a list and appends to them.
        input_args = list(input_args)

        arguments = signature.process_method.args
        defaults = signature.process_method.defaults

        # 1 when is_process_bounded() is true (presumably meaning `self` is
        # among the argument names -- TODO confirm), else 0. Those leading
        # names are not filled from input_args.
        self_in_args = int(signature.do_fn.is_process_bounded())

        class ArgPlaceholder(object):
            """Marks an argument slot whose value is filled per element."""

            def __init__(self, placeholder):
                self.placeholder = placeholder

        # Create placeholder for element parameter of DoFn.process() method.
        if core.DoFn.ElementParam not in default_arg_values:
            # The element is passed positionally, before the picked args.
            args_to_pick = (
                len(arguments) - len(default_arg_values) - 1 - self_in_args)
            args_with_placeholders = (
                [ArgPlaceholder(core.DoFn.ElementParam)] +
                input_args[:args_to_pick])
        else:
            args_to_pick = len(arguments) - len(defaults) - self_in_args
            args_with_placeholders = input_args[:args_to_pick]

        # Fill the placeholders for element, window or timestamp; the actual
        # values are substituted for every call in _invoke_per_window.
        remaining_args_iter = iter(input_args[args_to_pick:])
        for a, d in zip(arguments[-len(defaults):], defaults):
            if (d == core.DoFn.ElementParam or
                    d == core.DoFn.WindowParam or
                    d == core.DoFn.TimestampParam):
                args_with_placeholders.append(ArgPlaceholder(d))
            elif d == core.DoFn.SideInputParam:
                # If no more args are present then the value must be passed
                # via kwarg.
                try:
                    args_with_placeholders.append(next(remaining_args_iter))
                except StopIteration:
                    if a not in input_kwargs:
                        raise ValueError(
                            "Value for sideinput %s not provided" % a)
            else:
                # If no more args are present, the argument is left out so
                # that process() uses its declared default (or a kwarg).
                try:
                    args_with_placeholders.append(next(remaining_args_iter))
                except StopIteration:
                    pass
        args_with_placeholders.extend(remaining_args_iter)

        # Stash the list of placeholder positions for performance
        self.placeholders = [
            (i, x.placeholder) for (i, x) in enumerate(args_with_placeholders)
            if isinstance(x, ArgPlaceholder)]
        self.args_for_process = args_with_placeholders
        self.kwargs_for_process = input_kwargs

    def invoke_process(self, windowed_value):
        """Invokes process() for ``windowed_value``.

        With windowed inputs and a window count other than one, process() is
        called once per window with a single-window copy of the value.
        Otherwise it is called once with the value as-is.
        """
        self.context.set_element(windowed_value)
        # Call for the process function for each window if has windowed side
        # inputs or if the process accesses the window parameter. We can just
        # call it once otherwise as none of the arguments are changing
        if self.has_windowed_inputs and len(windowed_value.windows) != 1:
            for w in windowed_value.windows:
                self._invoke_per_window(
                    WindowedValue(
                        windowed_value.value, windowed_value.timestamp, (w,)))
        else:
            self._invoke_per_window(windowed_value)

    def _invoke_per_window(self, windowed_value):
        """Fills in per-call arguments and runs process() once."""
        if self.has_windowed_inputs:
            # Exactly one window is expected here (see invoke_process).
            window, = windowed_value.windows
            args_for_process, kwargs_for_process = util.insert_values_in_args(
                self.args_for_process, self.kwargs_for_process,
                [si[window] for si in self.side_inputs])
        else:
            # The cached list is reused and its placeholder slots are
            # overwritten in place below.
            args_for_process, kwargs_for_process = (
                self.args_for_process, self.kwargs_for_process)
        # `window` is bound whenever a WindowParam placeholder exists, because
        # WindowParam forces has_windowed_inputs in __init__.
        # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
        for i, p in self.placeholders:
            if p == core.DoFn.ElementParam:
                args_for_process[i] = windowed_value.value
            elif p == core.DoFn.WindowParam:
                args_for_process[i] = window
            elif p == core.DoFn.TimestampParam:
                args_for_process[i] = windowed_value.timestamp
        if kwargs_for_process:
            self.output_processor.process_outputs(
                windowed_value,
                self.process_method(*args_for_process, **kwargs_for_process))
        else:
            self.output_processor.process_outputs(
                windowed_value, self.process_method(*args_for_process))
[docs]class DoFnRunner(Receiver):
"""For internal use only; no backwards-compatibility guarantees.
A helper class for executing ParDo operations.
"""
def __init__(self,
fn,
args,
kwargs,
side_inputs,
windowing,
context=None,
tagged_receivers=None,
logger=None,
step_name=None,
# Preferred alternative to logger
# TODO(robertwb): Remove once all runners are updated.
logging_context=None,
# Preferred alternative to context
# TODO(robertwb): Remove once all runners are updated.
state=None,
scoped_metrics_container=None):
"""Initializes a DoFnRunner.
Args:
fn: user DoFn to invoke
args: positional side input arguments (static and placeholder), if any
kwargs: keyword side input arguments (static and placeholder), if any
side_inputs: list of sideinput.SideInputMaps for deferred side inputs
windowing: windowing properties of the output PCollection(s)
context: a DoFnContext to use (deprecated)
tagged_receivers: a dict of tag name to Receiver objects
logger: a logging module (deprecated)
step_name: the name of this step
logging_context: a LoggingContext object
state: handle for accessing DoFn state
scoped_metrics_container: Context switcher for metrics container
"""
self.scoped_metrics_container = (scoped_metrics_container
or ScopedMetricsContainer())
self.step_name = step_name
# Need to support multiple iterations.
side_inputs = list(side_inputs)
if logging_context:
self.logging_context = logging_context
else:
self.logging_context = get_logging_context(logger, step_name=step_name)
# TODO(sourabh): Deprecate the use of context
if state:
assert context is None
context = DoFnContext(step_name, state=state)
else:
assert context is not None
context = context
self.context = context
do_fn_signature = DoFnSignature(fn)
# Optimize for the common case.
main_receivers = as_receiver(tagged_receivers[None])
output_processor = _OutputProcessor(
windowing.windowfn, main_receivers, tagged_receivers)
self.do_fn_invoker = DoFnInvoker.create_invoker(
output_processor, do_fn_signature, context, side_inputs, args, kwargs)
[docs] def receive(self, windowed_value):
self.process(windowed_value)
[docs] def process(self, windowed_value):
try:
self.logging_context.enter()
self.scoped_metrics_container.enter()
self.do_fn_invoker.invoke_process(windowed_value)
except BaseException as exn:
self._reraise_augmented(exn)
finally:
self.scoped_metrics_container.exit()
self.logging_context.exit()
def _invoke_bundle_method(self, bundle_method):
try:
self.logging_context.enter()
self.scoped_metrics_container.enter()
self.context.set_element(None)
bundle_method()
except BaseException as exn:
self._reraise_augmented(exn)
finally:
self.scoped_metrics_container.exit()
self.logging_context.exit()
[docs] def start(self):
self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
[docs] def finish(self):
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
def _reraise_augmented(self, exn):
if getattr(exn, '_tagged_with_step', False) or not self.step_name:
raise
step_annotation = " [while running '%s']" % self.step_name
# To emulate exception chaining (not available in Python 2).
original_traceback = sys.exc_info()[2]
try:
# Attempt to construct the same kind of exception
# with an augmented message.
new_exn = type(exn)(exn.args[0] + step_annotation, *exn.args[1:])
new_exn._tagged_with_step = True # Could raise attribute error.
except: # pylint: disable=bare-except
# If anything goes wrong, construct a RuntimeError whose message
# records the original exception's type and message.
new_exn = RuntimeError(
traceback.format_exception_only(type(exn), exn)[-1].strip()
+ step_annotation)
new_exn._tagged_with_step = True
raise new_exn, None, original_traceback
class _OutputProcessor(object):
"""Processes output produced by DoFn method invocations."""
def __init__(self, window_fn, main_receivers, tagged_receivers):
"""Initializes ``_OutputProcessor``.
Args:
window_fn: a windowing function (WindowFn).
main_receivers: a dict of tag name to Receiver objects.
tagged_receivers: main receiver object.
"""
self.window_fn = window_fn
self.main_receivers = main_receivers
self.tagged_receivers = tagged_receivers
def process_outputs(self, windowed_input_element, results):
"""Dispatch the result of process computation to the appropriate receivers.
A value wrapped in a TaggedOutput object will be unwrapped and
then dispatched to the appropriate indexed output.
"""
if results is None:
return
for result in results:
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
if not isinstance(tag, basestring):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
if isinstance(result, WindowedValue):
windowed_value = result
if (windowed_input_element is not None
and len(windowed_input_element.windows) != 1):
windowed_value.windows *= len(windowed_input_element.windows)
elif isinstance(result, TimestampedValue):
assign_context = WindowFn.AssignContext(result.timestamp, result.value)
windowed_value = WindowedValue(
result.value, result.timestamp,
self.window_fn.assign(assign_context))
if len(windowed_input_element.windows) != 1:
windowed_value.windows *= len(windowed_input_element.windows)
else:
windowed_value = windowed_input_element.with_value(result)
if tag is None:
self.main_receivers.receive(windowed_value)
else:
self.tagged_receivers[tag].output(windowed_value)
def start_bundle_outputs(self, results):
"""Validate that start_bundle does not output any elements"""
if results is None:
return
raise RuntimeError(
'Start Bundle should not output any elements but got %s' % results)
def finish_bundle_outputs(self, results):
"""Dispatch the result of finish_bundle to the appropriate receivers.
A value wrapped in a TaggedOutput object will be unwrapped and
then dispatched to the appropriate indexed output.
"""
if results is None:
return
for result in results:
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
if not isinstance(tag, basestring):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
if isinstance(result, WindowedValue):
windowed_value = result
else:
raise RuntimeError('Finish Bundle should only output WindowedValue ' +\
'type but got %s' % type(result))
if tag is None:
self.main_receivers.receive(windowed_value)
else:
self.tagged_receivers[tag].output(windowed_value)
class _NoContext(WindowFn.AssignContext):
    """An uninspectable WindowFn.AssignContext."""

    # Sentinel that distinguishes "no timestamp supplied" from any real value.
    NO_VALUE = object()

    def __init__(self, value, timestamp=NO_VALUE):
        self.value = value
        self._timestamp = timestamp

    @property
    def timestamp(self):
        """The timestamp given at construction.

        Raises:
          ValueError: if no timestamp was supplied.
        """
        if self._timestamp is not self.NO_VALUE:
            return self._timestamp
        raise ValueError('No timestamp in this context.')

    @property
    def existing_windows(self):
        """Always raises ValueError; existing windows are not exposed."""
        raise ValueError('No existing_windows in this context.')
class DoFnState(object):
    """For internal use only; no backwards-compatibility guarantees.

    Keeps track of state that DoFns want, currently, user counters.
    """

    def __init__(self, counter_factory):
        self._counter_factory = counter_factory
        # Starts empty; used as the step name for every counter lookup.
        self.step_name = ''

    def counter_for(self, aggregator):
        """Returns the factory's counter for ``aggregator`` in this step."""
        factory = self._counter_factory
        return factory.get_aggregator_counter(self.step_name, aggregator)
# TODO(robertwb): Replace core.DoFnContext with this.
class DoFnContext(object):
    """For internal use only; no backwards-compatibility guarantees.

    Exposes the element, timestamp and windows of the windowed value most
    recently passed to ``set_element``.
    """

    def __init__(self, label, element=None, state=None):
        self.label = label
        self.state = state
        # Always define the attribute. Otherwise the accessors below would
        # fail with a missing-attribute error, not the intended
        # "not accessible" message, when no element has been set yet.
        self.windowed_value = None
        if element is not None:
            self.set_element(element)

    def set_element(self, windowed_value):
        """Sets the windowed value read by the accessors (None clears it)."""
        self.windowed_value = windowed_value

    def _current(self, what):
        """Returns the current windowed value or raises AttributeError."""
        if self.windowed_value is None:
            raise AttributeError('%s not accessible in this context' % what)
        return self.windowed_value

    @property
    def element(self):
        return self._current('element').value

    @property
    def timestamp(self):
        return self._current('timestamp').timestamp

    @property
    def windows(self):
        return self._current('windows').windows
# TODO(robertwb): Remove all these adapters once service is updated out.
class _LoggingContextAdapter(LoggingContext):
    """Presents an object with enter()/exit() methods as a LoggingContext."""

    def __init__(self, underlying):
        self.underlying = underlying

    def enter(self):
        """Delegates to the wrapped object's enter(); returns None."""
        wrapped = self.underlying
        wrapped.enter()

    def exit(self):
        """Delegates to the wrapped object's exit(); returns None."""
        wrapped = self.underlying
        wrapped.exit()
def get_logging_context(maybe_logger, **kwargs):
    """Returns a LoggingContext, optionally derived from a legacy logger.

    Args:
      maybe_logger: a (deprecated) logging module exposing
        ``PerThreadLoggingContext``, or a falsy value.
      **kwargs: forwarded to ``maybe_logger.PerThreadLoggingContext``.

    Returns:
      A no-op ``LoggingContext`` when ``maybe_logger`` is falsy. Otherwise
      the logger's per-thread context, wrapped in an adapter if it is not
      already a ``LoggingContext``.
    """
    if not maybe_logger:
        return LoggingContext()
    per_thread = maybe_logger.PerThreadLoggingContext(**kwargs)
    if not isinstance(per_thread, LoggingContext):
        per_thread = _LoggingContextAdapter(per_thread)
    return per_thread
class _ReceiverAdapter(Receiver):
    """Presents an object with an output() method as a Receiver."""

    def __init__(self, underlying):
        self.underlying = underlying

    def receive(self, windowed_value):
        """Forwards ``windowed_value`` to the wrapped object's output()."""
        emit = self.underlying.output
        emit(windowed_value)
def as_receiver(maybe_receiver):
    """For internal use only; no backwards-compatibility guarantees.

    Returns ``maybe_receiver`` unchanged if it is already a Receiver.
    Otherwise wraps it in a ``_ReceiverAdapter``, which forwards
    ``receive`` calls to its ``output`` method.
    """
    already_receiver = isinstance(maybe_receiver, Receiver)
    return maybe_receiver if already_receiver else _ReceiverAdapter(maybe_receiver)