Class TestPipeline

java.lang.Object
org.apache.beam.sdk.Pipeline
org.apache.beam.sdk.testing.TestPipeline
All Implemented Interfaces:
TestRule

public class TestPipeline extends Pipeline implements TestRule
Creates test pipelines for use inside tests. The pipelines can be configured to run locally or against a remote pipeline runner.

It is recommended to tag hand-selected tests for remote execution with the ValidatesRunner Category annotation, because each test run against a pipeline runner consumes resources of that pipeline runner.

In order to run tests on a pipeline runner, the following conditions must be met:

  • System property "beamTestPipelineOptions" must contain a JSON-formatted list of pipeline options. For example:
    [
         "--runner=TestDataflowRunner",
         "--project=mygcpproject",
         "--stagingLocation=gs://mygcsbucket/path"
     ]
    Note that the required set of pipeline options is specific to each pipeline runner.
  • Jars containing the SDK and test classes must be available on the classpath.
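As a minimal sketch of the first condition, the property value can be assembled in plain Java. The class name BeamTestOptionsProperty and the helper toJsonArray are hypothetical, and the escaping covers only backslashes and double quotes. In practice the property is more commonly passed to the test JVM as a -DbeamTestPipelineOptions=... flag; if it is set programmatically as below, it would need to be set before the TestPipeline is created.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Beam SDK.
public class BeamTestOptionsProperty {

  /** Renders option strings as a JSON array of strings (escapes only \ and "). */
  public static String toJsonArray(List<String> args) {
    return args.stream()
        .map(a -> "\"" + a.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
        .collect(Collectors.joining(",", "[", "]"));
  }

  public static void main(String[] unused) {
    // Same option values as the example above.
    List<String> args = List.of(
        "--runner=TestDataflowRunner",
        "--project=mygcpproject",
        "--stagingLocation=gs://mygcsbucket/path");

    // Publish the JSON list under the property name the harness reads.
    System.setProperty("beamTestPipelineOptions", toJsonArray(args));
    System.out.println(System.getProperty("beamTestPipelineOptions"));
  }
}
```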

Use PAssert for tests, as it integrates with this test harness in both direct and remote execution modes. For example:


  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  @Category(NeedsRunner.class)
  public void myPipelineTest() throws Exception {
    final PCollection<String> pCollection = pipeline.apply(...);
    PAssert.that(pCollection).containsInAnyOrder(...);
    pipeline.run();
  }
 

Pipeline runners are required to throw an AssertionError containing the message from the PAssert that failed.

See also the Testing documentation section.

  • Field Details

  • Method Details

    • create

      public static TestPipeline create()
      Creates and returns a new test pipeline.

      Use PAssert to add tests, then call Pipeline.run() to execute the pipeline and check the tests.

    • fromOptions

      public static TestPipeline fromOptions(PipelineOptions options)
    • getOptions

      public PipelineOptions getOptions()
      Overrides:
      getOptions in class Pipeline
    • apply

      public Statement apply(Statement statement, Description description)
      Specified by:
      apply in interface TestRule
    • run

      public PipelineResult run()
      Runs this TestPipeline, unwrapping any AssertionError that is raised during testing.
      Overrides:
      run in class Pipeline
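The idea of unwrapping an AssertionError can be illustrated with a small stand-alone sketch. It assumes the failure arrives as a RuntimeException whose direct cause is the AssertionError; the class UnwrapAssertionSketch and the method runUnwrapping are hypothetical and are not taken from the Beam source.

```java
import java.util.function.Supplier;

// Hypothetical illustration, not part of the Beam SDK.
public class UnwrapAssertionSketch {

  /**
   * Runs the action. If it throws a RuntimeException whose direct cause is an
   * AssertionError, rethrows that AssertionError; otherwise rethrows as-is.
   */
  public static <T> T runUnwrapping(Supplier<T> action) {
    try {
      return action.get();
    } catch (RuntimeException e) {
      if (e.getCause() instanceof AssertionError) {
        throw (AssertionError) e.getCause();
      }
      throw e;
    }
  }

  public static void main(String[] args) {
    try {
      // Simulates a runner that wraps a failed assertion in a RuntimeException.
      runUnwrapping(() -> {
        throw new RuntimeException(new AssertionError("expected [a] but got [b]"));
      });
    } catch (AssertionError e) {
      System.out.println("unwrapped: " + e.getMessage());
    }
  }
}
```

With this shape, a test framework sees the original assertion failure and its message rather than a generic wrapper exception.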
    • runWithAdditionalOptionArgs

      public PipelineResult runWithAdditionalOptionArgs(List<String> additionalArgs)
      Runs this TestPipeline with additional command-line pipeline option arguments.

      This is useful when using PipelineOptions.as(Class) directly would introduce a circular dependency.

      Most of the logic is similar to testingPipelineOptions().

    • run

      public PipelineResult run(PipelineOptions options)
      Like run() but with the given potentially modified options.
      Overrides:
      run in class Pipeline
    • newProvider

      public <T> ValueProvider<T> newProvider(T runtimeValue)
      Returns a new ValueProvider that is inaccessible before run(), but will be accessible while the pipeline runs.
    • enableAbandonedNodeEnforcement

      public TestPipeline enableAbandonedNodeEnforcement(boolean enable)
      Enables abandoned node detection. Abandoned nodes are PTransforms, PAsserts included, that were not executed by the pipeline runner. Abandoned nodes are most likely to occur due to one of the following scenarios:
      • Lack of a pipeline.run() statement at the end of a test.
      • Addition of PTransforms after the pipeline has already run.
      Abandoned node detection is automatically enabled when a real pipeline runner (i.e. not a CrashingRunner) and/or a NeedsRunner or ValidatesRunner annotation is detected.
    • enableAutoRunIfMissing

      public TestPipeline enableAutoRunIfMissing(boolean enable)
      If enabled, a pipeline.run() call is added automatically when it is missing from the test.
    • toString

      public String toString()
      Overrides:
      toString in class Pipeline
    • testingPipelineOptions

      public static PipelineOptions testingPipelineOptions()
      Creates PipelineOptions for testing.
    • verifyPAssertsSucceeded

      public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult)
      Verifies that all PAsserts in the pipeline have been executed and were successful.

      Note that this check only runs for runners that support Metrics. Runners that do not should verify this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001