apache_beam.io.source_test_utils module

Helper functions and test harnesses for source implementations.

This module contains helper functions and test harnesses for checking the correctness of source (a subclass of iobase.BoundedSource) and range tracker (a subclass of iobase.RangeTracker) implementations.

It contains a few lightweight utilities (e.g. reading items from a source, such as read_from_source()), as well as heavyweight property-testing and stress-testing harnesses that help get a large amount of test coverage with little code.

The most notable ones are:

  • assert_sources_equal_reference_source() helps test that the data read by the union of sources produced by BoundedSource.split() is the same as the data read by the original source.

  • If your source implements dynamic work rebalancing, use the assert_split_at_fraction_* family of functions. They test the behavior of RangeTracker.try_split(), in particular that various consistency properties are respected and that the total set of data read by the source is preserved when splits happen. Use assert_split_at_fraction_behavior() to test individual cases of RangeTracker.try_split() and assert_split_at_fraction_exhaustive() as a heavy-weight stress test that includes concurrency. We strongly recommend using both.

For example usages, see the unit tests of modules such as
  • apache_beam.io.source_test_utils_test.py

  • apache_beam.io.avroio_test.py

apache_beam.io.source_test_utils.read_from_source(source, start_position=None, stop_position=None)

Reads elements from the given `BoundedSource`.

Only reads elements within the given position range.

Parameters:
  • source (BoundedSource) – BoundedSource implementation.

  • start_position (int) – start position for reading.

  • stop_position (int) – stop position for reading.

Returns:

the list of values read from the source.

Return type:

List[str]
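To make the position-range semantics concrete, here is a plain-Python sketch that models a source as a list whose positions are list offsets. ToyListSource and toy_read_from_source are hypothetical stand-ins, not Beam classes, and the sketch assumes that stop_position is exclusive, as with an offset range [start, stop).

```python
class ToyListSource:
    """Hypothetical list-backed source; positions are list offsets."""

    def __init__(self, values):
        self.values = list(values)

    def read(self, start_position=None, stop_position=None):
        # Default to the whole list when no range is given.
        start = 0 if start_position is None else start_position
        stop = len(self.values) if stop_position is None else stop_position
        position = start
        # Emit one element per position until the (exclusive) stop position.
        while position < stop:
            yield self.values[position]
            position += 1


def toy_read_from_source(source, start_position=None, stop_position=None):
    """Collects every element the source yields for the given range."""
    return list(source.read(start_position, stop_position))


source = ToyListSource(["a", "b", "c", "d", "e", "f"])
print(toy_read_from_source(source))        # ['a', 'b', 'c', 'd', 'e', 'f']
print(toy_read_from_source(source, 2, 5))  # ['c', 'd', 'e']
```

With the real helper, the equivalent call passes a BoundedSource and, optionally, a start and stop position, for example read_from_source(my_source, None, None), where my_source is a placeholder for your own source.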

apache_beam.io.source_test_utils.assert_sources_equal_reference_source(reference_source_info, sources_info)

Tests if a reference source is equal to a given set of sources.

Given a reference source (a BoundedSource and a position range) and a list of sources, assert that the union of the records read from the list of sources is equal to the records read from the reference source.

Parameters:
  • reference_source_info (Tuple[BoundedSource, int, int]) – a three-tuple that gives the reference BoundedSource, position to start reading at, and position to stop reading at.

  • sources_info (Iterable[Tuple[BoundedSource, int, int]]) – the sources to compare with the reference. Each source is given as a three-tuple in the format described above.

Raises:

ValueError – if the data produced by the reference source and the data produced by the given set of sources are not equivalent.
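The property checked here can be illustrated without Beam. In this sketch, a "source" is a (start, stop) range over a plain list, and read_range, split_into_bundles and assert_union_equals_reference are hypothetical helpers. Comparing the records as multisets (so that both lost and duplicated records are caught, regardless of order) is this sketch's own choice and is not taken from the Beam implementation.

```python
from collections import Counter


def read_range(values, start, stop):
    """Hypothetical reader: returns the elements in [start, stop)."""
    return values[start:stop]


def split_into_bundles(start, stop, bundle_size):
    """Hypothetical splitter: yields (start, stop) ranges covering [start, stop)."""
    position = start
    while position < stop:
        end = min(position + bundle_size, stop)
        yield (position, end)
        position = end


def assert_union_equals_reference(values, reference_range, sub_ranges):
    reference = read_range(values, *reference_range)
    union = []
    for start, stop in sub_ranges:
        union.extend(read_range(values, start, stop))
    # Compare as multisets: missing or duplicated records both fail.
    if Counter(reference) != Counter(union):
        raise ValueError("split sources do not reproduce the reference source")


values = list(range(10))
# Non-overlapping bundles of size 3 reproduce the reference exactly.
assert_union_equals_reference(values, (0, 10), split_into_bundles(0, 10, 3))

# An overlapping split duplicates record 4 and is rejected.
try:
    assert_union_equals_reference(values, (0, 10), [(0, 5), (4, 10)])
except ValueError as err:
    print("rejected:", err)
```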

apache_beam.io.source_test_utils.assert_reentrant_reads_succeed(source_info)

Tests if a given source can be read in a reentrant manner.

Assume that the given source produces the set of values {v1, v2, v3, ..., vn}. For each i in the range [1, n-1], this method performs a reentrant read after reading i elements and verifies that both the original and the reentrant read produce the expected set of values.

Parameters:

source_info (Tuple[BoundedSource, int, int]) – a three-tuple that gives the BoundedSource to test, the position to start reading at, and the position to stop reading at.

Raises:

ValueError – if the source is too trivial (it produces fewer than two values, so there is no i in [1, n-1] to test) or if a reentrant read produces incorrect results.
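The interleaving described above can be sketched in plain Python. Here read is a hypothetical zero-argument callable that starts a fresh read and returns an iterator; it stands in for calling source.read() on a new range tracker. SharedCursorReader is a deliberately broken, hypothetical reader that keeps its position on a shared attribute, which is exactly what a reentrancy test is meant to catch.

```python
def check_reentrant_reads(read):
    """Checks reentrancy of `read`, a callable returning a fresh iterator."""
    expected = list(read())
    n = len(expected)
    if n < 2:
        raise ValueError("source is too trivial: it produced %d value(s)" % n)
    for i in range(1, n):  # i = 1, ..., n-1
        original = read()
        first_part = [next(original) for _ in range(i)]
        # Start and finish a second read while the first one is paused.
        reentrant = list(read())
        rest = list(original)
        if first_part + rest != expected or reentrant != expected:
            raise ValueError("reentrant read at i=%d was incorrect" % i)
    return n


class SharedCursorReader:
    """Deliberately non-reentrant: all reads share one cursor attribute."""

    def __init__(self, values):
        self.values = list(values)
        self.cursor = 0

    def __call__(self):
        self.cursor = 0  # each new read resets the shared cursor
        while self.cursor < len(self.values):
            value = self.values[self.cursor]
            self.cursor += 1
            yield value


values = [10, 20, 30, 40]
print(check_reentrant_reads(lambda: iter(values)))  # 4
try:
    check_reentrant_reads(SharedCursorReader(values))
except ValueError as err:
    print(err)  # reentrant read at i=1 was incorrect
```

Each iter(values) call returns an independent iterator, so the first reader passes; the shared cursor in the second reader is reset by the reentrant read, which truncates the paused original read.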

apache_beam.io.source_test_utils.assert_split_at_fraction_behavior(source, num_items_to_read_before_split, split_fraction, expected_outcome)

Verifies the behaviour of splitting a source at a given fraction.

Asserts that splitting a BoundedSource either fails after reading num_items_to_read_before_split items, or succeeds in a way that is consistent according to assert_split_at_fraction_succeeds_and_consistent().

Parameters:
  • source (BoundedSource) – the source to perform dynamic splitting on.

  • num_items_to_read_before_split (int) – number of items to read before splitting.

  • split_fraction (float) – fraction to split at.

  • expected_outcome (int) – a value from ExpectedSplitOutcome.

Returns:

a tuple that gives the number of items produced by reading the two ranges produced by dynamic splitting. If splitting did not occur, the first value of the tuple is the total number of records read by the source and the second value is -1.

Return type:

Tuple[int, int]
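The return convention and the role of expected_outcome can be illustrated with a toy model of an offset range [0, num_items). This is a sketch under stated assumptions, not Beam's code: toy_split and check_split_behavior are hypothetical, the split position is assumed to be int(split_fraction * num_items), and the three outcome labels are hypothetical placeholders for the values of ExpectedSplitOutcome.

```python
# Hypothetical stand-ins for the values of ExpectedSplitOutcome.
MUST_SUCCEED = "must_succeed"
MUST_FAIL = "must_fail"
CONSISTENT_IF_SUCCEEDS = "consistent_if_succeeds"


def toy_split(num_items, num_items_to_read_before_split, split_fraction):
    """Models splitting the offset range [0, num_items) after some reads.

    Returns (primary_count, residual_count) on success, else (num_items, -1).
    """
    split_position = int(split_fraction * num_items)
    # Refuse positions already read, position 0 (an empty primary), and
    # positions at or past the end of the range.
    if max(num_items_to_read_before_split, 1) <= split_position < num_items:
        return (split_position, num_items - split_position)
    return (num_items, -1)


def check_split_behavior(num_items, num_items_to_read_before_split,
                         split_fraction, expected_outcome):
    primary, residual = toy_split(
        num_items, num_items_to_read_before_split, split_fraction)
    succeeded = residual != -1
    if expected_outcome == MUST_SUCCEED and not succeeded:
        raise AssertionError("split at %r should have succeeded" % split_fraction)
    if expected_outcome == MUST_FAIL and succeeded:
        raise AssertionError("split at %r should have failed" % split_fraction)
    if succeeded and primary + residual != num_items:
        raise AssertionError("split lost or duplicated records")
    return primary, residual


print(check_split_behavior(10, 3, 0.5, MUST_SUCCEED))  # (5, 5)
print(check_split_behavior(10, 6, 0.5, MUST_FAIL))     # (10, -1)
```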

apache_beam.io.source_test_utils.assert_split_at_fraction_binary(source, expected_items, num_items_to_read_before_split, left_fraction, left_result, right_fraction, right_result, stats, start_position=None, stop_position=None)

Performs dynamic work rebalancing for fractions within a given range.

Asserts that given a start position, a source can be split at every interesting fraction (halfway between two fractions that differ by at least one item) and the results are consistent if a split succeeds.

Parameters:
  • source – source to perform dynamic splitting on.

  • expected_items – total set of items expected when reading the source.

  • num_items_to_read_before_split – number of items to read before splitting.

  • left_fraction – left fraction for binary splitting.

  • left_result – result received by splitting at left fraction.

  • right_fraction – right fraction for binary splitting.

  • right_result – result received by splitting at right fraction.

  • stats – a SplitFractionStatistics for storing results.
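The left_fraction/left_result and right_fraction/right_result parameters suggest a recursive bisection over fractions. The following sketch shows that idea on a toy offset model: a sub-interval is explored further only when the primary sizes at its endpoints differ, so the search homes in on the "interesting" fractions where one more item moves into the primary. toy_split, explore and the 1e-3 cut-off are assumptions of this sketch; the real helper's details may differ.

```python
def toy_split(num_items, num_read, fraction):
    """Hypothetical split model: (primary, residual) or (num_items, -1)."""
    position = int(fraction * num_items)
    if max(num_read, 1) <= position < num_items:
        return (position, num_items - position)
    return (num_items, -1)


def explore(num_items, num_read, left, left_result, right, right_result,
            successes):
    """Bisects [left, right], recursing only where the primary size changes."""
    if right - left < 1e-3:  # cut-off chosen for this sketch
        return
    middle = (left + right) / 2
    middle_result = toy_split(num_items, num_read, middle)
    if middle_result[1] != -1:
        successes.append(middle)
    # Endpoints with the same primary size are assumed to enclose no other
    # interesting fraction, so that half is not explored.
    if left_result[0] != middle_result[0]:
        explore(num_items, num_read, left, left_result, middle, middle_result,
                successes)
    # Fraction 1.0 always fails in this model, so a right half ending at 1.0
    # is always explored: successful fractions may lie strictly inside it.
    if right == 1.0 or middle_result[0] != right_result[0]:
        explore(num_items, num_read, middle, middle_result, right, right_result,
                successes)


successes = []
explore(10, 3, 0.0, toy_split(10, 3, 0.0), 1.0, toy_split(10, 3, 1.0),
        successes)
print(sorted({toy_split(10, 3, f)[0] for f in successes}))
# [3, 4, 5, 6, 7, 8, 9]
```

With 3 of 10 items already read, every legal primary size from 3 to 9 is reached, while no fraction is tested more finely than the cut-off.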

apache_beam.io.source_test_utils.assert_split_at_fraction_exhaustive(source, start_position=None, stop_position=None, perform_multi_threaded_test=True)

Performs and tests dynamic work rebalancing exhaustively.

Asserts that, for each possible start position, a source can be split at every interesting fraction (halfway between two fractions that differ by at least one item) and that the results are consistent if a split succeeds. Also verifies multi-threaded splitting.

Parameters:
  • source (BoundedSource) – the source to perform dynamic splitting on.

  • perform_multi_threaded_test (bool) – if True, performs a multi-threaded test; otherwise, this test is skipped.

Raises:

ValueError – if the exhaustive splitting test fails.
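The multi-threaded part presumably races splitting against a concurrent read. The invariant such a race has to preserve can be sketched with a hypothetical lock-protected offset tracker: whichever thread wins, the records read through the primary range plus the residual range must equal the original records, with nothing lost or read twice. LockedOffsetTracker and race_reader_against_split are illustrative only and do not reproduce Beam's exact thread choreography.

```python
import threading


class LockedOffsetTracker:
    """Hypothetical thread-safe tracker over the offset range [start, stop)."""

    def __init__(self, start, stop):
        self._lock = threading.Lock()
        self.start = start
        self.stop = stop
        self._last_claimed = start - 1

    def try_claim(self, position):
        with self._lock:
            if position >= self.stop:
                return False
            self._last_claimed = position
            return True

    def try_split(self, position):
        with self._lock:
            # Refuse to split at or before a position the reader already owns.
            if position <= self._last_claimed or position >= self.stop:
                return None
            self.stop = position
            return position


def race_reader_against_split(values, split_position):
    tracker = LockedOffsetTracker(0, len(values))
    primary = []

    def reader():
        position = tracker.start
        while tracker.try_claim(position):
            primary.append(values[position])
            position += 1

    thread = threading.Thread(target=reader)
    thread.start()
    split = tracker.try_split(split_position)  # races with the reader
    thread.join()
    residual = values[split:] if split is not None else []
    return primary, residual, split


values = list(range(20))
for split_position in range(1, len(values)):
    for _ in range(10):
        primary, residual, split = race_reader_against_split(
            values, split_position)
        # Whatever the interleaving, no record is lost or read twice.
        assert primary + residual == values
```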

apache_beam.io.source_test_utils.assert_split_at_fraction_fails(source, num_items_to_read_before_split, split_fraction)

Asserts that dynamic work rebalancing at a given fraction fails.

Asserts that trying to perform dynamic splitting after reading ‘num_items_to_read_before_split’ items from the source fails.

Parameters:
  • source – source to perform dynamic splitting on.

  • num_items_to_read_before_split – number of items to read before splitting.

  • split_fraction – fraction to split at.
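A typical reason for such a split to fail is that the split position falls inside the part of the range that has already been read. The sketch below models this for an offset range [0, num_items); split_after_reading and assert_split_fails are hypothetical, and the split position is assumed to be int(split_fraction * num_items).

```python
def split_after_reading(num_items, num_items_to_read_before_split,
                        split_fraction):
    """Returns the accepted split position, or None if the split is refused."""
    split_position = int(split_fraction * num_items)
    # Offsets 0 .. k-1 have been read; a split there, at position 0, or at
    # or past the end of the range is refused.
    if max(num_items_to_read_before_split, 1) <= split_position < num_items:
        return split_position
    return None


def assert_split_fails(num_items, num_items_to_read_before_split,
                       split_fraction):
    result = split_after_reading(
        num_items, num_items_to_read_before_split, split_fraction)
    if result is not None:
        raise AssertionError("expected split at %r to fail, got position %r"
                             % (split_fraction, result))


assert_split_fails(10, 8, 0.5)  # position 5 was already read
assert_split_fails(10, 0, 1.0)  # position 10 is the end of the range
```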

apache_beam.io.source_test_utils.assert_split_at_fraction_succeeds_and_consistent(source, num_items_to_read_before_split, split_fraction)

Verifies some consistency properties of dynamic work rebalancing.

Equivalent to the following pseudocode::

    original_range_tracker = source.get_range_tracker(None, None)
    original_reader = source.read(original_range_tracker)
    items_before_split = read num_items_to_read_before_split items from original_reader
    suggested_split_position = original_range_tracker.position_at_fraction(
        split_fraction)
    original_stop_position = original_range_tracker.stop_position()
    split_result = original_range_tracker.try_split(suggested_split_position)
    split_position, split_fraction = split_result
    primary_range_tracker = source.get_range_tracker(
        original_range_tracker.start_position(), split_position)
    residual_range_tracker = source.get_range_tracker(
        split_position, original_stop_position)

    assert that: items when reading source.read(primary_range_tracker) ==
        items_before_split + items from continuing to read original_reader
    assert that: items when reading the original, unsplit range ==
        items when reading source.read(primary_range_tracker) +
        items when reading source.read(residual_range_tracker)

Parameters:
  • source – source to perform dynamic work rebalancing on.

  • num_items_to_read_before_split – number of items to read before splitting.

  • split_fraction – fraction to split at.