public class SourceTestUtils
extends java.lang.Object
Helper functions and test harnesses for checking correctness of Source implementations.
Contains a few lightweight utilities (e.g. for reading items from a source or a reader,
such as readFromSource(org.apache.beam.sdk.io.BoundedSource<T>, org.apache.beam.sdk.options.PipelineOptions)
and readFromUnstartedReader(org.apache.beam.sdk.io.Source.Reader<T>)), as well as
heavyweight property-testing and stress-testing harnesses that help achieve a large
amount of test coverage with little code. The most notable ones are:
- assertSourcesEqualReferenceSource(org.apache.beam.sdk.io.BoundedSource<T>, java.util.List<? extends org.apache.beam.sdk.io.BoundedSource<T>>, org.apache.beam.sdk.options.PipelineOptions) helps test that the data read by the union of sources produced by BoundedSource.split(long, org.apache.beam.sdk.options.PipelineOptions) is the same as the data read by the original source.
- The assertSplitAtFraction family of functions tests the behavior of BoundedSource.BoundedReader#splitAtFraction, in particular that various consistency properties are respected and that the total set of data read by the source is preserved when splits happen. Use assertSplitAtFractionBehavior(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome, org.apache.beam.sdk.options.PipelineOptions) to test individual cases of splitAtFraction, and use assertSplitAtFractionExhaustive(org.apache.beam.sdk.io.BoundedSource<T>, org.apache.beam.sdk.options.PipelineOptions) as a heavyweight stress test that includes concurrency. We strongly recommend using both.
For example usages, see the unit tests of classes such as AvroSource or TextSource.
Like PAssert, requires JUnit and Hamcrest to be present in the classpath.
Modifier and Type | Class | Description |
---|---|---|
static class | SourceTestUtils.ExpectedSplitOutcome | Expected outcome of BoundedSource.BoundedReader.splitAtFraction(double). |
Constructor and Description |
---|
SourceTestUtils() |
Modifier and Type | Method | Description |
---|---|---|
static <T> void | assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, java.util.List<? extends BoundedSource<T>> sources, PipelineOptions options) | Given a reference Source and a list of Sources, asserts that the union of the records read from the list of sources is equal to the records read from the reference source. |
static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult | assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options) | Asserts that the source's reader either fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items, or succeeds in a way that is consistent according to assertSplitAtFractionSucceedsAndConsistent(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.options.PipelineOptions). |
static <T> void | assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options) | Asserts that for each possible start position, BoundedSource.BoundedReader#splitAtFraction at every interesting fraction (halfway between two fractions that differ by at least one item) can be called successfully and the results are consistent if a split succeeds. |
static <T> void | assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) | Asserts that the source's reader fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items. |
static <T> void | assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) | Verifies some consistency properties of BoundedSource.BoundedReader#splitAtFraction on the given source. |
static <T> void | assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options) | Asserts that a Reader returns a Source that, when read from, produces the same records as the reader. |
static <T> java.util.List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>> | createStructuralValues(Coder<T> coder, java.util.List<T> list) | Testing utilities below depend on standard assertions and matchers to compare elements read by sources. |
static <T> java.util.List<T> | readFromSource(BoundedSource<T> source, PipelineOptions options) | Reads all elements from the given BoundedSource. |
static <T> java.util.List<T> | readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options) | |
static <T> java.util.List<T> | readFromStartedReader(Source.Reader<T> reader) | Reads all elements from the given started Source.Reader. |
static <T> java.util.List<T> | readFromUnstartedReader(Source.Reader<T> reader) | Reads all elements from the given unstarted Source.Reader. |
static <T> java.util.List<T> | readNItemsFromStartedReader(Source.Reader<T> reader, int n) | Reads elements from a Source.Reader that has already had Source.Reader#start called on it, until n elements are read. |
static <T> java.util.List<T> | readNItemsFromUnstartedReader(Source.Reader<T> reader, int n) | Reads elements from a Source.Reader until n elements are read. |
static <T> java.util.List<T> | readRemainingFromReader(Source.Reader<T> reader, boolean started) | Reads all remaining elements from a Source.Reader. |
static <T> BoundedSource<T> | toUnsplittableSource(BoundedSource<T> boundedSource) | Returns an equivalent unsplittable BoundedSource<T>. |
public static <T> java.util.List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>> createStructuralValues(Coder<T> coder, java.util.List<T> list) throws java.lang.Exception
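Testing utilities below depend on standard assertions and matchers to compare elements read by
sources. In general the elements may not implement equals/hashCode properly; however, every
source has a Coder, and every Coder can produce a Coder.structuralValue(T) whose
equals/hashCode is consistent with equality of the encoded format.
So we use this Coder.structuralValue(T) to compare elements read by sources.
Throws: java.lang.Exception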
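The reasoning above can be illustrated with a minimal, self-contained sketch. It does not use Beam: `EncodedValue`, `encode`, and `toStructuralValues` are hypothetical names that only mimic the idea of comparing elements by their encoded form, and `char[]` stands in for an element type without value-based equals/hashCode.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StructuralValueSketch {
    /** Hypothetical wrapper whose equality is defined by the encoded bytes. */
    static final class EncodedValue {
        private final byte[] encoded;

        EncodedValue(byte[] encoded) {
            this.encoded = encoded.clone();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof EncodedValue
                && Arrays.equals(encoded, ((EncodedValue) o).encoded);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(encoded);
        }
    }

    /** Hypothetical stand-in for a coder: encodes a char[] element as UTF-8 bytes. */
    static byte[] encode(char[] element) {
        return new String(element).getBytes(StandardCharsets.UTF_8);
    }

    /** Wraps every element so that lists can be compared with ordinary equals. */
    static List<EncodedValue> toStructuralValues(List<char[]> elements) {
        List<EncodedValue> out = new ArrayList<>();
        for (char[] e : elements) {
            out.add(new EncodedValue(encode(e)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<char[]> expected = Arrays.asList("a".toCharArray(), "b".toCharArray());
        List<char[]> actual = Arrays.asList("a".toCharArray(), "b".toCharArray());
        // Arrays use identity equality, so the raw lists differ.
        System.out.println(expected.equals(actual)); // false
        // Comparing by encoded form gives the intended result.
        System.out.println(toStructuralValues(expected).equals(toStructuralValues(actual))); // true
    }
}
```

With wrappers like this, standard matchers that rely on equals/hashCode behave sensibly even for element types that do not override them.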
public static <T> java.util.List<T> readFromSource(BoundedSource<T> source, PipelineOptions options) throws java.io.IOException
Reads all elements from the given BoundedSource.
Throws: java.io.IOException
public static <T> java.util.List<T> readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options) throws java.lang.Exception
Throws: java.lang.Exception
public static <T> java.util.List<T> readFromUnstartedReader(Source.Reader<T> reader) throws java.io.IOException
Reads all elements from the given unstarted Source.Reader.
Throws: java.io.IOException
public static <T> java.util.List<T> readFromStartedReader(Source.Reader<T> reader) throws java.io.IOException
Reads all elements from the given started Source.Reader.
Throws: java.io.IOException
public static <T> java.util.List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n) throws java.io.IOException
Reads elements from a Source.Reader until n elements are read.
Throws: java.io.IOException
public static <T> java.util.List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n) throws java.io.IOException
Reads elements from a Source.Reader that has already had Source.Reader#start
called on it, until n elements are read.
Throws: java.io.IOException
public static <T> java.util.List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started) throws java.io.IOException
Reads all remaining elements from a Source.Reader.
Throws: java.io.IOException
public static <T> void assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, java.util.List<? extends BoundedSource<T>> sources, PipelineOptions options) throws java.lang.Exception
Given a reference Source and a list of Sources, asserts that the union of
the records read from the list of sources is equal to the records read from the reference
source.
Throws: java.lang.Exception
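As a rough illustration of the property being asserted, the sketch below compares the concatenated records of several splits against a reference list. It is not Beam code: `UnionOfSplitsSketch` and its methods are hypothetical, and treating "union" as an order-insensitive comparison that still counts duplicates is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnionOfSplitsSketch {
    /** Counts occurrences so that duplicates matter but order does not. */
    static <T> Map<T, Integer> toMultiset(List<T> items) {
        Map<T, Integer> counts = new HashMap<>();
        for (T item : items) {
            counts.merge(item, 1, Integer::sum);
        }
        return counts;
    }

    /** True if concatenating all splits yields the reference records, in any order. */
    static <T> boolean unionEqualsReference(List<T> reference, List<List<T>> splits) {
        List<T> union = new ArrayList<>();
        for (List<T> split : splits) {
            union.addAll(split);
        }
        return toMultiset(reference).equals(toMultiset(union));
    }

    public static void main(String[] args) {
        List<String> reference = Arrays.asList("a", "b", "b", "c");
        List<List<String>> good = Arrays.asList(Arrays.asList("b", "c"), Arrays.asList("a", "b"));
        // This set of splits loses one of the duplicate "b" records.
        List<List<String>> lossy = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(unionEqualsReference(reference, good));  // true
        System.out.println(unionEqualsReference(reference, lossy)); // false
    }
}
```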
public static <T> void assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws java.lang.Exception
Asserts that a Reader returns a Source that, when read from, produces the same
records as the reader.
Throws: java.lang.Exception
public static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options) throws java.lang.Exception
Asserts that the source's reader either fails to splitAtFraction(fraction)
after reading numItemsToReadBeforeSplit items, or succeeds in a way that is
consistent according to assertSplitAtFractionSucceedsAndConsistent(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.options.PipelineOptions).
Returns SplitAtFractionResult.
Throws: java.lang.Exception
public static <T> void assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws java.lang.Exception
Verifies some consistency properties of BoundedSource.BoundedReader#splitAtFraction
on the given source. Equivalent to the following pseudocode:
    Reader reader = source.createReader();
    read N items from reader;
    Source residual = reader.splitAtFraction(splitFraction);
    Source primary = reader.getCurrentSource();
    assert: items in primary == items we read so far + items we'll get by continuing to read from reader;
    assert: items in original source == items in primary + items in residual
Throws: java.lang.Exception
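To make the two assertions in the pseudocode above concrete, here is a small model that runs without Beam. `ListReader` is a hypothetical toy reader over an in-memory list, and its split rule (round the fraction up to an index, reject splits behind the read position or with an empty side) is an assumption chosen for the sketch, not the behavior of any particular Beam source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitConsistencySketch {
    /** Toy reader over an in-memory list; hypothetical, not Beam's BoundedReader. */
    static final class ListReader {
        private final List<Integer> data;
        private int end;   // exclusive end of the primary range; shrinks on a successful split
        private int next;  // index of the next item to return

        ListReader(List<Integer> data) {
            this.data = data;
            this.end = data.size();
        }

        /** Reads up to maxItems items from the current (primary) range. */
        List<Integer> read(int maxItems) {
            List<Integer> out = new ArrayList<>();
            while (out.size() < maxItems && next < end) {
                out.add(data.get(next++));
            }
            return out;
        }

        /** Returns the residual items, or null if the split is rejected. */
        List<Integer> splitAtFraction(double fraction) {
            int split = (int) Math.ceil(fraction * end);
            // Reject splits that would drop already-read items or leave either side empty.
            if (split < next || split <= 0 || split >= end) {
                return null;
            }
            List<Integer> residual = new ArrayList<>(data.subList(split, end));
            end = split;
            return residual;
        }

        /** Items covered by the reader's current source after any split. */
        List<Integer> currentSourceItems() {
            return new ArrayList<>(data.subList(0, end));
        }
    }

    static boolean splitSucceedsAndIsConsistent(
            List<Integer> all, int itemsBeforeSplit, double fraction) {
        ListReader reader = new ListReader(all);
        List<Integer> seen = reader.read(itemsBeforeSplit);
        List<Integer> residual = reader.splitAtFraction(fraction);
        if (residual == null) {
            return false; // split rejected
        }
        List<Integer> primary = reader.currentSourceItems();
        seen.addAll(reader.read(Integer.MAX_VALUE)); // continue reading after the split
        List<Integer> union = new ArrayList<>(primary);
        union.addAll(residual);
        // primary == read so far + read afterwards; original == primary + residual
        return primary.equals(seen) && all.equals(union);
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(10, 20, 30, 40, 50);
        System.out.println(splitSucceedsAndIsConsistent(all, 2, 0.5)); // true
        System.out.println(splitSucceedsAndIsConsistent(all, 4, 0.5)); // false: split point already passed
    }
}
```

The second call shows the rejected case that assertSplitAtFractionFails is meant to cover: the requested split point lies behind the items already read.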
public static <T> void assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws java.lang.Exception
Asserts that the source's reader fails to splitAtFraction(fraction)
after reading numItemsToReadBeforeSplit items.
Throws: java.lang.Exception
public static <T> void assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options) throws java.lang.Exception
Asserts that for each possible start position,
BoundedSource.BoundedReader#splitAtFraction at every interesting fraction (halfway
between two fractions that differ by at least one item) can be called successfully and the
results are consistent if a split succeeds. Verifies multithreaded splitting as well.
Throws: java.lang.Exception
public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource)
Returns an equivalent unsplittable BoundedSource<T>.
It forwards most methods to the given boundedSource, except:
- BoundedSource.split(long, org.apache.beam.sdk.options.PipelineOptions) rejects initial splitting by returning itself in a list.
- BoundedSource.BoundedReader.splitAtFraction(double) rejects dynamic splitting by returning null.
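The forwarding behavior described above follows a plain delegation pattern, sketched below without Beam. `SimpleSource`, `ListSource`, and `UnsplittableSource` are hypothetical, heavily simplified types; only the initial-split case is modeled, and the reader-side rejection of dynamic splits is left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UnsplittableWrapperSketch {
    /** Hypothetical, heavily simplified source interface; not Beam's BoundedSource. */
    interface SimpleSource {
        List<String> readAll();

        List<SimpleSource> split();
    }

    /** A splittable source backed by a list; always splits into two halves. */
    static final class ListSource implements SimpleSource {
        private final List<String> items;

        ListSource(List<String> items) {
            this.items = items;
        }

        @Override
        public List<String> readAll() {
            return items;
        }

        @Override
        public List<SimpleSource> split() {
            int mid = items.size() / 2;
            return Arrays.<SimpleSource>asList(
                new ListSource(items.subList(0, mid)),
                new ListSource(items.subList(mid, items.size())));
        }
    }

    /** Forwards reads to the delegate but rejects splitting by returning itself in a list. */
    static final class UnsplittableSource implements SimpleSource {
        private final SimpleSource delegate;

        UnsplittableSource(SimpleSource delegate) {
            this.delegate = delegate;
        }

        @Override
        public List<String> readAll() {
            return delegate.readAll();
        }

        @Override
        public List<SimpleSource> split() {
            return Collections.<SimpleSource>singletonList(this);
        }
    }

    public static void main(String[] args) {
        SimpleSource source = new ListSource(Arrays.asList("a", "b", "c", "d"));
        SimpleSource wrapped = new UnsplittableSource(source);
        System.out.println(source.split().size());  // 2
        System.out.println(wrapped.split().size()); // 1
        System.out.println(wrapped.readAll());      // [a, b, c, d]
    }
}
```

A wrapper like this is useful in tests that need to exercise the code path where a runner asks for a split and the source declines.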