Package org.apache.beam.sdk.testing
Class SourceTestUtils
java.lang.Object
org.apache.beam.sdk.testing.SourceTestUtils
Helper functions and test harnesses for checking the correctness of Source implementations.
Contains a few lightweight utilities (e.g. for reading items from a source or a reader, such as readFromSource(org.apache.beam.sdk.io.BoundedSource<T>, org.apache.beam.sdk.options.PipelineOptions) and readFromUnstartedReader(org.apache.beam.sdk.io.Source.Reader<T>)), as well as heavyweight property-testing and stress-testing harnesses that help achieve a large amount of test coverage with little code. The most notable ones are:
- assertSourcesEqualReferenceSource(org.apache.beam.sdk.io.BoundedSource<T>, java.util.List<? extends org.apache.beam.sdk.io.BoundedSource<T>>, org.apache.beam.sdk.options.PipelineOptions) helps test that the data read by the union of sources produced by BoundedSource.split(long, org.apache.beam.sdk.options.PipelineOptions) is the same as the data read by the original source.
- If your source implements dynamic work rebalancing, use the assertSplitAtFraction family of functions. They test the behavior of BoundedSource.BoundedReader.splitAtFraction(double), in particular that various consistency properties are respected and that the total set of data read by the source is preserved when splits happen. Use assertSplitAtFractionBehavior(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome, org.apache.beam.sdk.options.PipelineOptions) to test individual cases of splitAtFraction, and use assertSplitAtFractionExhaustive(org.apache.beam.sdk.io.BoundedSource<T>, org.apache.beam.sdk.options.PipelineOptions) as a heavyweight stress test that includes concurrency. We strongly recommend using both.
For example usages, see the unit tests of classes such as AvroSource or TextSource.
Like PAssert, requires JUnit and Hamcrest to be present in the classpath.
-
Nested Class Summary
Nested Classes
Modifier and Type, Class
    Description
static enum SourceTestUtils.ExpectedSplitOutcome
    Expected outcome of BoundedSource.BoundedReader.splitAtFraction(double).
Constructor Summary
Constructors
SourceTestUtils()
Method Summary
Modifier and Type, Method
    Description
static <T> void assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, List<? extends BoundedSource<T>> sources, PipelineOptions options)
    Given a reference Source and a list of Sources, assert that the union of the records read from the list of sources is equal to the records read from the reference source.
static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options)
    Asserts that the source's reader either fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items, or succeeds in a way that is consistent according to assertSplitAtFractionSucceedsAndConsistent(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.options.PipelineOptions).
static <T> void assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options)
    Asserts that for each possible start position, BoundedSource.BoundedReader.splitAtFraction(double) at every interesting fraction (halfway between two fractions that differ by at least one item) can be called successfully and the results are consistent if a split succeeds.
static <T> void assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options)
    Asserts that the source's reader fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items.
static <T> void assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options)
    Verifies some consistency properties of BoundedSource.BoundedReader.splitAtFraction(double) on the given source.
static <T> void assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options)
    Assert that a Reader returns a Source that, when read from, produces the same records as the reader.
static <T> List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>> createStructuralValues(Coder<T> coder, List<T> list)
    Testing utilities below depend on standard assertions and matchers to compare elements read by sources.
static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options)
    Reads all elements from the given BoundedSource.
static <T> List<T> readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options)
static <T> List<T> readFromStartedReader(Source.Reader<T> reader)
    Reads all elements from the given started Source.Reader.
static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader)
    Reads all elements from the given unstarted Source.Reader.
static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n)
    Read elements from a Source.Reader that has already had Source.Reader.start() called on it, until n elements are read.
static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n)
    Read elements from a Source.Reader until n elements are read.
static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started)
    Read all remaining elements from a Source.Reader.
static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource)
    Returns an equivalent unsplittable BoundedSource<T>.
-
Constructor Details
-
SourceTestUtils
public SourceTestUtils()
-
-
Method Details
-
createStructuralValues
public static <T> List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>> createStructuralValues(Coder<T> coder, List<T> list) throws Exception
Testing utilities below depend on standard assertions and matchers to compare elements read by sources. In general the elements may not implement equals/hashCode properly; however, every source has a Coder, and every Coder can produce a Coder.structuralValue(T) whose equals/hashCode is consistent with equality of the encoded format. So we use this Coder.structuralValue(T) to compare elements read by sources.
Throws:
Exception
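To illustrate why the comparison goes through the encoded form, here is a minimal standalone sketch that does not use any Beam classes. EncodedValue, Rec, and encode are hypothetical names invented for this illustration: EncodedValue plays the role of a structural value by defining equals/hashCode over the bytes that an assumed encoder produces.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a structural value: equality follows the encoded bytes. */
final class EncodedValue {
  private final byte[] encoded;

  EncodedValue(byte[] encoded) {
    this.encoded = encoded.clone();
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof EncodedValue
        && Arrays.equals(encoded, ((EncodedValue) other).encoded);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(encoded);
  }
}

final class StructuralValueSketch {
  /** Element type that does not override equals/hashCode. */
  static final class Rec {
    final String name;

    Rec(String name) {
      this.name = name;
    }
  }

  /** Assumed encoding for this sketch only: the UTF-8 bytes of the name. */
  static byte[] encode(Rec r) {
    return r.name.getBytes(StandardCharsets.UTF_8);
  }

  /** Wraps each element so that list comparison uses the encoded form. */
  static List<EncodedValue> toStructural(List<Rec> records) {
    List<EncodedValue> out = new ArrayList<>();
    for (Rec r : records) {
      out.add(new EncodedValue(encode(r)));
    }
    return out;
  }
}
```

Two lists of Rec with the same names compare as unequal directly, because Rec uses identity equality, but compare as equal after toStructural.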
-
readFromSource
public static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options) throws IOException
Reads all elements from the given BoundedSource.
Throws:
IOException
-
readFromSplitsOfSource
public static <T> List<T> readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options) throws Exception
Throws:
Exception
-
readFromUnstartedReader
public static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader) throws IOException
Reads all elements from the given unstarted Source.Reader.
Throws:
IOException
-
readFromStartedReader
public static <T> List<T> readFromStartedReader(Source.Reader<T> reader) throws IOException
Reads all elements from the given started Source.Reader.
Throws:
IOException
-
readNItemsFromUnstartedReader
public static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n) throws IOException
Read elements from a Source.Reader until n elements are read.
Throws:
IOException
-
readNItemsFromStartedReader
public static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n) throws IOException
Read elements from a Source.Reader that has already had Source.Reader.start() called on it, until n elements are read.
Throws:
IOException
-
readRemainingFromReader
public static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started) throws IOException
Read all remaining elements from a Source.Reader.
Throws:
IOException
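The difference between the started and unstarted variants comes down to which call fetches the first element. The following standalone sketch shows one way such a read loop can look. SimpleReader is a hypothetical minimal interface that only mirrors the start/advance/getCurrent shape of Source.Reader; it is not the Beam type, and the loop is an assumption about the general pattern rather than the library's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class ReaderLoopSketch {
  /** Hypothetical minimal reader with a start/advance/getCurrent protocol. */
  interface SimpleReader<T> {
    boolean start();   // moves to the first element; false if there is none

    boolean advance(); // moves to the next element; false if there is none

    T getCurrent();    // valid only after start() or advance() returned true
  }

  /** Reads everything that is left; 'started' says whether start() was already called. */
  static <T> List<T> readRemaining(SimpleReader<T> reader, boolean started) {
    List<T> out = new ArrayList<>();
    for (boolean more = started ? reader.advance() : reader.start();
        more;
        more = reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }

  /** List-backed reader used only to exercise the loop. */
  static <T> SimpleReader<T> over(List<T> items) {
    final Iterator<T> it = items.iterator();
    return new SimpleReader<T>() {
      private T current;

      @Override
      public boolean start() {
        return advance();
      }

      @Override
      public boolean advance() {
        if (!it.hasNext()) {
          return false;
        }
        current = it.next();
        return true;
      }

      @Override
      public T getCurrent() {
        return current;
      }
    };
  }
}
```

In this sketch, passing started = true on a reader whose start() was already called skips the element the caller is currently positioned on and returns only what follows it.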
-
assertSourcesEqualReferenceSource
public static <T> void assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, List<? extends BoundedSource<T>> sources, PipelineOptions options) throws Exception
Given a reference Source and a list of Sources, assert that the union of the records read from the list of sources is equal to the records read from the reference source.
Throws:
Exception
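The property being checked can be modeled without Beam. In the sketch below, a plain list stands in for the reference source and consecutive sublists stand in for the sources produced by splitting it. The class and method names are hypothetical, and comparing the union while ignoring order is an assumption of this sketch, not a statement about how the library compares records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SplitUnionSketch {
  /** Cuts 'items' into consecutive chunks of at most chunkSize (> 0) elements. */
  static List<List<Integer>> split(List<Integer> items, int chunkSize) {
    List<List<Integer>> chunks = new ArrayList<>();
    for (int i = 0; i < items.size(); i += chunkSize) {
      chunks.add(items.subList(i, Math.min(i + chunkSize, items.size())));
    }
    return chunks;
  }

  /** True if the chunks together hold exactly the reference items, ignoring order. */
  static boolean unionEqualsReference(List<Integer> reference, List<List<Integer>> chunks) {
    List<Integer> union = new ArrayList<>();
    for (List<Integer> chunk : chunks) {
      union.addAll(chunk);
    }
    List<Integer> expected = new ArrayList<>(reference);
    Collections.sort(union);
    Collections.sort(expected);
    return union.equals(expected);
  }
}
```

A split that drops or duplicates a chunk makes unionEqualsReference return false, which is the kind of bug this assertion is meant to surface in a real source.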
-
assertUnstartedReaderReadsSameAsItsSource
public static <T> void assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception
Assert that a Reader returns a Source that, when read from, produces the same records as the reader.
Throws:
Exception
-
assertSplitAtFractionBehavior
public static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options) throws Exception
Asserts that the source's reader either fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items, or succeeds in a way that is consistent according to assertSplitAtFractionSucceedsAndConsistent(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.options.PipelineOptions).
Returns SplitAtFractionResult.
Throws:
Exception
-
assertSplitAtFractionSucceedsAndConsistent
public static <T> void assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws Exception
Verifies some consistency properties of BoundedSource.BoundedReader.splitAtFraction(double) on the given source. Equivalent to the following pseudocode:
    Reader reader = source.createReader();
    read N items from reader;
    Source residual = reader.splitAtFraction(splitFraction);
    Source primary = reader.getCurrentSource();
    assert: items in primary == items we read so far
                                + items we'll get by continuing to read from reader;
    assert: items in original source == items in primary + items in residual
Throws:
Exception
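The two invariants in the pseudocode can be tried out on a toy model. The sketch below uses a hypothetical RangeReaderSketch over the integers [start, end) in place of a real reader. Its splitting policy (round the split point up, refuse if either side would be empty or the split point was already read) is an assumption made for this illustration and does not describe any particular Beam source.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy reader over the integers [start, end); a stand-in for illustration, not a Beam class. */
final class RangeReaderSketch {
  private final int start;
  private int end;  // exclusive; shrinks when a split succeeds
  private int next; // next item to hand out

  RangeReaderSketch(int start, int end) {
    this.start = start;
    this.end = end;
    this.next = start;
  }

  /** Reads up to n further items from the current range. */
  List<Integer> read(int n) {
    List<Integer> out = new ArrayList<>();
    while (out.size() < n && next < end) {
      out.add(next++);
    }
    return out;
  }

  /** Returns the residual range {from, to}, or null (reader unchanged) if the split is refused. */
  int[] splitAtFraction(double fraction) {
    int splitPos = start + (int) Math.ceil(fraction * (end - start));
    // Assumed policy: refuse if either side would be empty or the split point was already read.
    if (splitPos <= start || splitPos >= end || splitPos < next) {
      return null;
    }
    int[] residual = {splitPos, end};
    end = splitPos;
    return residual;
  }

  int[] currentRange() {
    return new int[] {start, end};
  }

  static List<Integer> itemsIn(int[] range) {
    return new RangeReaderSketch(range[0], range[1]).read(Integer.MAX_VALUE);
  }

  /** Mirrors the pseudocode: returns false if the split was refused, throws if inconsistent. */
  static boolean checkSplit(int start, int end, int itemsBeforeSplit, double fraction) {
    RangeReaderSketch reader = new RangeReaderSketch(start, end);
    List<Integer> seen = reader.read(itemsBeforeSplit);
    int[] residual = reader.splitAtFraction(fraction);
    if (residual == null) {
      return false;
    }
    int[] primary = reader.currentRange();
    seen.addAll(reader.read(Integer.MAX_VALUE));
    List<Integer> primaryPlusResidual = new ArrayList<>(itemsIn(primary));
    primaryPlusResidual.addAll(itemsIn(residual));
    if (!itemsIn(primary).equals(seen)) {
      throw new AssertionError("primary != items read before and after the split");
    }
    if (!itemsIn(new int[] {start, end}).equals(primaryPlusResidual)) {
      throw new AssertionError("original != primary + residual");
    }
    return true;
  }
}
```

For example, checkSplit(0, 10, 3, 0.5) splits at position 5 and both invariants hold, while checkSplit(0, 10, 8, 0.5) is refused because position 5 has already been read.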
-
assertSplitAtFractionFails
public static <T> void assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws Exception
Asserts that the source's reader fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items.
Throws:
Exception
-
assertSplitAtFractionExhaustive
public static <T> void assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options) throws Exception
Asserts that for each possible start position, BoundedSource.BoundedReader.splitAtFraction(double) at every interesting fraction (halfway between two fractions that differ by at least one item) can be called successfully and the results are consistent if a split succeeds. Verifies multithreaded splitting as well.
Throws:
Exception
-
toUnsplittableSource
public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource)
Returns an equivalent unsplittable BoundedSource<T>.
It forwards most methods to the given boundedSource, except:
- BoundedSource.split(long, org.apache.beam.sdk.options.PipelineOptions) rejects initial splitting by returning itself in a list.
- BoundedSource.BoundedReader.splitAtFraction(double) rejects dynamic splitting by returning null.
-