apache_beam.testing.synthetic_pipeline module¶
A set of utilities to write pipelines for performance tests.
This module offers a way to create pipelines using synthetic sources and steps. Exact shape of the pipeline and the behaviour of sources and steps can be controlled through arguments. Please see function ‘parse_args()’ for more details about the arguments.
Shape of the pipeline is primariy controlled through two arguments. Argument ‘steps’ can be used to define a list of steps as a JSON string. Argument ‘barrier’ describes how these steps are separated from each other. Argument ‘barrier’ can be use to build a pipeline as a series of steps or a tree of steps with a fanin or a fanout of size 2.
Other arguments describe what gets generated by synthetic sources that produce data for the pipeline.
- 
apache_beam.testing.synthetic_pipeline.Generator¶
- alias of - apache_beam.testing.synthetic_pipeline._Random
- 
apache_beam.testing.synthetic_pipeline.rotate_key(element)[source]¶
- Returns a new key-value pair of the same size but with a different key. 
- 
apache_beam.testing.synthetic_pipeline.initial_splitting_zipf(start_position, stop_position, desired_num_bundles, distribution_parameter, num_total_records=None)[source]¶
- Split the given range (defined by start_position, stop_position) into desired_num_bundles using zipf with the given distribution_parameter. 
- 
class apache_beam.testing.synthetic_pipeline.SyntheticStep(per_element_delay_sec=0, per_bundle_delay_sec=0, output_records_per_input_record=1, output_filter_ratio=0)[source]¶
- Bases: - apache_beam.transforms.core.DoFn- A DoFn of which behavior can be controlled through prespecified parameters. 
- 
class apache_beam.testing.synthetic_pipeline.NonLiquidShardingOffsetRangeTracker(offset_range)[source]¶
- Bases: - apache_beam.io.restriction_trackers.OffsetRestrictionTracker- An OffsetRangeTracker that doesn’t allow splitting. 
- 
class apache_beam.testing.synthetic_pipeline.SyntheticSDFStepRestrictionProvider(num_records, initial_splitting_num_bundles, initial_splitting_uneven_chunks, disable_liquid_sharding, size_estimate_override)[source]¶
- Bases: - apache_beam.transforms.core.RestrictionProvider- A RestrictionProvider for SyntheticSDFStep. - An initial_restriction and split that operate on num_records and ignores source description (element). Splits into initial_splitting_num_bundles. Returns size_estimate_override as restriction size, if set. Otherwise uses element size. - If initial_splitting_uneven_chunks, produces uneven chunks. 
- 
apache_beam.testing.synthetic_pipeline.get_synthetic_sdf_step(per_element_delay_sec=0, per_bundle_delay_sec=0, output_records_per_input_record=1, output_filter_ratio=0, initial_splitting_num_bundles=8, initial_splitting_uneven_chunks=False, disable_liquid_sharding=False, size_estimate_override=None)[source]¶
- A function which returns a SyntheticSDFStep with given parameters. 
- 
class apache_beam.testing.synthetic_pipeline.SyntheticSource(input_spec)[source]¶
- Bases: - apache_beam.io.iobase.BoundedSource- A custom source of a specified size. - Initiates a synthetic source. - Parameters: - input_spec – Input specification of the source. See corresponding option in function ‘parse_args()’ below for more details. - Raises: - ValueError– if input parameters are invalid.- 
element_size¶
 
- 
- 
class apache_beam.testing.synthetic_pipeline.SyntheticSDFSourceRestrictionProvider[source]¶
- Bases: - apache_beam.transforms.core.RestrictionProvider- A RestrictionProvider for SyntheticSDFAsSource. - In initial_restriction(element) and split(element), element means source description. A typical element is like: - {
- ‘key_size’: 1, ‘value_size’: 1, ‘initial_splitting_num_bundles’: 8, ‘initial_splitting_desired_bundle_size’: 2, ‘sleep_per_input_record_sec’: 0, ‘initial_splitting’ : ‘const’
 - } 
- 
class apache_beam.testing.synthetic_pipeline.SyntheticSDFAsSource(*unused_args, **unused_kwargs)[source]¶
- Bases: - apache_beam.transforms.core.DoFn- A SDF that generates records like a source. - This SDF accepts a PCollection of record-based source description. A typical description is like: - {
- ‘key_size’: 1, ‘value_size’: 1, ‘initial_splitting_num_bundles’: 8, ‘initial_splitting_desired_bundle_size’: 2, ‘sleep_per_input_record_sec’: 0, ‘initial_splitting’ : ‘const’
 - } - A simple pipeline taking this SDF as a source is like:
- p | beam.Create([description1, description2,…]) | beam.ParDo(SyntheticSDFAsSource())
 - Note - The SDF.process() will have different param content between defining a DoFn and runtime. When defining an SDF.process, the restriction_tracker should be a RestrictionProvider. During runtime, the DoFnRunner.process_with_sized_restriction() will feed a ‘RestrictionTracker’ based on a restriction to SDF.process(). 
- 
apache_beam.testing.synthetic_pipeline.merge_using_gbk(name, pc1, pc2)[source]¶
- Merges two given PCollections using a CoGroupByKey. 
- 
apache_beam.testing.synthetic_pipeline.merge_using_side_input(name, pc1, pc2)[source]¶
- Merges two given PCollections using side inputs. 
- 
apache_beam.testing.synthetic_pipeline.expand_using_gbk(name, pc)[source]¶
- Expands a given PCollection into two copies using GroupByKey. 
- 
apache_beam.testing.synthetic_pipeline.expand_using_second_output(name, pc)[source]¶
- Expands a given PCollection into two copies using side outputs. 
- 
apache_beam.testing.synthetic_pipeline.parse_args(args)[source]¶
- Parses a given set of arguments. - Parameters: - args – set of arguments to be passed. - Returns: - a tuple where first item gives the set of arguments defined and parsed within this method and second item gives the set of unknown arguments. 
- 
apache_beam.testing.synthetic_pipeline.run(argv=None, save_main_session=True)[source]¶
- Runs the workflow. 
- 
class apache_beam.testing.synthetic_pipeline.StatefulLoadGenerator(input_options, num_keys=100)[source]¶
- Bases: - apache_beam.transforms.ptransform.PTransform- A PTransform for generating random data using Timers API. - 
class GenerateLoad(num_records_per_key, value_size, bundle_size=1000)[source]¶
- Bases: - apache_beam.transforms.core.DoFn- 
state_spec= CombiningValueStateSpec(bundles_remaining)¶
 - 
timer_spec= TimerSpec(ts-timer)¶
 
- 
 
- 
class