apache_beam.utils package

Submodules

apache_beam.utils.annotations module

Deprecated and experimental annotations.

For internal use only; no backwards-compatibility guarantees.

Annotations come in two flavors: deprecated and experimental

The ‘deprecated’ annotation requires a ‘since’ parameter to specify the version in which the function was deprecated. Both ‘deprecated’ and ‘experimental’ annotations can specify the currently recommended alternative by means of a ‘current’ parameter.

The following example illustrates how to annotate coexisting versions of the same function ‘multiply’:

def multiply(arg1, arg2):
  print(arg1, '*', arg2, '=', end=' ')
  return arg1 * arg2

# This annotation marks 'old_multiply' as deprecated since 'v.1' and
# suggests using 'multiply' instead:

@deprecated(since='v.1', current='multiply')
def old_multiply(arg1, arg2):
  result = 0
  for i in range(arg1):
      result += arg2
  print(arg1, '*', arg2, '(the old way)=', end=' ')
  return result

# This annotation marks 'exp_multiply' as experimental and
# suggests using 'multiply' instead:

@experimental(since='v.1', current='multiply')
def exp_multiply(arg1, arg2):
  print(arg1, '*', arg2, '(the experimental way)=', end=' ')
  return (arg1*arg2)*(arg1/arg2)*(arg2/arg1)

# Set a warning filter to control how often warnings are produced:

warnings.simplefilter("always")
print(multiply(5, 6))
print(old_multiply(5, 6))
print(exp_multiply(5, 6))
apache_beam.utils.annotations.annotate(label, since, current, extra_message)[source]

Decorates a function with a deprecated or experimental annotation.

Parameters:
  • label – the kind of annotation (‘deprecated’ or ‘experimental’).
  • since – the version that causes the annotation.
  • current – the suggested replacement function.
  • extra_message – an optional additional message.
Returns:

The decorator for the function.
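A minimal sketch of how such an annotation decorator might be implemented with the standard warnings module. The names here (simple_deprecated) are illustrative, not Beam’s actual implementation:

```python
import functools
import warnings

def simple_deprecated(since, current=None, extra_message=None):
    """Illustrative stand-in for a 'deprecated' annotation decorator."""
    def decorator(fn):
        message = '%s is deprecated since %s' % (fn.__name__, since)
        if current:
            message += '. Use %s instead' % current
        if extra_message:
            message += '. ' + extra_message
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Emit the warning each time the deprecated function is called;
            # the caller's warning filters control how often it is shown.
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@simple_deprecated(since='v.1', current='multiply')
def old_multiply(a, b):
    return a * b
```

Calling old_multiply then raises a DeprecationWarning while still returning the product, mirroring the behavior the annotations module documents.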

apache_beam.utils.counters module

Counters collect the progress of the Worker for reporting to the service.

For internal use only; no backwards-compatibility guarantees.

class apache_beam.utils.counters.AccumulatorCombineFnCounter(name, combine_fn)[source]

Bases: apache_beam.utils.counters.Counter

Counter optimized for a mutating accumulator that holds all the logic.

update(value)[source]
class apache_beam.utils.counters.Counter(name, combine_fn)[source]

Bases: object

A counter aggregates a series of values.

The aggregation kind of the Counter is specified when the Counter is created. The values aggregated must be of a type appropriate for the aggregation used. Supported aggregations are listed in the code.

(The aggregated value will be reported to the Dataflow service.)

Do not create directly; call CounterFactory.get_counter instead.

name

the name of the counter, a string

combine_fn

the CombineFn to use for aggregation

accumulator

the accumulator created for the combine_fn

MEAN = <apache_beam.transforms.cy_combiners.MeanInt64Fn object>
SUM = <apache_beam.transforms.cy_combiners.SumInt64Fn object>
update(value)[source]
value()[source]
class apache_beam.utils.counters.CounterFactory[source]

Bases: object

Keeps track of unique counters.

get_aggregator_counter(step_name, aggregator)[source]

Returns an AggregationCounter for this step’s aggregator.

Passing in the same values will return the same counter.

Parameters:
  • step_name – the name of this step.
  • aggregator – an Aggregator object.
Returns:

A new or existing counter.

get_aggregator_values(aggregator_or_name)[source]

Returns dict of step names to values of the aggregator.

get_counter(name, combine_fn)[source]

Returns a counter with the requested name.

Passing in the same name will return the same counter; the combine_fn must agree.

Parameters:
  • name – the name of this counter. Typically has three parts: “step-output-counter”.
  • combine_fn – the CombineFn to use for aggregation
Returns:

A new or existing counter with the requested name.

get_counters()[source]

Returns the current set of counters.

Returns:An iterable that contains the current set of counters. To make sure that multiple threads can iterate over the set of counters, we return a new iterable here. Note that the actual set of counters may get modified after this method returns hence the returned iterable may be stale.
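The Counter/CombineFn interaction described above can be sketched with a minimal combiner. SumFn and SimpleCounter here are hypothetical illustrations of the accumulator protocol, not Beam’s cy_combiners classes:

```python
class SumFn(object):
    """Hypothetical CombineFn-style summer using the accumulator protocol."""
    def create_accumulator(self):
        return 0
    def add_input(self, accumulator, value):
        return accumulator + value
    def extract_output(self, accumulator):
        return accumulator

class SimpleCounter(object):
    """Illustrative counter: aggregates a series of values via a combine_fn."""
    def __init__(self, name, combine_fn):
        self.name = name
        self.combine_fn = combine_fn
        # The accumulator is created once from the combine_fn.
        self.accumulator = combine_fn.create_accumulator()
    def update(self, value):
        self.accumulator = self.combine_fn.add_input(self.accumulator, value)
    def value(self):
        return self.combine_fn.extract_output(self.accumulator)

counter = SimpleCounter('step-output-elements', SumFn())
for v in (1, 2, 3):
    counter.update(v)
```

Repeated update() calls fold each value into the accumulator; value() extracts the current aggregate without resetting it.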
apache_beam.utils.counters.get_aggregator_values(aggregator_or_name, counter_dict, value_extractor=None)[source]

Extracts the named aggregator value from a set of counters.

Parameters:
  • aggregator_or_name – an Aggregator object or the name of one.
  • counter_dict – a dict object of {name: value_wrapper}
  • value_extractor – a function to convert the value_wrapper into a value. If None, no extraction is done and the value is returned unchanged.
Returns:

dict of step names to values of the aggregator.

apache_beam.utils.processes module

Cross-platform utilities for creating subprocesses.

For internal use only; no backwards-compatibility guarantees.

apache_beam.utils.processes.Popen(*args, **kwargs)[source]
apache_beam.utils.processes.call(*args, **kwargs)[source]
apache_beam.utils.processes.check_call(*args, **kwargs)[source]
apache_beam.utils.processes.check_output(*args, **kwargs)[source]
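These wrappers exist because subprocess invocation differs across platforms. A plausible sketch of the idea (Beam’s actual logic may differ) forces shell=True on Windows, where commands often need the shell to resolve .bat/.cmd entry points:

```python
import platform
import subprocess
import sys

def check_output(*args, **kwargs):
    """Sketch of a cross-platform check_output wrapper (illustrative only)."""
    if platform.system() == 'Windows':
        # On Windows, route the command through the shell so that
        # batch-file entry points resolve correctly.
        kwargs['shell'] = True
    return subprocess.check_output(*args, **kwargs)

# Run the current interpreter as a portable example command.
out = check_output([sys.executable, '-c', 'print("hello")'])
```

On POSIX systems the wrapper is a transparent pass-through to subprocess.check_output.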

apache_beam.utils.profiler module

A profiler context manager based on cProfile.Profile objects.

For internal use only; no backwards-compatibility guarantees.

class apache_beam.utils.profiler.MemoryReporter(interval_second=60.0)[source]

Bases: object

A memory reporter that reports the memory usage and heap profile. Usage:

mr = MemoryReporter(interval_second=30.0)
mr.start()
while ...:
  <do something>
  # This will report continuously, with 30 seconds between reports.
mr.stop()

NOTE: A reporter started with start() should always be stopped with stop(), or the parent process can never finish.

Alternatively, use it as a context manager, which calls start() and stop() automatically:

with MemoryReporter(interval_second=100):
  while ...:
    <do something>

It can also report on demand, without continuous reporting:

mr = MemoryReporter()  # Default interval 60s, but not started.
<do something>
mr.report_once()
report_once()[source]
start()[source]
stop()[source]
class apache_beam.utils.profiler.Profile(profile_id, profile_location=None, log_results=False, file_copy_fn=None)[source]

Bases: object

cProfile wrapper context for saving and logging profiler results.

SORTBY = 'cumulative'
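A minimal sketch of a cProfile wrapper context in the spirit of the Profile class above. SimpleProfile is a hypothetical name; it captures stats on exit instead of copying them to a file:

```python
import cProfile
import io
import pstats

class SimpleProfile(object):
    """Illustrative cProfile context manager that saves formatted results."""
    SORTBY = 'cumulative'

    def __enter__(self):
        self.profile = cProfile.Profile()
        self.profile.enable()
        return self

    def __exit__(self, *exc_info):
        self.profile.disable()
        # Render the top entries, sorted by cumulative time, into a string.
        stream = io.StringIO()
        stats = pstats.Stats(self.profile, stream=stream)
        stats.sort_stats(self.SORTBY).print_stats(5)
        self.results = stream.getvalue()
        return False

with SimpleProfile() as prof:
    sum(i * i for i in range(1000))
```

After the with-block exits, prof.results holds the formatted profiler report, which a real implementation could log or copy to a profile_location.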

apache_beam.utils.proto_utils module

For internal use only; no backwards-compatibility guarantees.

apache_beam.utils.proto_utils.pack_Any(msg)[source]

Creates a protobuf Any with msg as its content.

Returns None if msg is None.

apache_beam.utils.proto_utils.pack_Struct(**kwargs)[source]

Returns a struct containing the values indicated by kwargs.

apache_beam.utils.proto_utils.unpack_Any(any_msg, msg_class)[source]

Unpacks any_msg into msg_class.

Returns None if msg_class is None.
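The pack/unpack behavior described above can be sketched with the protobuf well-known types. This assumes the google.protobuf package; the function bodies are illustrative, not Beam’s exact implementation:

```python
from google.protobuf import any_pb2
from google.protobuf import struct_pb2

def pack_Any(msg):
    """Sketch: wrap a message in a protobuf Any; None passes through."""
    if msg is None:
        return None
    result = any_pb2.Any()
    result.Pack(msg)
    return result

def unpack_Any(any_msg, msg_class):
    """Sketch: unpack an Any into msg_class; returns None if msg_class is None."""
    if msg_class is None:
        return None
    msg = msg_class()
    any_msg.Unpack(msg)
    return msg

# Round-trip a Struct through Any.
s = struct_pb2.Struct()
s['key'] = 'value'
roundtrip = unpack_Any(pack_Any(s), struct_pb2.Struct)
```

The None pass-through on both sides lets callers pack optional messages without guarding every call site.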

apache_beam.utils.retry module

Retry decorators for calls raising exceptions.

For internal use only; no backwards-compatibility guarantees.

This module is used mostly to decorate all integration points where the code makes calls to remote services. Searching through the code base for @retry should find all such places. For this reason even places where retry is not needed right now use a @retry.no_retries decorator.

class apache_beam.utils.retry.Clock[source]

Bases: object

A simple clock implementing sleep().

sleep(value)[source]
class apache_beam.utils.retry.FuzzedExponentialIntervals(initial_delay_secs, num_retries, factor=2, fuzz=0.5, max_delay_secs=3600)[source]

Bases: object

Iterable for intervals that are exponentially spaced, with fuzzing.

On iteration, yields retry interval lengths, in seconds. Every iteration over this iterable will yield differently fuzzed interval lengths, as long as fuzz is nonzero.

Parameters:
  • initial_delay_secs – The delay before the first retry, in seconds.
  • num_retries – The total number of times to retry.
  • factor – The exponential factor to use on subsequent retries. Default is 2 (doubling).
  • fuzz – A value between 0 and 1, indicating the fraction of fuzz. For a given delay d, the fuzzed delay is randomly chosen between [(1 - fuzz) * d, d].
  • max_delay_secs – Maximum delay (in seconds). After this limit is reached, further tries use max_delay_secs instead of exponentially increasing the time. Defaults to 1 hour.
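The interval sequence these parameters describe can be sketched as a generator. This is an illustrative re-derivation from the parameter descriptions above, not Beam’s exact class:

```python
import random

def fuzzed_exponential_intervals(initial_delay_secs, num_retries,
                                 factor=2, fuzz=0.5, max_delay_secs=3600):
    """Illustrative generator of fuzzed, exponentially spaced retry delays."""
    delay = float(initial_delay_secs)
    for _ in range(num_retries):
        # The fuzzed delay is chosen uniformly in [(1 - fuzz) * d, d].
        yield delay * (1 - fuzz * random.random())
        # Grow exponentially, capped at max_delay_secs.
        delay = min(delay * factor, max_delay_secs)

# With fuzz=0 the sequence is deterministic: 1, 2, 4, 8 seconds.
delays = list(fuzzed_exponential_intervals(1.0, 4, fuzz=0))
```

With a nonzero fuzz, each iteration over the generator yields different delays, which spreads retries from many workers over time instead of synchronizing them.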
exception apache_beam.utils.retry.PermanentException[source]

Bases: exceptions.Exception

Base class for exceptions that should not be retried.

apache_beam.utils.retry.no_retries(fun)[source]

A retry decorator for places where we do not want retries.

apache_beam.utils.retry.retry_on_server_errors_and_timeout_filter(exception)[source]
apache_beam.utils.retry.retry_on_server_errors_filter(exception)[source]

Filter allowing retries on server errors and non-HttpErrors.

apache_beam.utils.retry.with_exponential_backoff(num_retries=7, initial_delay_secs=5.0, logger=<function warning>, retry_filter=<function retry_on_server_errors_filter>, clock=<apache_beam.utils.retry.Clock object>, fuzz=True, factor=2, max_delay_secs=3600)[source]

Decorator with arguments that control the retry logic.

Parameters:
  • num_retries – The total number of times to retry.
  • initial_delay_secs – The delay before the first retry, in seconds.
  • logger – A callable used to report an exception. Must have the same signature as functions in the standard logging module. The default is logging.warning.
  • retry_filter – A callable getting the exception raised and returning True if the retry should happen. For instance, we do not want to retry on 404 HTTP errors most of the time. The default value will return True for server errors (HTTP status code >= 500) and non-HTTP errors.
  • clock – A clock object implementing a sleep method. The default clock will use time.sleep().
  • fuzz – True if the delay should be fuzzed (default). During testing False can be used so that the delays are not randomized.
  • factor – The exponential factor to use on subsequent retries. Default is 2 (doubling).
  • max_delay_secs – Maximum delay (in seconds). After this limit is reached, further tries use max_delay_secs instead of exponentially increasing the time. Defaults to 1 hour.
Returns:

As per Python decorators with arguments pattern returns a decorator for the function which in turn will return the wrapped (decorated) function.

The decorator is intended to be used on callables that make HTTP or RPC requests that can temporarily time out or have transient errors. For instance, the make_http_request() call below will be retried up to 7 times (the default num_retries) with exponential backoff and fuzzing of the delay interval.

from apache_beam.utils import retry
# ...
@retry.with_exponential_backoff()
def make_http_request(args):
  ...
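The retry logic described above can be sketched as a minimal decorator. with_simple_backoff is a hypothetical, simplified stand-in (no fuzzing, injectable sleep for testing), not Beam’s with_exponential_backoff:

```python
import functools
import time

def with_simple_backoff(num_retries=7, initial_delay_secs=0.1, factor=2,
                        retry_filter=lambda exn: True, sleep=time.sleep):
    """Illustrative retry decorator with exponential backoff."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay_secs
            for attempt in range(num_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exn:
                    # Give up on the last attempt or for non-retryable errors.
                    if attempt == num_retries or not retry_filter(exn):
                        raise
                    sleep(delay)
                    delay *= factor
        return wrapper
    return decorator

calls = []

@with_simple_backoff(num_retries=3, sleep=lambda s: None)
def flaky():
    """Fails twice, then succeeds - a stand-in for a transient RPC error."""
    calls.append(1)
    if len(calls) < 3:
        raise IOError('transient')
    return 'ok'
```

Passing sleep=lambda s: None in the example keeps the test fast; production code would use the real clock.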

apache_beam.utils.timestamp module

Timestamp utilities.

For internal use only; no backwards-compatibility guarantees.

class apache_beam.utils.timestamp.Duration(seconds=0, micros=0)[source]

Bases: object

Represents a second duration with microsecond granularity.

Can be treated in common arithmetic operations as a numeric type.

Internally stores a time interval as an int of microseconds. This strategy is necessary since floating point values lose precision when storing values, especially after arithmetic operations (for example, 10000000 % 0.1 evaluates to 0.0999999994448885).

static of(seconds)[source]

Return the Duration for the given number of seconds.

If the input is already a Duration, the input itself will be returned.

Parameters:seconds – Number of seconds as int, float or Duration.
Returns:Corresponding Duration object.
class apache_beam.utils.timestamp.Timestamp(seconds=0, micros=0)[source]

Bases: object

Represents a Unix second timestamp with microsecond granularity.

Can be treated in common timestamp arithmetic operations as a numeric type.

Internally stores a time interval as an int of microseconds. This strategy is necessary since floating point values lose precision when storing values, especially after arithmetic operations (for example, 10000000 % 0.1 evaluates to 0.0999999994448885).

isoformat()[source]
static of(seconds)[source]

Return the Timestamp for the given number of seconds.

If the input is already a Timestamp, the input itself will be returned.

Parameters:seconds – Number of seconds as int, float or Timestamp.
Returns:Corresponding Timestamp object.
predecessor()[source]

Returns the largest timestamp smaller than self.

to_utc_datetime()[source]
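The precision rationale stated above (floats drift; integer microseconds do not) can be checked directly. MicrosTimestamp is a hypothetical sketch of the int-of-microseconds strategy:

```python
class MicrosTimestamp(object):
    """Illustrative timestamp storing whole microseconds as an int."""
    def __init__(self, seconds=0, micros=0):
        self.micros = int(seconds * 1000000) + int(micros)

    def __mod__(self, other):
        # Integer modulo on micros stays exact.
        return MicrosTimestamp(micros=self.micros % other.micros)

    def to_seconds(self):
        return self.micros / 1000000

# Float arithmetic drifts: 10000000 % 0.1 is not exactly 0.
float_mod = 10000000 % 0.1
# Integer-microsecond arithmetic gives the exact answer, 0.
int_mod = (MicrosTimestamp(10000000) % MicrosTimestamp(0.1)).to_seconds()
```

The same reasoning applies to addition and subtraction: every operation on the int micros representation is exact until converted back to seconds.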

apache_beam.utils.urns module

For internal use only; no backwards-compatibility guarantees.

class apache_beam.utils.urns.RunnerApiFn[source]

Bases: object

Abstract base class that provides urn registration utilities.

A class that inherits from this class will get a registration-based from_runner_api and to_runner_api method that convert to and from beam_runner_api_pb2.SdkFunctionSpec.

Additionally, register_pickle_urn can be called from the body of a class to register serialization via pickling.

classmethod from_runner_api(fn_proto, context)[source]

Converts from an SdkFunctionSpec to a Fn object.

Prefer registering a urn with its parameter type and constructor.

classmethod register_pickle_urn(pickle_urn)[source]

Registers and implements the given urn via pickling.

classmethod register_urn(urn, parameter_type, fn=None)[source]

Registers a urn with a constructor.

For example, if ‘beam:fn:foo’ had parameter type FooPayload, one could write RunnerApiFn.register_urn(‘beam:fn:foo’, FooPayload, foo_from_proto) where foo_from_proto takes as arguments a FooPayload and a PipelineContext. This function can also be used as a decorator rather than passing the callable in as the final parameter.

A corresponding to_runner_api_parameter method would be expected that returns the tuple (‘beam:fn:foo’, FooPayload).

to_runner_api(context)[source]

Returns an SdkFunctionSpec encoding this Fn.

Prefer overriding self.to_runner_api_parameter.

to_runner_api_parameter(unused_context)[source]

Returns the urn and payload for this Fn.

The returned urn(s) should be registered with register_urn.
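The registration pattern described above can be sketched as a urn-to-constructor registry. UrnRegistry and the example urn are hypothetical illustrations of the mechanism, not Beam’s RunnerApiFn:

```python
class UrnRegistry(object):
    """Illustrative urn -> (parameter_type, constructor) registry."""
    _known_urns = {}

    @classmethod
    def register_urn(cls, urn, parameter_type, fn=None):
        def register(fn):
            cls._known_urns[urn] = (parameter_type, fn)
            return fn
        # Usable both as register_urn(urn, t, fn) and as a decorator.
        return register(fn) if fn else register

    @classmethod
    def from_runner_api(cls, urn, payload, context=None):
        # Look up the registered constructor and rebuild the Fn.
        parameter_type, constructor = cls._known_urns[urn]
        return constructor(payload, context)

# Decorator-style registration of a constructor for a made-up urn.
@UrnRegistry.register_urn('example:fn:scale', int)
def scale_from_proto(payload, unused_context):
    return lambda x: x * payload

fn = UrnRegistry.from_runner_api('example:fn:scale', 3)
```

A matching to_runner_api would invert the constructor, emitting the urn plus the payload so the Fn can cross process boundaries.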

apache_beam.utils.windowed_value module

Core windowing data structures.

This module is experimental. No backwards-compatibility guarantees.

class apache_beam.utils.windowed_value.WindowedValue(value, timestamp, windows)[source]

Bases: object

A windowed value having a value, a timestamp and set of windows.

value

The underlying value of a windowed value.

timestamp

Timestamp associated with the value as seconds since Unix epoch.

windows

A set (iterable) of window objects for the value. The window objects are descendants of the BoundedWindow class.

timestamp
timestamp_object = None
with_value(new_value)[source]

Creates a new WindowedValue with the same timestamps and windows as this.

This is the fastest way to create a new WindowedValue.
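The with_value() optimization can be sketched in a few lines. SimpleWindowedValue is a hypothetical stand-in for the real class; the point is that the new value reuses the timestamp and windows rather than recomputing or copying them:

```python
class SimpleWindowedValue(object):
    """Illustrative windowed value: a (value, timestamp, windows) triple."""
    def __init__(self, value, timestamp, windows):
        self.value = value
        self.timestamp = timestamp
        self.windows = windows

    def with_value(self, new_value):
        # Reuse timestamp and windows unchanged; only the payload differs.
        return SimpleWindowedValue(new_value, self.timestamp, self.windows)

wv = SimpleWindowedValue('a', 1234567890, ('global',))
wv2 = wv.with_value('b')
```

Since element-wise transforms usually preserve windowing, sharing the windows tuple between the old and new value avoids a per-element allocation on the hot path.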

apache_beam.utils.windowed_value.create(value, timestamp_micros, windows)[source]

Module contents

A package containing internal utilities.

For internal use only; no backwards-compatibility guarantees.