apache_beam.testing.test_pipeline module

Test Pipeline, a wrapper of Pipeline for test purpose

class apache_beam.testing.test_pipeline.TestPipeline(runner=None, options=None, argv=None, is_integration_test=False, blocking=True, additional_pipeline_args=None)[source]

Bases: apache_beam.pipeline.Pipeline

TestPipeline class is used inside of Beam tests that can be configured to run against pipeline runner.

It has a functionality to parse arguments from command line and build pipeline options for tests who runs against a pipeline runner and utilizes resources of the pipeline runner. Those test functions are recommended to be tagged by @attr("ValidatesRunner") annotation.

In order to configure the test with customized pipeline options from command line, system argument --test-pipeline-options can be used to obtains a list of pipeline options. If no options specified, default value will be used.

For example, use following command line to execute all ValidatesRunner tests:

python setup.py nosetests -a ValidatesRunner \
    --test-pipeline-options="--runner=DirectRunner \
                             --job_name=myJobName \
                             --num_workers=1"

For example, use assert_that for test validation:

with TestPipeline() as pipeline:
  pcoll = ...
  assert_that(pcoll, equal_to(...))

Initialize a pipeline object for test.

Parameters:
  • runner (PipelineRunner) – An object of type PipelineRunner that will be used to execute the pipeline. For registered runners, the runner name can be specified, otherwise a runner object must be supplied.
  • options (PipelineOptions) – A configured PipelineOptions object containing arguments that should be used for running the pipeline job.
  • argv (List[str]) – A list of arguments (such as sys.argv) to be used for building a PipelineOptions object. This will only be used if argument options is None.
  • is_integration_test (bool) – True if the test is an integration test, False otherwise.
  • blocking (bool) – Run method will wait until pipeline execution is completed.
  • additional_pipeline_args (List[str]) – additional pipeline arguments to be included when construction the pipeline options object.
Raises:

ValueError – if either the runner or options argument is not of the expected type.

run(test_runner_api=True)[source]
get_pipeline_options()[source]
get_full_options_as_args(**extra_opts)[source]

Get full pipeline options as an argument list.

Append extra pipeline options to existing option list if provided. Test verifier (if contains in extra options) should be pickled before appending, and will be unpickled later in the TestRunner.

get_option(opt_name)[source]

Get a pipeline option value by name

Parameters:opt_name – The name of the pipeline option.
Returns:None if option is not found in existing option list which is generated by parsing value of argument test-pipeline-options.