apache_beam.transforms.util module
Simple utility PTransforms.
- class apache_beam.transforms.util.BatchElements(min_batch_size=1, max_batch_size=10000, target_batch_overhead=0.05, target_batch_duration_secs=10, target_batch_duration_secs_including_fixed_cost=None, max_batch_duration_secs=None, *, element_size_fn=<function BatchElements.<lambda>>, variance=0.25, clock=<built-in function time>, record_metrics=True)[source]
Bases:
PTransform
A Transform that batches elements for amortized processing.
This transform is designed to precede operations whose processing cost is of the form
time = fixed_cost + num_elements * per_element_cost
where the per-element cost is (often significantly) smaller than the fixed cost, so the fixed cost can be amortized over multiple elements. It consumes a PCollection of element type T and produces a PCollection of element type List[T].
This transform attempts to find the best batch size between the minimum and maximum parameters by profiling the time taken by (fused) downstream operations. For a fixed batch size, set the min and max to be equal.
Elements are batched per window, and each batch is emitted in the window corresponding to its contents, with a timestamp at the end of that window.
When the max_batch_duration_secs arg is provided, a stateful implementation of BatchElements is used to batch elements across bundles. This is most impactful in streaming applications where many bundles only contain one element. Larger max_batch_duration_secs values might reduce the throughput of the transform, while smaller values might improve the throughput but make it more likely that batches are smaller than the target batch size.
As a general recommendation, start with low values (e.g. 0.005, i.e. 5 ms) and increase them as needed to get the desired tradeoff between target batch size and latency or throughput.
For more information on tuning parameters to this transform, see https://beam.apache.org/documentation/patterns/batch-elements
- Parameters:
min_batch_size – (optional) the smallest size of a batch
max_batch_size – (optional) the largest size of a batch
target_batch_overhead – (optional) a target for fixed_cost / time, as used in the formula above
target_batch_duration_secs – (optional) a target for total time per bundle, in seconds, excluding fixed cost
target_batch_duration_secs_including_fixed_cost – (optional) a target for total time per bundle, in seconds, including fixed cost
max_batch_duration_secs – (optional) the maximum amount of time to buffer a batch before emitting it. Setting this argument to a non-None value uses the stateful implementation of BatchElements.
element_size_fn – (optional) A mapping of an element to its contribution to batch size, defaulting to every element having size 1. When provided, attempts to provide batches of optimal total size which may consist of a varying number of elements.
variance – (optional) the permitted (relative) amount of deviation from the (estimated) ideal batch size used to produce a wider base for linear interpolation
clock – (optional) an alternative to time.time for measuring the cost of downstream operations (mostly for testing)
record_metrics – (optional) whether or not to record beam metrics on distributions of the batch size. Defaults to True.
- class apache_beam.transforms.util.CoGroupByKey(*, pipeline=None)[source]
Bases:
PTransform
Groups results across several PCollections by key.
Given an input dict of serializable keys (called “tags”) to 0 or more PCollections of (key, value) tuples, it creates a single output PCollection of (key, value) tuples whose keys are the unique input keys from all inputs, and whose values are dicts mapping each tag to an iterable of whatever values were under the key in the corresponding PCollection, in this manner:
('some key', {'tag1': ['value 1 under "some key" in pcoll1', 'value 2 under "some key" in pcoll1', ...], 'tag2': ... , ... })
where [] refers to an iterable, not a list.
For example, given:
{'tag1': pc1, 'tag2': pc2, 333: pc3}
where:
pc1 = beam.Create([(k1, v1)])
pc2 = beam.Create([])
pc3 = beam.Create([(k1, v31), (k1, v32), (k2, v33)])
The output PCollection would consist of items:
[(k1, {'tag1': [v1], 'tag2': [], 333: [v31, v32]}), (k2, {'tag1': [], 'tag2': [], 333: [v33]})]
where [] refers to an iterable, not a list.
CoGroupByKey also works for tuples, lists, or other flat iterables of PCollections, in which case the values of the resulting PCollection will be tuples whose nth value is the iterable of values from the nth PCollection; conceptually, the “tags” are the indices into the input. Thus, for this input:
(pc1, pc2, pc3)
the output would be:
[(k1, ([v1], [], [v31, v32])), (k2, ([], [], [v33]))]
where, again, [] refers to an iterable, not a list.
- **kwargs
Accepts a single named argument “pipeline”, which specifies the pipeline that “owns” this PTransform. Ordinarily CoGroupByKey can obtain this information from one of the input PCollections, but if there are none (or if there’s a chance there may be none), this argument is the only way to provide pipeline information, and should be considered mandatory.
- apache_beam.transforms.util.Distinct()[source]
Produces a PCollection containing distinct elements of a PCollection.
- apache_beam.transforms.util.Keys(label='Keys')[source]
Produces a PCollection of first elements of 2-tuples in a PCollection.
- apache_beam.transforms.util.KvSwap(label='KvSwap')[source]
Produces a PCollection reversing 2-tuples in a PCollection.
- class apache_beam.transforms.util.LogElements(label=None, prefix='', with_timestamp=False, with_window=False, level=None)[source]
Bases:
PTransform
PTransform for printing the elements of a PCollection.
- Parameters:
label (str) – (optional) A custom label for the transform.
prefix (str) – (optional) A prefix string to prepend to each logged element.
with_timestamp (bool) – (optional) Whether to include element’s timestamp.
with_window (bool) – (optional) Whether to include element’s window.
level – (optional) The logging level for the output (e.g. logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR). If not specified, the log is printed to stdout.
- class apache_beam.transforms.util.Regex[source]
Bases:
object
PTransforms that use regular expressions to process the elements in a PCollection.
- ALL = '__regex_all_groups'
- static matches(regex, group=0)[source]
Returns the matched group (group 0 by default) if zero or more characters at the beginning of the string match the regular expression. To match the entire string, add a “$” at the end of the regular expression.
The group can be an integer or a string.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
group – (optional) the name or number of the group; it can be an integer or a string. Defaults to 0, meaning the entire matched string is returned.
- static all_matches(regex)[source]
Returns all matches (groups) if zero or more characters at the beginning of the string match the regular expression.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
- static matches_kv(regex, keyGroup, valueGroup=0)[source]
Returns a KV pair if the string matches the regular expression, deriving the key and value from the specified groups of the regular expression.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
keyGroup – The Regex group to use as the key. Can be int or str.
valueGroup – (optional) the Regex group to use as the value. Can be int or str. The default value “0” returns the entire matched string.
- static find(regex, group=0)[source]
Returns the requested group of the first portion of the line that matches the Regex (group 0, the entire match, by default). The group can be an integer or a string.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
group – (optional) the name or number of the group; it can be an integer or a string.
- static find_all(regex, group=0, outputEmpty=True)[source]
Returns all portions of the line that match the Regex, as a list. By default, the list contains group 0 of every match, including empty items. To get all groups, pass the Regex.ALL flag as the group parameter, which returns the groups in tuple format.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
group – (optional) the name or number of the group; it can be an integer or a string.
outputEmpty – (optional) whether to output empty items: True to output them, False otherwise.
- static find_kv(regex, keyGroup, valueGroup=0)[source]
Returns the matches if a portion of the line matches the Regex. Returns the specified groups as the key and value pair.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
keyGroup – The Regex group to use as the key. Can be int or str.
valueGroup – (optional) the Regex group to use as the value. Can be int or str. The default value “0” returns the entire matched string.
- static replace_all(regex, replacement)[source]
Replaces every portion of the line that matches the regex with the replacement string and returns the result.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
replacement – the string to be substituted for each match.
- static replace_first(regex, replacement)[source]
Replaces the first portion of the line that matches the regex with the replacement string and returns the result.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
replacement – the string to be substituted for the first match.
- static split(regex, outputEmpty=False)[source]
Splits each string on the regular expression and returns the resulting list of strings. Empty items are not output by default.
- Parameters:
regex – the regular expression string or (re.compile) pattern.
outputEmpty – (optional) whether to output empty items: True to output them, False otherwise.
- class apache_beam.transforms.util.Reify[source]
Bases:
object
PTransforms for converting between the explicit and implicit forms of various Beam values.
- class Timestamp(label: str | None = None)[source]
Bases:
PTransform
PTransform to wrap a value in a TimestampedValue with its associated timestamp.
- class Window(label: str | None = None)[source]
Bases:
PTransform
PTransform to convert an element in a PCollection into a tuple of (element, timestamp, window), wrapped in a TimestampedValue with its associated timestamp.
- class TimestampInValue(label: str | None = None)[source]
Bases:
PTransform
PTransform to wrap the Value in a KV pair in a TimestampedValue with the element’s associated timestamp.
- class WindowInValue(label: str | None = None)[source]
Bases:
PTransform
PTransform to convert the Value in a KV pair into a tuple of (value, timestamp, window), with the whole element being wrapped inside a TimestampedValue.
- apache_beam.transforms.util.RemoveDuplicates()[source]
Produces a PCollection containing distinct elements of a PCollection.
- class apache_beam.transforms.util.Reshuffle(num_buckets=None)[source]
Bases:
PTransform
PTransform that returns a PCollection equivalent to its input, but operationally provides some of the side effects of a GroupByKey, in particular checkpointing, and preventing fusion of the surrounding transforms.
Reshuffle adds a temporary random key to each element, performs a ReshufflePerKey, and finally removes the temporary key.
- Parameters:
num_buckets – If set, specifies the maximum number of random keys that will be generated.
- expand(pcoll: PValue) → PCollection [source]
- to_runner_api_parameter(unused_context: PipelineContext) → Tuple[str, None] [source]
- class apache_beam.transforms.util.ToString[source]
Bases:
object
PTransforms for converting PCollection elements, KVs, or iterables to strings.
- static Iterables(delimiter=None)[source]
Converts each element of the input PCollection, which must be an iterable, into a string by joining its items with the delimiter. There is no trailing delimiter.
- static Kvs(delimiter=None)
Converts each key-value pair of the input PCollection into a string by joining the key and the value with the delimiter. There is no trailing delimiter.
- class apache_beam.transforms.util.Tee(*consumers: PTransform[PCollection[T], Any] | Callable[[PCollection[T]], Any])[source]
Bases:
PTransform
A PTransform that returns its input, but also applies its input elsewhere.
Similar to the shell tee command. This can be useful for writing out or otherwise processing an intermediate result without breaking the linear flow of a chain of transforms, e.g.:
(input | SomePTransform() | ... | Tee(SomeSideTransform()) | ...)
- apache_beam.transforms.util.Values(label='Values')[source]
Produces a PCollection of second elements of 2-tuples in a PCollection.
- apache_beam.transforms.util.WithKeys(k, *args, **kwargs)[source]
PTransform that takes a PCollection, and either a constant key or a callable, and returns a PCollection of (K, V), where each of the values in the input PCollection has been paired with either the constant key or a key computed from the value. The callable may optionally accept positional or keyword arguments, which should be passed to WithKeys directly. These may be either SideInputs or static (non-PCollection) values, such as ints.
- class apache_beam.transforms.util.GroupIntoBatches(batch_size, max_buffering_duration_secs=None, clock=<built-in function time>)[source]
Bases:
PTransform
PTransform that batches the input into batches of the desired size. The input must be a PCollection of (key, value) pairs, and batches are formed per key. Elements are buffered until their number reaches the batch size provided in the argument, at which point they are emitted to the output PCollection.
Windows are preserved (batches will contain elements from the same window).
Create a new GroupIntoBatches.
- Parameters:
batch_size – (required) How many elements should be in a batch
max_buffering_duration_secs – (optional) The maximum time, in seconds, that an incomplete batch of elements may be buffered in state. The duration must be a positive number of seconds, given as an int or float. Setting this parameter to zero effectively means no buffering limit.
clock – (optional) an alternative to time.time (mostly for testing)
- to_runner_api_parameter(unused_context: PipelineContext) → Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] [source]
- class WithShardedKey(batch_size, max_buffering_duration_secs=None, clock=<built-in function time>)[source]
Bases:
PTransform
A GroupIntoBatches transform that outputs batched elements associated with sharded input keys.
By default, keys are sharded such that input elements with the same key are spread across all available threads executing the transform. Runners may override the default sharding to achieve better load balancing at execution time.
Create a new GroupIntoBatches with sharded output. See the GroupIntoBatches transform for a description of input parameters.
- to_runner_api_parameter(unused_context: PipelineContext) → Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] [source]