public class SourceTestUtils
extends java.lang.Object
Helper functions and test harnesses for checking correctness of Source implementations.
Contains a few lightweight utilities (e.g. reading items from a source or a reader, such as readFromSource(BoundedSource<T>, PipelineOptions) and readFromUnstartedReader(Source.Reader<T>)), as well as heavyweight property-testing and stress-testing harnesses that help achieve a large amount of test coverage with little code. The most notable ones are:
- assertSourcesEqualReferenceSource(BoundedSource<T>, List<? extends BoundedSource<T>>, PipelineOptions) helps test that the data read by the union of the sources produced by BoundedSource.split(long, PipelineOptions) is the same as the data read by the original source.
- The assertSplitAtFraction family of functions tests the behavior of BoundedSource.BoundedReader#splitAtFraction. In particular, these functions check that various consistency properties are respected and that the total set of data read by the source is preserved when splits happen. Use assertSplitAtFractionBehavior(BoundedSource<T>, int, double, SourceTestUtils.ExpectedSplitOutcome, PipelineOptions) to test individual cases of splitAtFraction, and use assertSplitAtFractionExhaustive(BoundedSource<T>, PipelineOptions) as a heavyweight stress test that includes concurrency. We strongly recommend using both.
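The property behind assertSourcesEqualReferenceSource can be illustrated without Beam. The following is a minimal sketch under stated assumptions. RangeSource and SplitUnionCheck are hypothetical toy classes, not Beam APIs. A "source" here is just a range of longs, and split takes an element count rather than a size in bytes. Splits may be read in any order, so the sketch compares order-insensitively. It still counts duplicates, so a record read twice is caught.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy "source": the half-open range [start, end) of longs.
final class RangeSource {
  final long start;
  final long end;

  RangeSource(long start, long end) {
    this.start = start;
    this.end = end;
  }

  // Reads every element, playing the role readFromSource plays for a real BoundedSource.
  List<Long> readAll() {
    List<Long> out = new ArrayList<>();
    for (long i = start; i < end; i++) {
      out.add(i);
    }
    return out;
  }

  // Initial splitting into bundles of at most bundleSize elements
  // (a stand-in for BoundedSource.split, which takes a desired size in bytes).
  List<RangeSource> split(long bundleSize) {
    if (bundleSize <= 0) {
      throw new IllegalArgumentException("bundleSize must be positive");
    }
    List<RangeSource> bundles = new ArrayList<>();
    for (long s = start; s < end; s += bundleSize) {
      bundles.add(new RangeSource(s, Math.min(s + bundleSize, end)));
    }
    return bundles;
  }
}

final class SplitUnionCheck {
  // Counts occurrences, so the comparison ignores order but still catches duplicates.
  static Map<Long, Integer> multiset(List<Long> items) {
    Map<Long, Integer> counts = new HashMap<>();
    for (Long item : items) {
      counts.merge(item, 1, Integer::sum);
    }
    return counts;
  }

  // Throws if the union of the splits is not the same multiset as the reference's records.
  static void assertUnionEqualsReference(RangeSource reference, List<RangeSource> splits) {
    List<Long> union = new ArrayList<>();
    for (RangeSource split : splits) {
      union.addAll(split.readAll());
    }
    if (!multiset(union).equals(multiset(reference.readAll()))) {
      throw new AssertionError("union of splits differs from the reference source");
    }
  }
}
```

For example, `new RangeSource(0, 10).split(3)` yields four bundles whose union passes the check. Dropping any one bundle makes it fail.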
For example usages, see the unit tests of classes such as AvroSource or TextSource.
Like PAssert, this class requires JUnit and Hamcrest to be present on the classpath.
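The lightweight reading utilities (readNItemsFromUnstartedReader, readRemainingFromReader and related methods) revolve around the Source.Reader protocol:

- start() positions an unstarted reader on its first record.
- advance() moves it to the next record.
- Both return false once input is exhausted.

The sketch below models that protocol with a hypothetical ToyReader interface and ListReader class, which are not Beam types. It shows why the helpers need to know whether the reader has already been started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical minimal reader contract, modeled on Source.Reader:
// start() moves to the first record, advance() to the next; both return false at end of input.
interface ToyReader<T> {
  boolean start();

  boolean advance();

  T getCurrent();
}

// In-memory reader over a list; index -1 means "not started yet".
final class ListReader<T> implements ToyReader<T> {
  private final List<T> items;
  private int index = -1;

  ListReader(List<T> items) {
    this.items = items;
  }

  @Override
  public boolean start() {
    index = 0;
    return index < items.size();
  }

  @Override
  public boolean advance() {
    index++;
    return index < items.size();
  }

  @Override
  public T getCurrent() {
    if (index < 0 || index >= items.size()) {
      throw new NoSuchElementException("no current record");
    }
    return items.get(index);
  }
}

final class ReaderHelpers {
  // Reads up to n records. An unstarted reader must be started (not advanced) to get its first record.
  static <T> List<T> readNItems(ToyReader<T> reader, int n, boolean started) {
    List<T> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      boolean hasRecord = (i == 0 && !started) ? reader.start() : reader.advance();
      if (!hasRecord) {
        break;
      }
      out.add(reader.getCurrent());
    }
    return out;
  }

  // Reads every remaining record; the caller states whether start() has already been called.
  static <T> List<T> readRemaining(ToyReader<T> reader, boolean started) {
    return readNItems(reader, Integer.MAX_VALUE, started);
  }
}
```

In this model, reading zero items never starts the reader. After readNItems(reader, 0, false), the correct follow-up is readRemaining(reader, false). Tracking that state is what the started flag is for.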
| Modifier and Type | Class and Description |
|---|---|
| static class | SourceTestUtils.ExpectedSplitOutcome: Expected outcome of BoundedSource.BoundedReader.splitAtFraction(double). |
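To make the expected outcomes and consistency properties concrete, here is a self-contained toy model with no Beam dependency. It is a reader over an offset range that supports splitAtFraction, checked against three outcomes. The enum constant names are assumed to match those defined by ExpectedSplitOutcome. RangeReader, SplitChecker and the rejection rule inside splitAtFraction are hypothetical choices for illustration. The checks follow the two assertions in the pseudocode for assertSplitAtFractionSucceedsAndConsistent:

- The primary equals what was read plus what the reader still returns.
- The primary plus the residual equals the original.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum; the constant names are assumed to mirror SourceTestUtils.ExpectedSplitOutcome.
enum Outcome { MUST_SUCCEED_AND_BE_CONSISTENT, MUST_FAIL, MUST_BE_CONSISTENT_IF_SUCCEEDS }

// Toy reader over the half-open offset range [start, end) that supports dynamic splitting.
final class RangeReader {
  final long start;
  long end; // shrinks to the split point when a split succeeds
  long next; // next offset to return; every offset below it has already been read

  RangeReader(long start, long end) {
    this.start = start;
    this.end = end;
    this.next = start;
  }

  static List<Long> range(long from, long to) {
    List<Long> out = new ArrayList<>();
    for (long i = from; i < to; i++) {
      out.add(i);
    }
    return out;
  }

  // Reads up to n more records, stopping at the current (possibly shrunk) end.
  List<Long> read(int n) {
    List<Long> out = new ArrayList<>();
    while (out.size() < n && next < end) {
      out.add(next++);
    }
    return out;
  }

  // Returns the residual range as {from, to}, or null if the split is rejected.
  long[] splitAtFraction(double fraction) {
    long splitPos = start + (long) Math.ceil(fraction * (end - start));
    // Reject splits before already-returned records and splits leaving an empty primary or residual.
    if (splitPos < next || splitPos <= start || splitPos >= end) {
      return null;
    }
    long[] residual = {splitPos, end};
    end = splitPos;
    return residual;
  }
}

final class SplitChecker {
  static void check(long start, long end, int itemsBeforeSplit, double fraction, Outcome expected) {
    RangeReader reader = new RangeReader(start, end);
    List<Long> primaryItems = new ArrayList<>(reader.read(itemsBeforeSplit));
    long[] residual = reader.splitAtFraction(fraction);
    if (residual == null) {
      if (expected == Outcome.MUST_SUCCEED_AND_BE_CONSISTENT) {
        throw new AssertionError("split at " + fraction + " unexpectedly failed");
      }
      return;
    }
    if (expected == Outcome.MUST_FAIL) {
      throw new AssertionError("split at " + fraction + " unexpectedly succeeded");
    }
    // Primary == items read so far + items the reader still returns.
    primaryItems.addAll(reader.read(Integer.MAX_VALUE));
    if (!primaryItems.equals(RangeReader.range(reader.start, reader.end))) {
      throw new AssertionError("primary is inconsistent with what the reader returned");
    }
    // Original == primary + residual, so nothing is lost or duplicated.
    List<Long> total = new ArrayList<>(primaryItems);
    total.addAll(RangeReader.range(residual[0], residual[1]));
    if (!total.equals(RangeReader.range(start, end))) {
      throw new AssertionError("split lost or duplicated records");
    }
  }

  // Toy exhaustive sweep: every start position, and a fraction halfway between each pair of
  // neighboring item boundaries (here i/n and (i+1)/n differ by exactly one item).
  static void checkExhaustive(long start, long end) {
    long n = end - start;
    for (int k = 0; k <= n; k++) {
      for (long i = 0; i < n; i++) {
        check(start, end, k, (i + 0.5) / n, Outcome.MUST_BE_CONSISTENT_IF_SUCCEEDS);
      }
    }
  }
}
```

checkExhaustive is only a loose analogue of assertSplitAtFractionExhaustive. In this toy every item boundary is known in advance, whereas the real harness presumably has to determine interesting fractions from the source under test. The real harness also verifies multithreaded splitting, which this sketch does not attempt.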
| Constructor and Description |
|---|
| SourceTestUtils() |
| Modifier and Type | Method and Description |
|---|---|
| static <T> void | assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, java.util.List<? extends BoundedSource<T>> sources, PipelineOptions options): Given a reference Source and a list of Sources, assert that the union of the records read from the list of sources is equal to the records read from the reference source. |
| static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult | assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options): Asserts that the source's reader either fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items, or succeeds in a way that is consistent according to assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T>, int, double, PipelineOptions). |
| static <T> void | assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options): Asserts that for each possible start position, BoundedSource.BoundedReader#splitAtFraction at every interesting fraction (halfway between two fractions that differ by at least one item) can be called successfully and the results are consistent if a split succeeds. |
| static <T> void | assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options): Asserts that the source's reader fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items. |
| static <T> void | assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options): Verifies some consistency properties of BoundedSource.BoundedReader#splitAtFraction on the given source. |
| static <T> void | assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options): Assert that a Reader returns a Source that, when read from, produces the same records as the reader. |
| static <T> java.util.List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>> | createStructuralValues(Coder<T> coder, java.util.List<T> list): Testing utilities below depend on standard assertions and matchers to compare elements read by sources. |
| static <T> java.util.List<T> | readFromSource(BoundedSource<T> source, PipelineOptions options): Reads all elements from the given BoundedSource. |
| static <T> java.util.List<T> | readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options) |
| static <T> java.util.List<T> | readFromStartedReader(Source.Reader<T> reader): Reads all elements from the given started Source.Reader. |
| static <T> java.util.List<T> | readFromUnstartedReader(Source.Reader<T> reader): Reads all elements from the given unstarted Source.Reader. |
| static <T> java.util.List<T> | readNItemsFromStartedReader(Source.Reader<T> reader, int n): Read elements from a Source.Reader that has already had Source.Reader#start called on it, until n elements are read. |
| static <T> java.util.List<T> | readNItemsFromUnstartedReader(Source.Reader<T> reader, int n): Read elements from a Source.Reader until n elements are read. |
| static <T> java.util.List<T> | readRemainingFromReader(Source.Reader<T> reader, boolean started): Read all remaining elements from a Source.Reader. |
| static <T> BoundedSource<T> | toUnsplittableSource(BoundedSource<T> boundedSource): Returns an equivalent unsplittable BoundedSource<T>. |
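The createStructuralValues entry addresses a practical problem: elements read from a source may not implement equals/hashCode, so comparisons use a value derived from each element's encoding instead. The sketch below shows that idea in plain Java. EncodedValue, Point and PointCodec are hypothetical stand-ins, not Beam types, for a structural-value wrapper, an element type and a Coder respectively.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// An element type that deliberately does NOT override equals/hashCode (identity equality only).
final class Point {
  final int x;
  final int y;

  Point(int x, int y) {
    this.x = x;
    this.y = y;
  }
}

// Hypothetical deterministic encoder standing in for a Coder.
final class PointCodec {
  static byte[] encode(Point p) {
    return (p.x + "," + p.y).getBytes(StandardCharsets.UTF_8);
  }
}

// Wraps an element with its encoding; equals/hashCode are defined on the encoded bytes,
// in the spirit of Coder.structuralValue.
final class EncodedValue<T> {
  final T value;
  private final byte[] encoded;

  EncodedValue(T value, Function<T, byte[]> encoder) {
    this.value = value;
    this.encoded = encoder.apply(value);
  }

  // Wraps every element of a list, e.g. all records read from one source.
  static <E> List<EncodedValue<E>> wrapAll(List<E> items, Function<E, byte[]> encoder) {
    List<EncodedValue<E>> out = new ArrayList<>();
    for (E item : items) {
      out.add(new EncodedValue<>(item, encoder));
    }
    return out;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof EncodedValue && Arrays.equals(encoded, ((EncodedValue<?>) o).encoded);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(encoded);
  }
}
```

The encoding must be deterministic for this to work. In this model, two elements compare equal exactly when their encodings are byte-for-byte identical, even though two equal-looking Point objects are not equal to each other.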
public static <T> java.util.List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>> createStructuralValues(Coder<T> coder, java.util.List<T> list) throws java.lang.Exception

Testing utilities below depend on standard assertions and matchers to compare elements read by sources. In general, the elements may not implement equals/hashCode properly. However, every source has a Coder, and every Coder can produce a Coder.structuralValue(T) whose equals/hashCode is consistent with equality of the encoded format. So we use this Coder.structuralValue(T) to compare elements read by sources.

Throws: java.lang.Exception

public static <T> java.util.List<T> readFromSource(BoundedSource<T> source, PipelineOptions options) throws java.io.IOException
Reads all elements from the given BoundedSource.

Throws: java.io.IOException

public static <T> java.util.List<T> readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options) throws java.lang.Exception
Throws: java.lang.Exception

public static <T> java.util.List<T> readFromUnstartedReader(Source.Reader<T> reader) throws java.io.IOException
Reads all elements from the given unstarted Source.Reader.

Throws: java.io.IOException

public static <T> java.util.List<T> readFromStartedReader(Source.Reader<T> reader) throws java.io.IOException
Reads all elements from the given started Source.Reader.

Throws: java.io.IOException

public static <T> java.util.List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n) throws java.io.IOException
Read elements from a Source.Reader until n elements are read.

Throws: java.io.IOException

public static <T> java.util.List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n) throws java.io.IOException
Read elements from a Source.Reader that has already had Source.Reader#start called on it, until n elements are read.

Throws: java.io.IOException

public static <T> java.util.List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started) throws java.io.IOException
Read all remaining elements from a Source.Reader.

Throws: java.io.IOException

public static <T> void assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, java.util.List<? extends BoundedSource<T>> sources, PipelineOptions options) throws java.lang.Exception
Given a reference Source and a list of Sources, assert that the union of the records read from the list of sources is equal to the records read from the reference source.

Throws: java.lang.Exception

public static <T> void assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws java.lang.Exception
Assert that a Reader returns a Source that, when read from, produces the same records as the reader.

Throws: java.lang.Exception

public static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options) throws java.lang.Exception
Asserts that the source's reader either fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items, or succeeds in a way that is consistent according to assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T>, int, double, PipelineOptions).

Returns: SplitAtFractionResult.

Throws: java.lang.Exception

public static <T> void assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws java.lang.Exception
Verifies some consistency properties of BoundedSource.BoundedReader#splitAtFraction on the given source. Equivalent to the following pseudocode:

```
Reader reader = source.createReader(options);
read N items from reader;
Source residual = reader.splitAtFraction(splitFraction);
Source primary = reader.getCurrentSource();
assert: items in primary == items we read so far
                         + items we'll get by continuing to read from reader;
assert: items in original source == items in primary + items in residual
```

Throws: java.lang.Exception

public static <T> void assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws java.lang.Exception
Asserts that the source's reader fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items.

Throws: java.lang.Exception

public static <T> void assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options) throws java.lang.Exception
Asserts that for each possible start position, BoundedSource.BoundedReader#splitAtFraction at every interesting fraction (halfway between two fractions that differ by at least one item) can be called successfully and the results are consistent if a split succeeds. Verifies multithreaded splitting as well.

Throws: java.lang.Exception

public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource)
Returns an equivalent unsplittable BoundedSource<T>.

It forwards most methods to the given boundedSource, except:
- BoundedSource.split(long, PipelineOptions) rejects initial splitting by returning itself in a list.
- BoundedSource.BoundedReader.splitAtFraction(double) rejects dynamic splitting by returning null.
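The wrapping behavior described for toUnsplittableSource can be sketched with two hypothetical toy interfaces, ToyBoundedSource and ToySplitReader, which are not Beam types:

- Initial splitting is refused on the source.
- Dynamic splitting is refused on its reader.
- Everything else is forwarded to the wrapped source.

This is a model of the documented contract under those assumptions, not the actual implementation. HalvingSource is an illustrative splittable source used only for contrast.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical toy reader contract: only dynamic splitting is modeled here.
interface ToySplitReader {
  // Returns the residual source, or null if the split is rejected.
  ToyBoundedSource splitAtFraction(double fraction);
}

// Hypothetical toy source contract: initial splitting, a size estimate, and a reader.
interface ToyBoundedSource {
  List<ToyBoundedSource> split(long desiredBundleSize);

  long estimatedSize();

  ToySplitReader createReader();
}

// A splittable example: split() yields two halves and its reader accepts any split.
final class HalvingSource implements ToyBoundedSource {
  private final long size;

  HalvingSource(long size) {
    this.size = size;
  }

  @Override
  public List<ToyBoundedSource> split(long desiredBundleSize) {
    return List.of(new HalvingSource(size / 2), new HalvingSource(size - size / 2));
  }

  @Override
  public long estimatedSize() {
    return size;
  }

  @Override
  public ToySplitReader createReader() {
    return fraction -> new HalvingSource(size - (long) (fraction * size));
  }
}

// Forwards to the wrapped source but refuses both kinds of splitting.
final class UnsplittableSource implements ToyBoundedSource {
  private final ToyBoundedSource delegate;

  UnsplittableSource(ToyBoundedSource delegate) {
    this.delegate = delegate;
  }

  // Rejects initial splitting by returning a singleton list containing itself.
  @Override
  public List<ToyBoundedSource> split(long desiredBundleSize) {
    return Collections.singletonList(this);
  }

  // Forwarded unchanged to the wrapped source.
  @Override
  public long estimatedSize() {
    return delegate.estimatedSize();
  }

  // Its reader rejects dynamic splitting by returning null. (A real reader would also
  // forward the reading methods; this toy reader models only splitAtFraction.)
  @Override
  public ToySplitReader createReader() {
    return fraction -> null;
  }
}
```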