public class TestPipeline extends Pipeline implements TestRule
It is recommended to tag hand-selected tests for this purpose using the ValidatesRunner Category annotation, as each test run against a pipeline runner will utilize resources of that pipeline runner.
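In order to run tests on a pipeline runner, the following conditions must be met:
- System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline options. For example:
  [
    "--runner=TestDataflowRunner",
    "--project=mygcpproject",
    "--stagingLocation=gs://mygcsbucket/path"
  ]
  Note that the set of pipeline options required is pipeline runner specific.
- Jars containing the SDK and test classes must be available on the classpath.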
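As an illustration of the JSON list format, the following minimal sketch builds the array from a list of option strings and stores it in a system property using only the JDK. The class name BeamTestOptionsProperty and the helper toJsonArray are hypothetical, and the property name "beamTestPipelineOptions" is assumed to be the value of PROPERTY_BEAM_TEST_PIPELINE_OPTIONS; real code should prefer the constant.

```java
import java.util.List;
import java.util.stream.Collectors;

final class BeamTestOptionsProperty {
    // Assumed value of TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS.
    static final String PROPERTY = "beamTestPipelineOptions";

    /** Renders option args as a JSON array of strings, escaping backslashes and double quotes only. */
    static String toJsonArray(List<String> args) {
        return args.stream()
            .map(a -> "\"" + a.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
            .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] argv) {
        String json = toJsonArray(List.of(
            "--runner=TestDataflowRunner",
            "--project=mygcpproject",
            "--stagingLocation=gs://mygcsbucket/path"));
        // Must be set before the test pipeline is created for it to take effect.
        System.setProperty(PROPERTY, json);
        System.out.println(System.getProperty(PROPERTY));
    }
}
```

In a build, the same value is typically passed to the test JVM as a -D system property argument rather than set in code.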
Use PAssert for tests, as it integrates with this test harness in both direct and remote execution modes. For example:
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pCollection = pipeline.apply(...);
  PAssert.that(pCollection).containsInAnyOrder(...);
  pipeline.run();
}
Pipeline runners must throw an AssertionError containing the message from the PAssert that failed.
See also the Testing documentation section.
Modifier and Type | Class and Description |
---|---|
static class | TestPipeline.AbandonedNodeException: An exception thrown in case an abandoned PTransform is detected, that is, a PTransform that has not been run. |
static class | TestPipeline.PipelineRunMissingException: An exception thrown in case a test finishes without invoking Pipeline.run(). |
static interface | TestPipeline.TestValueProviderOptions: Implementation detail of newProvider(T), do not use. |
Nested classes/interfaces inherited from class Pipeline: Pipeline.PipelineExecutionException, Pipeline.PipelineVisitor
Modifier and Type | Field and Description |
---|---|
static java.lang.String | PROPERTY_BEAM_TEST_PIPELINE_OPTIONS: System property used to set TestPipelineOptions. |
Modifier and Type | Method and Description |
---|---|
Statement | apply(Statement statement, Description description) |
static TestPipeline | create(): Creates and returns a new test pipeline. |
TestPipeline | enableAbandonedNodeEnforcement(boolean enable): Enables the abandoned node detection. |
TestPipeline | enableAutoRunIfMissing(boolean enable): If enabled, a pipeline.run() statement will be added automatically in case it is missing in the test. |
static TestPipeline | fromOptions(PipelineOptions options) |
PipelineOptions | getOptions() |
<T> ValueProvider<T> | newProvider(T runtimeValue): Returns a new ValueProvider that is inaccessible before run(), but will be accessible while the pipeline runs. |
PipelineResult | run(): Runs this TestPipeline, unwrapping any AssertionError that is raised during testing. |
PipelineResult | run(PipelineOptions options): Like run() but with the given potentially modified options. |
PipelineResult | runWithAdditionalOptionArgs(java.util.List<java.lang.String> additionalArgs): Runs this TestPipeline with additional cmd pipeline option args. |
static PipelineOptions | testingPipelineOptions(): Creates PipelineOptions for testing. |
java.lang.String | toString() |
static void | verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult): Verifies all PAsserts in the pipeline have been executed and were successful. |
Methods inherited from class Pipeline: apply, apply, applyTransform, applyTransform, begin, create, forTransformHierarchy, getCoderRegistry, getSchemaRegistry, registerBadRecordErrorHandler, replaceAll, setCoderRegistry, traverseTopologically
public static final java.lang.String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS
System property used to set TestPipelineOptions.
public static TestPipeline create()
Creates and returns a new test pipeline.
Use PAssert to add tests, then call Pipeline.run() to execute the pipeline and check the tests.
public static TestPipeline fromOptions(PipelineOptions options)
public PipelineOptions getOptions()
Overrides: getOptions in class Pipeline
public Statement apply(Statement statement, Description description)
public PipelineResult run()
Runs this TestPipeline, unwrapping any AssertionError that is raised during testing.
public PipelineResult runWithAdditionalOptionArgs(java.util.List<java.lang.String> additionalArgs)
Runs this TestPipeline with additional command-line pipeline option arguments.
This is useful when using PipelineOptions.as(Class) directly would introduce a circular dependency.
Most of the logic is similar to testingPipelineOptions().
public PipelineResult run(PipelineOptions options)
Like run() but with the given potentially modified options.
public <T> ValueProvider<T> newProvider(T runtimeValue)
Returns a new ValueProvider that is inaccessible before run(), but will be accessible while the pipeline runs.
public TestPipeline enableAbandonedNodeEnforcement(boolean enable)
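Enables the abandoned node detection. Abandoned nodes are PTransforms, PAsserts included, that were not executed by the pipeline runner. Abandoned nodes are most likely to occur due to one of the following scenarios:
- Lack of a pipeline.run() statement at the end of a test.
- Addition of PTransforms after the pipeline has already run.
Abandoned node detection is automatically enabled when a real pipeline runner (i.e. not a CrashingRunner) and/or a NeedsRunner or a ValidatesRunner annotation are detected.
public TestPipeline enableAutoRunIfMissing(boolean enable)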
If enabled, a pipeline.run() statement will be added automatically in case it is missing in the test.
public static PipelineOptions testingPipelineOptions()
Creates PipelineOptions for testing.
public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult)
Verifies all PAsserts in the pipeline have been executed and were successful.
Note this only runs for runners which support Metrics. Runners which do not should verify this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001
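The dependence on Metrics suggests that the check reads assertion outcomes from counters reported by the runner. The toy model below sketches one way such a counter-based check could work, using only the JDK. It is not Beam's implementation: the class PAssertCounterCheck, the method verify, and the counter names "success" and "failure" are hypothetical.

```java
import java.util.Map;

final class PAssertCounterCheck {
    /**
     * Throws AssertionError if any assertion failed, or if fewer assertions
     * reported success than the pipeline was expected to contain.
     * Counter names are hypothetical placeholders.
     */
    static void verify(int expectedAsserts, Map<String, Long> counters) {
        long failures = counters.getOrDefault("failure", 0L);
        long successes = counters.getOrDefault("success", 0L);
        if (failures > 0) {
            throw new AssertionError("Failed assertions: " + failures);
        }
        if (successes < expectedAsserts) {
            throw new AssertionError(
                "Expected " + expectedAsserts + " successful assertions, but found " + successes + ".");
        }
    }

    public static void main(String[] args) {
        // Both expected assertions ran and succeeded: passes silently.
        verify(2, Map.of("success", 2L));
        try {
            // One assertion never executed, so the success count falls short.
            verify(2, Map.of("success", 1L));
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Comparing the success count against the expected number of assertions, rather than only checking for failures, is what lets this kind of check catch assertions that were never executed.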