apache_beam.io package

Submodules

apache_beam.io.avroio module

Implements a source for reading Avro files.

class apache_beam.io.avroio.ReadFromAvro(file_pattern=None, min_bundle_size=0, validate=True)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for reading Avro files.

display_data()[source]
expand(pvalue)[source]
class apache_beam.io.avroio.WriteToAvro(file_path_prefix, schema, codec='deflate', file_name_suffix='', num_shards=0, shard_name_template=None, mime_type='application/x-avro')[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for writing Avro files.

display_data()[source]
expand(pcoll)[source]

apache_beam.io.concat_source module

For internal use only; no backwards-compatibility guarantees.

Concat Source, which reads the union of several other sources.

class apache_beam.io.concat_source.ConcatRangeTracker(start, end, source_bundles)[source]

Bases: apache_beam.io.iobase.RangeTracker

For internal use only; no backwards-compatibility guarantees.

Range tracker for ConcatSource

fraction_consumed()[source]
global_to_local(frac)[source]
local_to_global(source_ix, source_frac)[source]
position_at_fraction(fraction)[source]
set_current_position(pos)[source]
start_position()[source]
stop_position()[source]
sub_range_tracker(source_ix)[source]
try_claim(pos)[source]
try_split(pos)[source]
class apache_beam.io.concat_source.ConcatSource(sources)[source]

Bases: apache_beam.io.iobase.BoundedSource

For internal use only; no backwards-compatibility guarantees.

A BoundedSource that can group a set of BoundedSources.

Primarily for internal use. To create the union of several reads, use the apache_beam.Flatten transform instead.

default_output_coder()[source]
estimate_size()[source]
get_range_tracker(start_position=None, stop_position=None)[source]
read(range_tracker)[source]
sources
split(desired_bundle_size=None, start_position=None, stop_position=None)[source]

apache_beam.io.filebasedsink module

File-based sink.

class apache_beam.io.filebasedsink.FileBasedSink(file_path_prefix, coder, file_name_suffix='', num_shards=0, shard_name_template=None, mime_type='application/octet-stream', compression_type='auto')[source]

Bases: apache_beam.io.iobase.Sink

A sink that writes to GCS or local files.

To implement a file-based sink, extend this class and override either write_record() or write_encoded_record().

If needed, also override open() and/or close() to customize the file handling or to write headers and footers.

The output of this write is a PCollection of all written shards.

close(file_handle)[source]

Finalize and close the file handle returned from open().

Called after all records are written.

By default, calls file_handle.close() iff it is not None.

display_data()[source]
finalize_write(*args, **kwargs)[source]
initialize_write(*args, **kwargs)[source]
open(*args, **kwargs)[source]

Opens temp_path, returning an opaque file handle object.

The returned file handle is passed to write_[encoded_]record and close.

open_writer(*args, **kwargs)[source]
write_encoded_record(file_handle, encoded_value)[source]

Writes a single encoded record to the file handle returned by open().

write_record(file_handle, value)[source]

Writes a single record to the file handle returned by open().

By default, calls write_encoded_record after encoding the record with this sink’s Coder.
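
The division of labour between open(), write_record(), write_encoded_record() and close() can be sketched in plain Python. The sketch below does not import apache_beam: ToyLineSink, its encode() helper, the in-memory buffer, and the header and footer lines are all hypothetical stand-ins chosen for illustration, not the library's implementation.

```python
import io


class ToyLineSink(object):
    """Hypothetical stand-in that mirrors the hook names described above."""

    def encode(self, value):
        # Stand-in for the sink's Coder: encode the value as UTF-8 bytes.
        return str(value).encode("utf-8")

    def open(self, temp_path):
        # A real sink would open temp_path; an in-memory buffer is used here.
        handle = io.BytesIO()
        handle.write(b"# header\n")  # open() is a natural place for a header
        return handle

    def write_record(self, file_handle, value):
        # Default behaviour: encode, then delegate to write_encoded_record().
        self.write_encoded_record(file_handle, self.encode(value))

    def write_encoded_record(self, file_handle, encoded_value):
        # Newline framing is an assumption of this sketch.
        file_handle.write(encoded_value + b"\n")

    def close(self, file_handle):
        # Called after all records are written; returns the bytes so the
        # result can be inspected in this sketch.
        if file_handle is not None:
            file_handle.write(b"# footer\n")
            data = file_handle.getvalue()
            file_handle.close()
            return data


sink = ToyLineSink()
handle = sink.open("ignored-temp-path")
for record in ["a", 1]:
    sink.write_record(handle, record)
written = sink.close(handle)
```

A subclass that already holds encoded bytes would override only write_encoded_record(); one that needs a custom serialization would override write_record() instead.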

apache_beam.io.filebasedsource module

A framework for developing sources for new file types.

To create a source for a new file type, create a subclass of FileBasedSource. Subclasses of FileBasedSource must implement the method FileBasedSource.read_records(). See the documentation of that method for more details.

For an example implementation of FileBasedSource see avroio.AvroSource.

class apache_beam.io.filebasedsource.FileBasedSource(file_pattern, min_bundle_size=0, compression_type='auto', splittable=True, validate=True)[source]

Bases: apache_beam.io.iobase.BoundedSource

A BoundedSource for reading a file glob of a given type.

MIN_FRACTION_OF_FILES_TO_STAT = 0.01
MIN_NUMBER_OF_FILES_TO_STAT = 100
display_data()[source]
estimate_size(*args, **kwargs)[source]
get_range_tracker(start_position, stop_position)[source]
open_file(file_name)[source]
read(range_tracker)[source]
read_records(file_name, offset_range_tracker)[source]

Returns a generator of records created by reading file ‘file_name’.

Parameters:
  • file_name – a string that gives the name of the file to be read. Method FileBasedSource.open_file() must be used to open the file and create a seekable file object.
  • offset_range_tracker – an object of type OffsetRangeTracker. This defines the byte range of the file that should be read. See the documentation of iobase.BoundedSource.read() for more information on reading records while complying with the range defined by a given RangeTracker.
Returns:

an iterator that gives the records read from the given file.
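
As a rough illustration of reading records while complying with a byte range, the following sketch reads newline-delimited records whose starting offsets fall inside [start, stop). It is plain Python and does not use apache_beam: ToyOffsetTracker and read_line_records are hypothetical names, and the newline-delimited format is an assumption of the example.

```python
import io


class ToyOffsetTracker(object):
    """Hypothetical minimal tracker over byte offsets [start, stop)."""

    def __init__(self, start, stop):
        self._start, self._stop = start, stop

    def start_position(self):
        return self._start

    def try_claim(self, position):
        # A record may be returned only if it starts before the stop offset.
        return position < self._stop


def read_line_records(file_obj, tracker):
    """Yields lines whose starting byte offset falls inside the tracked range."""
    start = tracker.start_position()
    if start > 0:
        # Back up one byte and discard through the next newline, so reading
        # begins at the first line that starts at or after `start`.
        file_obj.seek(start - 1)
        file_obj.readline()
    else:
        file_obj.seek(0)
    while True:
        offset = file_obj.tell()
        line = file_obj.readline()
        if not line or not tracker.try_claim(offset):
            return
        yield line.rstrip(b"\n")


data = b"alpha\nbeta\ngamma\n"  # lines start at offsets 0, 6 and 11
first = list(read_line_records(io.BytesIO(data), ToyOffsetTracker(0, 6)))
second = list(read_line_records(io.BytesIO(data), ToyOffsetTracker(6, 17)))
```

Every line is claimed before it is yielded, so two readers over adjacent ranges never return the same line.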

split(desired_bundle_size=None, start_position=None, stop_position=None)[source]
splittable

apache_beam.io.filesystem module

File system abstraction for file-based sources and sinks.

class apache_beam.io.filesystem.CompressionTypes[source]

Bases: object

Enum-like class representing known compression types.

AUTO = 'auto'
BZIP2 = 'bzip2'
GZIP = 'gzip'
UNCOMPRESSED = 'uncompressed'
classmethod detect_compression_type(file_path)[source]

Returns the compression type of a file (based on its suffix).
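
Suffix-based detection can be pictured with the short sketch below. The suffix table is an assumption made for illustration and the function name detect_by_suffix is hypothetical; the mapping used by the library may differ.

```python
import os

# Assumed suffix table for illustration only; the real mapping may differ.
ASSUMED_SUFFIXES = {".gz": "gzip", ".bz2": "bzip2"}


def detect_by_suffix(file_path):
    """Returns a compression type name based only on the file suffix."""
    _, suffix = os.path.splitext(file_path)
    return ASSUMED_SUFFIXES.get(suffix.lower(), "uncompressed")


detected = [detect_by_suffix(p) for p in ("logs.txt.gz", "dump.bz2", "plain.csv")]
```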

classmethod is_valid_compression_type(compression_type)[source]

Returns True for valid compression types, False otherwise.

classmethod mime_type(compression_type, default='application/octet-stream')[source]
class apache_beam.io.filesystem.CompressedFile(fileobj, compression_type='gzip', read_size=16777216)[source]

Bases: object

File wrapper for easier handling of compressed files.

close()[source]
closed()[source]
flush()[source]
read(num_bytes)[source]
readable()[source]
readline()[source]

Equivalent to standard file.readline(). Same return conventions apply.

seek(offset, whence=0)[source]

Set the file’s current offset.

Seeking behavior:
  • seeking from the end (SEEK_END) decompresses the whole file once to determine its size, so prefer SEEK_SET or SEEK_CUR to avoid this processing overhead
  • seeking backwards from the current position rewinds the file to 0 and decompresses the chunks to the requested offset
  • seeking is only supported in files opened for reading
  • if the new offset is out of bound, it is adjusted to either 0 or EOF.
Parameters:
  • offset – seek offset in the uncompressed content represented as number
  • whence – seek mode. Supported modes are os.SEEK_SET (absolute seek), os.SEEK_CUR (seek relative to the current position), and os.SEEK_END (seek relative to the end, offset should be negative).
Raises:
  • IOError – When this buffer is closed.
  • ValueError – When whence is invalid or the file is not seekable
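
The offset arithmetic implied by the rules above can be sketched as a pure function. This is a simplified model, not the CompressedFile implementation: resolve_seek and its current and size parameters are hypothetical, and the sketch ignores the decompression work needed to learn the size.

```python
import os


def resolve_seek(offset, whence, current, size):
    """Returns the absolute target offset for a seek in uncompressed content."""
    if whence == os.SEEK_SET:
        target = offset
    elif whence == os.SEEK_CUR:
        target = current + offset
    elif whence == os.SEEK_END:
        # `size` is only known after decompressing the whole file once.
        target = size + offset
    else:
        raise ValueError("unsupported whence: %r" % (whence,))
    # Out-of-bound targets are adjusted to either 0 or EOF.
    return max(0, min(target, size))


targets = [
    resolve_seek(10, os.SEEK_SET, current=0, size=100),
    resolve_seek(-30, os.SEEK_CUR, current=20, size=100),
    resolve_seek(-5, os.SEEK_END, current=20, size=100),
    resolve_seek(500, os.SEEK_SET, current=0, size=100),
]
```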
seekable
tell()[source]

Returns current position in uncompressed file.

write(data)[source]

Write data to file.

writeable()[source]
class apache_beam.io.filesystem.FileMetadata(path, size_in_bytes)[source]

Bases: object

Metadata about a file path that is the output of FileSystem.match

class apache_beam.io.filesystem.FileSystem[source]

Bases: apache_beam.utils.plugin.BeamPlugin

A class that defines the functions that can be performed on a filesystem.

All methods are abstract and are meant to be implemented by file system providers. Clients should use the FileSystems class to interact with the correct file system based on the provided file pattern scheme.

CHUNK_SIZE = 1
copy(source_file_names, destination_file_names)[source]

Recursively copy the file tree from the source to the destination

Parameters:
  • source_file_names – list of source file objects that need to be copied
  • destination_file_names – list of destinations for the new objects
Raises:

BeamIOError if any of the copy operations fail

create(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a write channel for the given file path.

Parameters:
  • path – string path of the file object to be written to the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object

Returns: file handle with a close function for the user to use

delete(paths)[source]

Deletes files or directories at the provided paths. Directories will be deleted recursively.

Parameters: paths – list of paths that give the file objects to be deleted
Raises: BeamIOError if any of the delete operations fail
exists(path)[source]

Check if the provided path exists on the FileSystem.

Parameters: path – string path that needs to be checked.

Returns: boolean flag indicating if path exists

join(basepath, *paths)[source]

Join two or more pathname components for the filesystem

Parameters:
  • basepath – string path of the first component of the path
  • paths – path components to be added

Returns: full path after combining all the passed components

match(patterns, limits=None)[source]

Find all matching paths to the patterns provided.

Parameters:
  • patterns – list of strings giving the file path patterns to match against
  • limits – list of the maximum numbers of responses to fetch

Returns: list of MatchResult objects.

Raises: BeamIOError if any of the pattern match operations fail
mkdirs(path)[source]

Recursively create directories for the provided path.

Parameters: path – string path of the directory structure that should be created
Raises: IOError if leaf directory already exists.
open(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a read channel for the given file path.

Parameters:
  • path – string path of the file object to be read from the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object

Returns: file handle with a close function for the user to use

rename(source_file_names, destination_file_names)[source]

Rename the files at the source list to the destination list. Source and destination lists should be of the same size.

Parameters:
  • source_file_names – List of file paths that need to be moved
  • destination_file_names – List of destination_file_names for the files
Raises:

BeamIOError if any of the rename operations fail

classmethod scheme()[source]

URI scheme for the FileSystem

split(path)[source]

Splits the given path into two parts.

Splits the path into a pair (head, tail) such that tail contains the last component of the path and head contains everything up to that.

For file-systems other than the local file-system, head should include the prefix.

Parameters: path – path as a string
Returns: a pair of path components as strings.
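
A minimal sketch of the (head, tail) contract follows, assuming "/" as the separator and a "scheme://" prefix for non-local paths. The function split_path is a hypothetical illustration, not any provider's implementation.

```python
def split_path(path):
    """Splits `path` into (head, tail), keeping any scheme prefix in head."""
    prefix = ""
    rest = path
    if "://" in path:
        scheme, rest = path.split("://", 1)
        prefix = scheme + "://"
    head, sep, tail = rest.rpartition("/")
    if not sep:
        # No separator after the prefix: everything is the last component.
        return prefix, rest
    return prefix + head, tail


pairs = [split_path("gs://bucket/dir/file.txt"), split_path("/tmp/out/part-0")]
```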
class apache_beam.io.filesystem.MatchResult(pattern, metadata_list)[source]

Bases: object

Result from the FileSystem match operation which contains the list of matched FileMetadata.

apache_beam.io.filesystems module

FileSystems interface class for accessing the correct filesystem

class apache_beam.io.filesystems.FileSystems[source]

Bases: object

A class that defines the functions that can be performed on a filesystem. All methods are static and access the underlying registered filesystems.

URI_SCHEMA_PATTERN = <_sre.SRE_Pattern object>
static copy(source_file_names, destination_file_names)[source]

Recursively copy the file list from the source to the destination

Parameters:
  • source_file_names – list of source file objects that need to be copied
  • destination_file_names – list of destinations for the new objects
Raises:

BeamIOError if any of the copy operations fail

static create(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a write channel for the given file path.

Parameters:
  • path – string path of the file object to be written to the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object. See CompressionTypes for possible values.

Returns: file handle with a close function for the user to use.

static delete(paths)[source]

Deletes files or directories at the provided paths. Directories will be deleted recursively.

Parameters: paths – list of paths that give the file objects to be deleted
Raises: BeamIOError if any of the delete operations fail
static exists(path)[source]

Check if the provided path exists on the FileSystem.

Parameters: path – string path that needs to be checked.

Returns: boolean flag indicating if path exists

static get_chunk_size(path)[source]

Get the correct chunk size for the FileSystem.

Parameters: path – string path that needs to be checked.

Returns: integer size for parallelization in the FS operations.

static get_filesystem(path)[source]

Get the correct filesystem for the specified path
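
Choosing a filesystem from a path's URI scheme can be sketched as a registry lookup. The pattern, the registry contents, and the handler names below are assumptions made for illustration; the sketch does not reproduce URI_SCHEMA_PATTERN or the library's plugin registration.

```python
import re

# Assumed pattern for illustration: "<scheme>://<rest>".
SCHEME_PATTERN = re.compile(r"^(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://")

# Hypothetical registry mapping a scheme to a handler name.
REGISTRY = {"gs": "ToyGcsFileSystem", None: "ToyLocalFileSystem"}


def get_scheme(path):
    """Returns the URI scheme of `path`, or None for a plain local path."""
    match = SCHEME_PATTERN.match(path)
    return match.group("scheme") if match else None


def get_filesystem_name(path):
    """Looks up the handler registered for the scheme of `path`."""
    scheme = get_scheme(path)
    if scheme not in REGISTRY:
        raise ValueError("no filesystem registered for scheme %r" % (scheme,))
    return REGISTRY[scheme]


chosen = [get_filesystem_name("gs://bucket/a.txt"), get_filesystem_name("/tmp/a.txt")]
```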

static get_scheme(path)[source]
static join(basepath, *paths)[source]

Join two or more pathname components for the filesystem

Parameters:
  • basepath – string path of the first component of the path
  • paths – path components to be added

Returns: full path after combining all the passed components

static match(patterns, limits=None)[source]

Find all matching paths to the patterns provided.

Parameters:
  • patterns – list of strings giving the file path patterns to match against
  • limits – list of the maximum numbers of responses to fetch

Returns: list of MatchResult objects.

Raises: BeamIOError if any of the pattern match operations fail
static mkdirs(path)[source]

Recursively create directories for the provided path.

Parameters: path – string path of the directory structure that should be created
Raises: IOError if leaf directory already exists.
static open(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a read channel for the given file path.

Parameters:
  • path – string path of the file object to be read from the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object. See CompressionTypes for possible values.

Returns: file handle with a close function for the user to use.

static rename(source_file_names, destination_file_names)[source]

Rename the files at the source list to the destination list. Source and destination lists should be of the same size.

Parameters:
  • source_file_names – List of file paths that need to be moved
  • destination_file_names – List of destination_file_names for the files
Raises:

BeamIOError if any of the rename operations fail

static split(path)[source]

Splits the given path into two parts.

Splits the path into a pair (head, tail) such that tail contains the last component of the path and head contains everything up to that.

For file-systems other than the local file-system, head should include the prefix.

Parameters: path – path as a string
Returns: a pair of path components as strings.

apache_beam.io.iobase module

Sources and sinks.

A Source manages record-oriented data input from a particular kind of source (e.g. a set of files, a database table, etc.). The reader() method of a source returns a reader object supporting the iterator protocol; iteration yields raw records of unprocessed, serialized data.

A Sink manages record-oriented data output to a particular kind of sink (e.g. a set of files, a database table, etc.). The writer() method of a sink returns a writer object supporting writing records of serialized data to the sink.

class apache_beam.io.iobase.BoundedSource[source]

Bases: apache_beam.transforms.display.HasDisplayData

A source that reads a finite amount of input records.

This class defines the following operations, which can be used to read the source efficiently.

  • Size estimation - method estimate_size() may return an accurate estimation in bytes for the size of the source.
  • Splitting into bundles of a given size - method split() can be used to split the source into a set of sub-sources (bundles) based on a desired bundle size.
  • Getting a RangeTracker - method get_range_tracker() should return a RangeTracker object for a given position range for the position type of the records returned by the source.
  • Reading the data - method read() can be used to read data from the source while respecting the boundaries defined by a given RangeTracker.

A runner reads the source in two steps.

  1. Method get_range_tracker() will be invoked with start and end positions to obtain a RangeTracker for the range of positions the runner intends to read. The source must define a default initial start and end position range. These positions must be used if the start and/or end positions passed to the method get_range_tracker() are None.
  2. Method read() will be invoked with the RangeTracker obtained in the previous step.
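
The two steps can be sketched with a toy source whose positions are list indices. ToyTracker and ToyListSource are hypothetical classes written for this example; they imitate the method names above but are not subclasses of the real BoundedSource or RangeTracker.

```python
class ToyTracker(object):
    """Hypothetical tracker over integer positions [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def try_claim(self, position):
        return self.start <= position < self.stop


class ToyListSource(object):
    """Hypothetical source whose record positions are list indices."""

    def __init__(self, values):
        self._values = values

    def get_range_tracker(self, start_position, stop_position):
        # None means "use the source's default start or end position".
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = len(self._values)
        return ToyTracker(start_position, stop_position)

    def read(self, range_tracker):
        for position in range(range_tracker.start, len(self._values)):
            # Every record here is a split point, so each one is claimed.
            if not range_tracker.try_claim(position):
                return
            yield self._values[position]


source = ToyListSource(["a", "b", "c", "d"])
tracker = source.get_range_tracker(None, 3)  # step 1: obtain a tracker
records = list(source.read(tracker))         # step 2: read with that tracker
```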

Mutability

A BoundedSource object should not be mutated while its methods (for example, read()) are being invoked by a runner. Runner implementations may invoke methods of BoundedSource objects through multi-threaded and/or reentrant execution modes.

default_output_coder()[source]

Coder that should be used for the records returned by the source.

Should be overridden by sources that produce objects that can be encoded more efficiently than pickling.

estimate_size()[source]

Estimates the size of source in bytes.

An estimate of the total size (in bytes) of the data that would be read from this source. This estimate is in terms of external storage size, before performing decompression or other processing.

Returns: estimated size of the source if the size can be determined, None otherwise.
get_range_tracker(start_position, stop_position)[source]

Returns a RangeTracker for a given position range.

The framework may invoke the read() method with the RangeTracker object returned here to read data from the source.

Parameters:
  • start_position – starting position of the range. If None, the default start position of the source must be used.
  • stop_position – ending position of the range. If None, the default stop position of the source must be used.
Returns:

a RangeTracker for the given position range.

read(range_tracker)[source]

Returns an iterator that reads data from the source.

The returned set of data must respect the boundaries defined by the given RangeTracker object. For example:

  • Returned set of data must be for the range [range_tracker.start_position, range_tracker.stop_position). Note that a source may decide to return records that start after range_tracker.stop_position. See documentation in class RangeTracker for more details. Also, note that framework might invoke range_tracker.try_split() to perform dynamic split operations. range_tracker.stop_position may be updated dynamically due to successful dynamic split operations.
  • Method range_tracker.try_claim() must be invoked for every record that starts at a split point.
  • Method range_tracker.set_current_position() may be invoked for records that do not start at split points.
Parameters: range_tracker – a RangeTracker whose boundaries must be respected when reading data from the source. A runner that reads this source must pass a RangeTracker object that is not None.
Returns: an iterator of data read by the source.
split(desired_bundle_size, start_position=None, stop_position=None)[source]

Splits the source into a set of bundles.

Bundles should be approximately of size desired_bundle_size bytes.

Parameters:
  • desired_bundle_size – the desired size (in bytes) of the bundles returned.
  • start_position – if specified the given position must be used as the starting position of the first bundle.
  • stop_position – if specified the given position must be used as the ending position of the last bundle.
Returns:

an iterator of objects of type ‘SourceBundle’ that gives information about the generated bundles.
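
Splitting a position range into bundles of roughly a desired size can be sketched as follows. ToyBundle and its field names are hypothetical and chosen for this example; they are not the library's SourceBundle type, and a real source would also attach a sub-source to each bundle.

```python
from collections import namedtuple

# Hypothetical bundle record; field names are chosen for this sketch only.
ToyBundle = namedtuple("ToyBundle", ["weight", "start_position", "stop_position"])


def split_range(desired_bundle_size, start_position, stop_position):
    """Yields bundles of roughly desired_bundle_size covering [start, stop)."""
    if desired_bundle_size <= 0:
        raise ValueError("desired_bundle_size must be positive")
    bundle_start = start_position
    while bundle_start < stop_position:
        # The last bundle may be smaller than the desired size.
        bundle_stop = min(bundle_start + desired_bundle_size, stop_position)
        yield ToyBundle(bundle_stop - bundle_start, bundle_start, bundle_stop)
        bundle_start = bundle_stop


bundles = list(split_range(40, start_position=0, stop_position=100))
```

The first bundle starts at the given start position and the last one ends at the given stop position, as the parameter descriptions above require.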

class apache_beam.io.iobase.RangeTracker[source]

Bases: object

A thread-safe object used by the Dataflow source framework.

A Dataflow source is defined using a BoundedSource and RangeTracker pair. A RangeTracker is used by the Dataflow source framework to perform dynamic work rebalancing of position-based sources.

Position-based sources

A position-based source is one where the source can be described by a range of positions of an ordered type and the records returned by the reader can be described by positions of the same type.

In case a record occupies a range of positions in the source, the most important thing about the record is the position where it starts.

Defining the semantics of positions for a source is entirely up to the source class; however, the chosen definitions have to obey certain properties in order to make it possible to correctly split the source into parts, including dynamic splitting. Two main aspects need to be defined:

  1. How to assign starting positions to records.
  2. Which records should be read by a source with a range ‘[A, B)’.

Moreover, reading a range must be efficient, i.e., the performance of reading a range should not significantly depend on the location of the range. For example, reading the range [A, B) should not require reading all data before ‘A’.

The sections below explain exactly what properties these definitions must satisfy, and how to use a RangeTracker with a properly defined source.

Properties of position-based sources

The main requirement for position-based sources is associativity: reading records from ‘[A, B)’ and records from ‘[B, C)’ should give the same records as reading from ‘[A, C)’, where ‘A <= B <= C’. This property ensures that no matter how a range of positions is split into arbitrarily many sub-ranges, the total set of records described by them stays the same.

The other important property is how the source’s range relates to positions of records in the source. In many sources each record can be identified by a unique starting position. In this case:

  • All records returned by a source ‘[A, B)’ must have starting positions in this range.
  • All but the last record should end within this range. The last record may or may not extend past the end of the range.
  • Records should not overlap.

Such sources should define “read ‘[A, B)’” as “read from the first record starting at or after ‘A’, up to but not including the first record starting at or after ‘B’”.
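
The definition of "read [A, B)" and the associativity property can be checked on a small example. The record table below is hypothetical data invented for this sketch, and read_range is an illustrative helper rather than part of any API.

```python
import bisect

# Hypothetical records keyed by unique, sorted starting positions.
STARTS = [0, 6, 11, 20]
RECORDS = {0: "alpha", 6: "beta", 11: "gamma", 20: "delta"}


def read_range(a, b):
    """Returns records from the first start >= a up to the first start >= b."""
    lo = bisect.bisect_left(STARTS, a)
    hi = bisect.bisect_left(STARTS, b)
    return [RECORDS[s] for s in STARTS[lo:hi]]


left = read_range(0, 8)
right = read_range(8, 25)
whole = read_range(0, 25)
```

Position 8 falls in the middle of the record that starts at 6, yet that record is returned exactly once, by the reader of [0, 8).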

Some examples of such sources include reading lines or CSV from a text file, reading keys and values from a BigTable, etc.

The concept of split points allows us to extend these definitions to sources where some records cannot be identified by a unique starting position.

In all cases, all records returned by a source ‘[A, B)’ must start at or after ‘A’.

Split points

Some sources may have records that are not directly addressable. For example, imagine a file format consisting of a sequence of compressed blocks. Each block can be assigned an offset, but records within the block cannot be directly addressed without decompressing the block. Let us refer to this hypothetical format as CBF (Compressed Blocks Format).

Many such formats can still satisfy the associativity property. For example, in CBF, reading ‘[A, B)’ can mean “read all the records in all blocks whose starting offset is in ‘[A, B)’”.

To support such complex formats, we introduce the notion of split points. We say that a record is a split point if there exists a position ‘A’ such that the record is the first one to be returned when reading the range ‘[A, infinity)’. In CBF, the only split points would be the first records in each block.

Split points allow us to define the meaning of a record’s position and a source’s range in all cases:

  • For a record that is at a split point, its position is defined to be the largest ‘A’ such that reading a source with the range ‘[A, infinity)’ returns this record.
  • Positions of other records are only required to be non-decreasing.
  • Reading the source ‘[A, B)’ must return records starting from the first split point at or after ‘A’, up to but not including the first split point at or after ‘B’. In particular, this means that the first record returned by a source MUST always be a split point.
  • Positions of split points must be unique.

As a result, for any decomposition of the full range of the source into position ranges, the total set of records will be the full set of records in the source, and each record will be read exactly once.
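
The CBF rules can be made concrete with a toy block table. The offsets and record names below are invented for this sketch, and read_blocks is a hypothetical helper, not part of the library.

```python
# Hypothetical CBF file: (block_offset, records_in_block).
BLOCKS = [(0, ["r0", "r1"]), (100, ["r2"]), (250, ["r3", "r4", "r5"])]


def read_blocks(a, b):
    """Returns (position, record, is_split_point) for blocks starting in [a, b)."""
    out = []
    for offset, records in BLOCKS:
        if a <= offset < b:
            for index, record in enumerate(records):
                # Only the first record of a block is a split point; the others
                # reuse the block offset, so positions are non-decreasing.
                out.append((offset, record, index == 0))
    return out


part_one = read_blocks(0, 120)
part_two = read_blocks(120, 1000)
everything = read_blocks(0, 1000)
split_point_records = [record for _, record, is_sp in everything if is_sp]
```

Splitting the full range at offset 120 assigns each block to exactly one reader, so every record is read exactly once.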

Consumed positions

As the source is being read, and records read from it are being passed to the downstream transforms in the pipeline, we say that positions in the source are being consumed. When a reader has read a record (or promised to a caller that a record will be returned), positions up to and including the record’s start position are considered consumed.

Dynamic splitting can happen only at unconsumed positions. If the reader just returned a record at offset 42 in a file, dynamic splitting can happen only at offset 43 or beyond, as otherwise that record could be read twice (by the current reader and by a reader of the task starting at 43).
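
The offset 42 example can be replayed with a toy tracker guarded by a single lock. ToySplittableTracker is a hypothetical class for illustration only, and its try_split() returns a plain boolean, which is a simplification made for this sketch rather than the return convention of the real method.

```python
import threading


class ToySplittableTracker(object):
    """Hypothetical tracker over integer offsets [start, stop)."""

    def __init__(self, start, stop):
        self._lock = threading.Lock()
        self._start, self._stop = start, stop
        self._last_consumed = None

    def try_claim(self, position):
        with self._lock:
            if position >= self._stop:
                return False
            self._last_consumed = position
            return True

    def try_split(self, position):
        """Shrinks the range to [start, position) if position is unconsumed."""
        with self._lock:
            consumed = self._last_consumed
            if consumed is not None and position <= consumed:
                return False  # would hand out an already-returned record
            if not self._start < position < self._stop:
                return False
            self._stop = position
            return True

    def stop_position(self):
        with self._lock:
            return self._stop


tracker = ToySplittableTracker(0, 100)
claimed = tracker.try_claim(42)
split_at_42 = tracker.try_split(42)  # rejected: offset 42 is consumed
split_at_43 = tracker.try_split(43)  # accepted: first unconsumed offset
claim_after = tracker.try_claim(50)  # rejected: now outside [0, 43)
```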

SPLIT_POINTS_UNKNOWN = <object object>
fraction_consumed()[source]

Returns the approximate fraction of consumed positions in the source.

Thread safety

Methods of the class RangeTracker including this method may get invoked by different threads, hence must be made thread-safe, e.g. by using a single lock object.

Returns: the approximate fraction of positions that have been consumed by successful ‘try_claim()’ and ‘set_current_position()’ calls, or 0.0 if no such calls have happened.
position_at_fraction(fraction)[source]

Returns the position at the given fraction.

Given a fraction within the range [0.0, 1.0) this method will return the position at the given fraction compared to the position range [self.start_position, self.stop_position).

Thread safety

Methods of the class RangeTracker including this method may get invoked by different threads, hence must be made thread-safe, e.g. by using a single lock object.

Parameters: fraction – a float value within the range [0.0, 1.0).
Returns: a position within the range [self.start_position, self.stop_position).
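
For integer offsets, the mapping between fractions and positions might look like the sketch below. Both functions are hypothetical illustrations; rounding down is one possible choice and is an assumption of this example, not a statement about how any particular tracker rounds.

```python
import math


def position_at_fraction(start, stop, fraction):
    """Maps a fraction in [0.0, 1.0) to an integer offset in [start, stop)."""
    if not 0.0 <= fraction < 1.0:
        raise ValueError("fraction must be within [0.0, 1.0)")
    return int(math.floor(start + fraction * (stop - start)))


def fraction_consumed(start, stop, last_consumed):
    """Returns the approximate consumed fraction, or 0.0 before any record."""
    if last_consumed is None or stop <= start:
        return 0.0
    return min(1.0, max(0.0, float(last_consumed - start) / (stop - start)))


quarter = position_at_fraction(100, 200, 0.25)
halfway = fraction_consumed(100, 200, 150)
```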
set_current_position(position)[source]

Updates the last-consumed position to the given position.

A source may invoke this method for records that do not start at split points. This may modify the internal state of the RangeTracker. If the record starts at a split point, method try_claim() must be invoked instead of this method.

Parameters: position – starting position of a record being read by a source.
set_split_points_unclaimed_callback(callback)[source]

Sets a callback for determining the unclaimed number of split points.

By invoking this function, a BoundedSource can set a callback function that may get invoked by the RangeTracker to determine the number of unclaimed split points. A split point is unclaimed if the RangeTracker.try_claim() method has not been successfully invoked for that particular split point. The callback function accepts a single parameter, a stop position for the BoundedSource (stop_position). If the record currently being consumed by the BoundedSource is at position current_position, the callback should return the number of split points within the range (current_position, stop_position). Note that this count should not include the split point that is currently being consumed by the source.

This function must be implemented by subclasses before being used.

Parameters: callback – a function that takes a single parameter, a stop position, and returns the unclaimed number of split points for the source read operation that is calling this function. The value returned from the callback should be either an integer larger than or equal to zero or RangeTracker.SPLIT_POINTS_UNKNOWN.
split_points()[source]

Gives the number of split points consumed and remaining.

For a RangeTracker used by a BoundedSource (within a BoundedSource.read() invocation), this method produces a 2-tuple that gives the number of split points consumed by the BoundedSource and the number of split points remaining within the range of the RangeTracker that have not been consumed by the BoundedSource.

More specifically, given that the position of the current record being read by BoundedSource is current_position this method produces a tuple that consists of (1) number of split points in the range [self.start_position(), current_position) without including the split point that is currently being consumed. This represents the total amount of parallelism in the consumed part of the source. (2) number of split points within the range [current_position, self.stop_position()) including the split point that is currently being consumed. This represents the total amount of parallelism in the unconsumed part of the source.

Methods of the class RangeTracker including this method may get invoked by different threads, hence must be made thread-safe, e.g. by using a single lock object.

General information about the consumed and remaining numbers of split points returned by this method:
  • Before a source read (BoundedSource.read() invocation) claims the first split point, number of consumed split points is 0. This condition holds independent of whether the input is “splittable”. A splittable source is a source that has more than one split point.
  • Any source read that has only claimed one split point has 0 consumed split points since the first split point is the current split point and is still being processed. This condition holds independent of whether the input is splittable.
  • For an empty source read which never invokes RangeTracker.try_claim(), the consumed number of split points is 0. This condition holds independent of whether the input is splittable.
  • For a source read which has invoked RangeTracker.try_claim() n times, the consumed number of split points is n - 1.
  • If a BoundedSource sets a callback through function set_split_points_unclaimed_callback(), RangeTracker can use that callback when determining remaining number of split points.
  • Remaining split points should include the split point that is currently being consumed by the source read. Hence if the above callback returns an integer value n, remaining number of split points should be (n + 1).
  • After the last split point is claimed, the number of remaining split points becomes 1, because this unfinished read itself represents an unfinished split point.
  • After all records of the source have been consumed, the remaining number of split points becomes 0 and the consumed number of split points becomes equal to the total number of split points within the range being read by the source. This method does not address this condition and will continue to report the number of consumed split points as (“total number of split points” - 1) and the number of remaining split points as 1. A runner that performs the reading of the source can detect when all records have been consumed and adjust the remaining and consumed numbers of split points accordingly.

Examples

  1. A “perfectly splittable” input which can be read in parallel down to the individual records.

    Consider a perfectly splittable input that consists of 50 split points.

  • Before a source read (BoundedSource.read() invocation) claims the first split point, the number of consumed split points is 0 and the number of remaining split points is 50.
  • After claiming the first split point, the consumed number of split points is 0 and the remaining number of split points is 50.
  • After claiming split point #30, consumed number of split points is 29 and remaining number of split points is 21.
  • After claiming all 50 split points, consumed number of split points is 49 and remaining number of split points is 1.
  2. A “block-compressed” file format such as avroio, in which a block of records has to be read as a whole, but different blocks can be read in parallel.

    Consider a block compressed input that consists of 5 blocks.

  • Before a source read (BoundedSource.read() invocation) claims the first split point (first block), the number of consumed split points is 0 and the number of remaining split points is 5.
  • After claiming the first split point, the consumed number of split points is 0 and the remaining number of split points is 5.
  • After claiming split point #3, consumed number of split points is 2 and remaining number of split points is 3.
  • After claiming all 5 split points, consumed number of split points is 4 and remaining number of split points is 1.
  3. An “unsplittable” input such as a cursor in a database or a gzip compressed file.

    Such an input is considered to have only a single split point. Number of consumed split points is always 0 and number of remaining split points is always 1.

By default, RangeTracker returns RangeTracker.SPLIT_POINTS_UNKNOWN for both the consumed and remaining number of split points, which indicates that both numbers are unknown.

Returns:A pair that gives consumed and remaining number of split points. Consumed number of split points should be an integer larger than or equal to zero or RangeTracker.SPLIT_POINTS_UNKNOWN. Remaining number of split points should be an integer larger than zero or RangeTracker.SPLIT_POINTS_UNKNOWN.
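
The accounting rules and examples above can be condensed into a few lines. The sketch below is a minimal illustration, not Beam code: split_point_counts and its arguments are hypothetical names, and it assumes the total number of split points is known up front, which a real source often cannot guarantee.

```python
def split_point_counts(claims_made, total_split_points):
    """Returns (consumed, remaining) for a read in progress.

    Hypothetical helper: claims_made is how many times try_claim() has
    succeeded so far; total_split_points is assumed to be known.
    """
    if claims_made == 0:
        # Nothing claimed yet: every split point is still remaining.
        return 0, total_split_points
    # The split point being read right now counts as remaining, not consumed.
    consumed = claims_made - 1
    return consumed, total_split_points - consumed


# The "perfectly splittable" example with 50 split points.
print(split_point_counts(30, 50))
# The "unsplittable" example: a single split point.
print(split_point_counts(1, 1))
```

As in the description above, the sketch never reports 0 remaining split points; detecting the end of the read is left to the runner.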
start_position()[source]

Returns the starting position of the current range, inclusive.

stop_position()[source]

Returns the ending position of the current range, exclusive.

try_claim(position)[source]

Atomically determines if a record at a split point is within the range.

This method should be called if and only if the record is at a split point. This method may modify the internal state of the RangeTracker by updating the last-consumed position to position.

Thread safety

Methods of the class RangeTracker including this method may get invoked by different threads, hence must be made thread-safe, e.g. by using a single lock object.

Parameters:position – starting position of a record being read by a source.
Returns:True if the given position falls within the current range, False otherwise.
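
A minimal sketch of the single-lock approach suggested above, assuming integer positions. ToyClaimTracker is a hypothetical plain-Python class written for illustration; it does not subclass iobase.RangeTracker and omits everything except claiming.

```python
import threading


class ToyClaimTracker(object):
    """Hypothetical stand-in that tracks claims over [start, stop)."""

    def __init__(self, start, stop):
        self._start = start
        self._stop = stop
        self._last_claimed = None
        # One lock guards all mutable state, as suggested above.
        self._lock = threading.Lock()

    def try_claim(self, position):
        with self._lock:
            if position < self._start:
                raise ValueError('position %r is before the range start' % position)
            if position >= self._stop:
                # The stop position is exclusive, so this record is out of range.
                return False
            self._last_claimed = position
            return True
```

A reader would call try_claim() with the starting position of each record that sits at a split point and stop reading as soon as it returns False.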
try_split(position)[source]

Atomically splits the current range.

Determines a position to split the current range, split_position, based on the given position. In most cases split_position and position will be the same.

Splits the current range ‘[self.start_position, self.stop_position)’ into a “primary” part ‘[self.start_position, split_position)’ and a “residual” part ‘[split_position, self.stop_position)’, assuming the current last-consumed position is within ‘[self.start_position, split_position)’ (i.e., split_position has not been consumed yet).

If successful, updates the current range to be the primary and returns a tuple (split_position, split_fraction). split_fraction should be the fraction of size of range ‘[self.start_position, split_position)’ compared to the original (before split) range ‘[self.start_position, self.stop_position)’.

If the split_position has already been consumed, returns None.

Thread safety

Methods of the class RangeTracker including this method may get invoked by different threads, hence must be made thread-safe, e.g. by using a single lock object.

Parameters:position – suggested position at which to try to split the current range.
Returns:a tuple containing the split position and split fraction if split is successful. Returns None otherwise.
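
The splitting contract described above can be sketched for integer positions as follows. ToySplitTracker is a hypothetical plain-Python class, not a Beam class, and it assumes split_position is always equal to the suggested position.

```python
import threading


class ToySplitTracker(object):
    """Hypothetical tracker over the integer range [start, stop)."""

    def __init__(self, start, stop):
        self._start = start
        self._stop = stop
        self._last_claimed = None
        self._lock = threading.Lock()

    def stop_position(self):
        with self._lock:
            return self._stop

    def try_claim(self, position):
        with self._lock:
            if position >= self._stop:
                return False
            self._last_claimed = position
            return True

    def try_split(self, position):
        with self._lock:
            if self._last_claimed is not None and position <= self._last_claimed:
                # The split position has already been consumed.
                return None
            if not self._start < position < self._stop:
                # Splitting here would leave an empty primary or residual.
                return None
            fraction = (position - self._start) / float(self._stop - self._start)
            # The current range shrinks to the primary part.
            self._stop = position
            return position, fraction
```

After a successful split, later try_claim() calls at or beyond the split position return False, which is what hands the residual range to another reader.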
class apache_beam.io.iobase.Read(source)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A transform that reads a PCollection.

display_data()[source]
expand(pbegin)[source]
get_windowing(unused_inputs)[source]
class apache_beam.io.iobase.Sink[source]

Bases: apache_beam.transforms.display.HasDisplayData

This class is deprecated, no backwards-compatibility guarantees.

A resource that can be written to using the beam.io.Write transform.

Here beam stands for the Apache Beam Python package, imported as follows: import apache_beam as beam.

A parallel write to an iobase.Sink consists of three phases:

  1. A sequential initialization phase (e.g., creating a temporary output directory, etc.)
  2. A parallel write phase where workers write bundles of records
  3. A sequential finalization phase (e.g., committing the writes, merging output files, etc.)

Implementing a new sink requires extending two classes.

  1. iobase.Sink

iobase.Sink is an immutable logical description of the location/resource to write to. Depending on the type of sink, it may contain fields such as the path to an output directory on a filesystem, a database table name, etc. iobase.Sink provides methods for performing a write operation to the sink described by it. To this end, implementors of an extension of iobase.Sink must implement three methods: initialize_write(), open_writer(), and finalize_write().

  2. iobase.Writer

iobase.Writer is used to write a single bundle of records. An iobase.Writer defines two methods: write() which writes a single record from the bundle and close() which is called once at the end of writing a bundle.

See also apache_beam.io.filebasedsink.FileBasedSink which provides a simpler API for writing sinks that produce files.

Execution of the Write transform

initialize_write() and finalize_write() are conceptually called once: at the beginning and end of a Write transform. However, implementors must ensure that these methods are idempotent, as they may be called multiple times on different machines in the case of failure/retry or for redundancy.

initialize_write() should perform any initialization that needs to be done prior to writing to the sink. initialize_write() may return a result (let’s call this init_result) that contains any parameters it wants to pass on to its writers about the sink. For example, a sink that writes to a file system may return an init_result that contains a dynamically generated unique directory to which data should be written.

To perform writing of a bundle of elements, Dataflow execution engine will create an iobase.Writer using the implementation of iobase.Sink.open_writer(). When invoking open_writer() execution engine will provide the init_result returned by initialize_write() invocation as well as a bundle id (let’s call this bundle_id) that is unique for each invocation of open_writer().

Execution engine will then invoke iobase.Writer.write() implementation for each element that has to be written. Once all elements of a bundle are written, execution engine will invoke iobase.Writer.close() implementation which should return a result (let’s call this write_result) that contains information that encodes the result of the write and, in most cases, some encoding of the unique bundle id. For example, if each bundle is written to a unique temporary file, close() method may return an object that contains the temporary file name. After writing of all bundles is complete, execution engine will invoke finalize_write() implementation. As parameters to this invocation execution engine will provide init_result as well as an iterable of write_result.

The execution of a write transform can be illustrated using the following pseudocode (assume that the outer for loop happens in parallel across many machines):

init_result = sink.initialize_write()
write_results = []
for bundle in partition(pcoll):
  writer = sink.open_writer(init_result, generate_bundle_id())
  for elem in bundle:
    writer.write(elem)
  write_results.append(writer.close())
sink.finalize_write(init_result, write_results)

init_result

Methods of ‘iobase.Sink’ should agree on the ‘init_result’ type that will be returned when initializing the sink. This type can be a client-defined object or an existing type. The returned type must be picklable using Dataflow coder coders.PickleCoder. Returning an init_result is optional.

bundle_id

In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the event of failure/retry or for redundancy). However, exactly one of these executions will have its result passed to the iobase.Sink.finalize_write() method. Each call to iobase.Sink.open_writer() is passed a unique bundle id when it is called by the WriteImpl transform, so even redundant or retried bundles will have a unique way of identifying their output.

The bundle id should be used to guarantee that a bundle’s output is unique. This uniqueness guarantee is important; if a bundle is to be output to a file, for example, the name of the file must be unique to avoid conflicts with other writers. The bundle id should be encoded in the writer result returned by the writer and subsequently used by the finalize_write() method to identify the results of successful writes.

For example, consider the scenario where a Writer writes files containing serialized records and the finalize_write() is to merge or rename these output files. In this case, a writer may use its unique id to name its output file (to avoid conflicts) and return the name of the file it wrote as its writer result. The finalize_write() will then receive an Iterable of output file names that it can then merge or rename using some bundle naming scheme.
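
The scenario above can be sketched with plain files. This is a minimal illustration under stated assumptions: ToyFileSink and ToyFileWriter are hypothetical classes that mirror the iobase.Sink and iobase.Writer method names without subclassing them, run_toy_write stands in for the execution engine and runs bundles sequentially, and the part-NNNNN naming scheme is invented for the example.

```python
import os
import shutil
import tempfile


class ToyFileWriter(object):
    """Writes one bundle to a temporary file named after its bundle id."""

    def __init__(self, temp_path):
        self._temp_path = temp_path
        self._handle = open(temp_path, 'w')

    def write(self, value):
        self._handle.write('%s\n' % value)

    def close(self):
        self._handle.close()
        # The write_result is the name of the file this writer produced.
        return self._temp_path


class ToyFileSink(object):
    def __init__(self, output_dir):
        self.output_dir = output_dir

    def initialize_write(self):
        # init_result: a unique temporary directory for this write.
        return tempfile.mkdtemp(dir=self.output_dir)

    def open_writer(self, init_result, uid):
        return ToyFileWriter(os.path.join(init_result, uid))

    def finalize_write(self, init_result, writer_results):
        final_names = []
        for shard, temp_path in enumerate(sorted(writer_results)):
            final_path = os.path.join(self.output_dir, 'part-%05d' % shard)
            os.rename(temp_path, final_path)
            final_names.append(final_path)
        # Whatever is left belongs to failed or redundant attempts.
        shutil.rmtree(init_result)
        # Returned only so that the sketch can be inspected.
        return final_names


def run_toy_write(sink, bundles):
    init_result = sink.initialize_write()
    write_results = []
    for index, bundle in enumerate(bundles):
        writer = sink.open_writer(init_result, 'bundle-%d' % index)
        for elem in bundle:
            writer.write(elem)
        write_results.append(writer.close())
    return sink.finalize_write(init_result, write_results)
```

Because every temporary file name includes the bundle id, a retried bundle would write to a different file, and only the names passed to finalize_write() end up in the output directory.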

write_result

iobase.Writer.close() and finalize_write() implementations must agree on the type of the write_result object returned when invoking iobase.Writer.close(). This type can be a client-defined object or an existing type. The returned type must be picklable using Dataflow coder coders.PickleCoder. Returning a write_result when iobase.Writer.close() is invoked is optional, but if unique write_result objects are not returned, the sink should guarantee idempotency when the same bundle is written multiple times due to failure/retry or redundancy.

More information

For more information on creating new sinks please refer to the official documentation at https://beam.apache.org/documentation/sdks/python-custom-io#creating-sinks

finalize_write(init_result, writer_results)[source]

Finalizes the sink after all data is written to it.

Given the result of initialization and an iterable of results from bundle writes, performs finalization after writing and closes the sink. Called after all bundle writes are complete.

The bundle write results that are passed to finalize are those returned by bundles that completed successfully. Although bundles may have been run multiple times (for fault-tolerance), only one writer result will be passed to finalize for each bundle. An implementation of finalize should perform clean up of any failed and successfully retried bundles. Note that these failed bundles will not have their writer result passed to finalize, so finalize should be capable of locating any temporary/partial output written by failed bundles.

If all retries of a bundle fail, the whole pipeline will fail without finalize_write() being invoked.

A best practice is to make finalize atomic. If this is impossible given the semantics of the sink, finalize should be idempotent, as it may be called multiple times in the case of failure/retry or for redundancy.

Note that the iteration order of the writer results is not guaranteed to be consistent if finalize is called multiple times.

Parameters:
  • init_result – the result of initialize_write() invocation.
  • writer_results – an iterable containing results of Writer.close() invocations. This will only contain results of successful writes, and will only contain the result of a single successful write for a given bundle.
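
One way to make finalization safe to repeat is to treat "already moved" as success. The sketch below is an assumption-laden illustration, not Beam code: idempotent_finalize is a hypothetical function, and it assumes each writer result is the path of a temporary file that should be moved into final_dir.

```python
import os


def idempotent_finalize(writer_results, final_dir):
    """Moves each temporary file into final_dir; safe to call repeatedly."""
    for temp_path in writer_results:
        final_path = os.path.join(final_dir, os.path.basename(temp_path))
        if os.path.exists(temp_path):
            # First attempt for this file, or a retry after a partial run.
            os.replace(temp_path, final_path)
        elif not os.path.exists(final_path):
            # Neither the source nor the destination exists: a real failure.
            raise IOError('no output found for %s' % temp_path)
        # Otherwise an earlier call already moved this file; nothing to do.
```

The loop does not depend on the order of writer_results, which matters because the iteration order is not guaranteed to be consistent across calls.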
initialize_write()[source]

Initializes the sink before writing begins.

Invoked before any data is written to the sink.

Please see documentation in iobase.Sink for an example.

Returns:An object that contains any sink specific state generated by initialization. This object will be passed to open_writer() and finalize_write() methods.
open_writer(init_result, uid)[source]

Opens a writer for writing a bundle of elements to the sink.

Parameters:
  • init_result – the result of initialize_write() invocation.
  • uid – a unique identifier generated by the system.
Returns:

an iobase.Writer that can be used to write a bundle of records to the current sink.

class apache_beam.io.iobase.Write(sink)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform that writes to a sink.

A sink should inherit iobase.Sink. Such implementations are handled using a composite transform that consists of three ParDos: (1) a ParDo performing a global initialization, (2) a ParDo performing a parallel write, and (3) a ParDo performing a global finalization. In the case of an empty PCollection, only the global initialization and finalization will be performed. Currently only batch workflows support custom sinks.

Example usage:

pcollection | beam.io.Write(MySink())

This returns a pvalue.PValue object that represents the end of the Pipeline.

The sink argument may also be a full PTransform, in which case it will be applied directly. This allows composite sink-like transforms (e.g. a sink with some pre-processing DoFns) to be used the same as all other sinks.

This transform also supports sinks that inherit iobase.NativeSink. These are sinks that are implemented natively by the Dataflow service and hence should not be updated by users. These sinks are processed using a Dataflow native write transform.

display_data()[source]
expand(pcoll)[source]
class apache_beam.io.iobase.Writer[source]

Bases: object

This class is deprecated, no backwards-compatibility guarantees.

Writes a bundle of elements from a PCollection to a sink.

iobase.Writer.write() writes elements to the sink, while iobase.Writer.close() is called after all elements in the bundle have been written.

See iobase.Sink for more detailed documentation about the process of writing to a sink.

close()[source]

Closes the current writer.

Please see documentation in iobase.Sink for an example.

Returns:An object representing the writes that were performed by the current writer.
write(value)[source]

Writes a value to the sink using the current writer.

apache_beam.io.localfilesystem module

Local File system implementation for accessing files on disk.

class apache_beam.io.localfilesystem.LocalFileSystem[source]

Bases: apache_beam.io.filesystem.FileSystem

A Local FileSystem implementation for accessing files on disk.

copy(source_file_names, destination_file_names)[source]

Recursively copy the file tree from the source to the destination

Parameters:
  • source_file_names – list of source file objects that need to be copied
  • destination_file_names – list of destinations for the new objects
Raises:

BeamIOError if any of the copy operations fail

create(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a write channel for the given file path.

Parameters:
  • path – string path of the file object to be written to the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object

Returns: file handle with a close function for the user to use

delete(paths)[source]

Deletes files or directories at the provided paths. Directories will be deleted recursively.

Parameters:paths – list of paths that give the file objects to be deleted
Raises:BeamIOError if any of the delete operations fail
exists(path)[source]

Check if the provided path exists on the FileSystem.

Parameters:path – string path that needs to be checked.

Returns: boolean flag indicating if path exists

join(basepath, *paths)[source]

Join two or more pathname components for the filesystem

Parameters:
  • basepath – string path of the first component of the path
  • paths – path components to be added

Returns: full path after combining all the passed components

match(patterns, limits=None)[source]

Find all matching paths to the pattern provided.

Parameters:
  • patterns – list of string for the file path pattern to match against
  • limits – list of maximum number of responses that need to be fetched

Returns: list of MatchResult objects.

Raises:BeamIOError if any of the pattern match operations fail
mkdirs(path)[source]

Recursively create directories for the provided path.

Parameters:path – string path of the directory structure that should be created
Raises:IOError if leaf directory already exists.
open(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a read channel for the given file path.

Parameters:
  • path – string path of the file object to be read from the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object

Returns: file handle with a close function for the user to use

rename(source_file_names, destination_file_names)[source]

Rename the files at the source list to the destination list. Source and destination lists should be of the same size.

Parameters:
  • source_file_names – List of file paths that need to be moved
  • destination_file_names – List of destination_file_names for the files
Raises:

BeamIOError if any of the rename operations fail

classmethod scheme()[source]

URI scheme for the FileSystem

split(path)[source]

Splits the given path into two parts.

Splits the path into a pair (head, tail) such that tail contains the last component of the path and head contains everything up to that.

Parameters:path – path as a string
Returns:a pair of path components as strings.
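
The (head, tail) contract described above matches what the standard library does for POSIX-style paths. The sketch below uses posixpath as a stand-in for illustration only; it does not call LocalFileSystem, and the example path is made up.

```python
import posixpath

# tail is the last path component; head is everything before it.
head, tail = posixpath.split('/tmp/output/part-00000')

# Joining the two parts again gives back the original path.
rejoined = posixpath.join(head, tail)
```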

apache_beam.io.range_trackers module

iobase.RangeTracker implementations provided with Dataflow SDK.

class apache_beam.io.range_trackers.OffsetRangeTracker(start, end)[source]

Bases: apache_beam.io.iobase.RangeTracker

A ‘RangeTracker’ for non-negative positions of type ‘long’.

OFFSET_INFINITY = inf
fraction_consumed()[source]
last_record_start
position_at_fraction(fraction)[source]
set_current_position(record_start)[source]
set_split_points_unclaimed_callback(callback)[source]
split_points()[source]
start_position()[source]
stop_position()[source]
try_claim(record_start)[source]
try_split(split_offset)[source]
class apache_beam.io.range_trackers.LexicographicKeyRangeTracker(start_position=None, stop_position=None)[source]

Bases: apache_beam.io.range_trackers.OrderedPositionRangeTracker

A range tracker that tracks progress through a lexicographically ordered keyspace of strings.

classmethod fraction_to_position(fraction, start=None, end=None)[source]

Linearly interpolates a key that is lexicographically fraction of the way between start and end.

classmethod position_to_fraction(key, start=None, end=None)[source]

Returns the fraction of keys in the range [start, end) that are less than the given key.
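
The two mappings can be sketched by reading a key as a base-256 fraction. This is a simplified illustration, not the Beam implementation: the functions below are free functions rather than classmethods, keys are assumed to be bytes objects, only the first 8 bytes of a key are considered (an arbitrary precision chosen for the sketch), and a missing start or end is treated as the beginning or end of the keyspace.

```python
_PRECISION = 8  # Number of key bytes considered; arbitrary for this sketch.
_SCALE = 256 ** _PRECISION


def _key_to_unit(key):
    """Maps a bytes key to a float in [0, 1) by reading it as base-256 digits."""
    padded = key[:_PRECISION].ljust(_PRECISION, b'\x00')
    return int.from_bytes(padded, 'big') / float(_SCALE)


def position_to_fraction(key, start=None, end=None):
    low = _key_to_unit(start) if start else 0.0
    high = _key_to_unit(end) if end else 1.0
    return (_key_to_unit(key) - low) / (high - low)


def fraction_to_position(fraction, start=None, end=None):
    low = _key_to_unit(start) if start else 0.0
    high = _key_to_unit(end) if end else 1.0
    value = low + fraction * (high - low)
    # Clamp so that a fraction of 1.0 still fits in _PRECISION bytes.
    scaled = min(int(value * _SCALE), _SCALE - 1)
    return scaled.to_bytes(_PRECISION, 'big').rstrip(b'\x00')
```

With these definitions, the key b'\x80' sits halfway through the full keyspace, and fraction_to_position(0.5) maps back to it.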

class apache_beam.io.range_trackers.OrderedPositionRangeTracker(start_position=None, stop_position=None)[source]

Bases: apache_beam.io.iobase.RangeTracker

An abstract base class for range trackers whose positions are comparable.

Subclasses only need to implement the mapping from position ranges to and from the closed interval [0, 1].

UNSTARTED = <object object>
fraction_consumed()[source]
fraction_to_position(fraction, start, end)[source]

Converts a fraction between 0 and 1 to a position between start and end.

position_at_fraction(fraction)[source]
position_to_fraction(pos, start, end)[source]

Converts a position pos between start and end (inclusive) to a fraction between 0 and 1.

start_position()[source]
stop_position()[source]
try_claim(position)[source]
try_split(position)[source]
class apache_beam.io.range_trackers.UnsplittableRangeTracker(range_tracker)[source]

Bases: apache_beam.io.iobase.RangeTracker

A RangeTracker that always ignores split requests.

This can be used to make a given RangeTracker object unsplittable by ignoring all calls to try_split(). All other calls will be delegated to the given RangeTracker.

fraction_consumed()[source]
position_at_fraction(fraction)[source]
set_current_position(position)[source]
set_split_points_unclaimed_callback(callback)[source]
split_points()[source]
start_position()[source]
stop_position()[source]
try_claim(position)[source]
try_split(position)[source]
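
The delegation pattern described above can be sketched in plain Python. ToyTracker and ToyUnsplittable are hypothetical classes written for illustration; they are not the Beam classes and only cover the methods needed to show the idea.

```python
class ToyTracker(object):
    """Hypothetical splittable tracker over the integer range [start, stop)."""

    def __init__(self, start, stop):
        self._start = start
        self._stop = stop

    def stop_position(self):
        return self._stop

    def try_split(self, position):
        if not self._start < position < self._stop:
            return None
        fraction = (position - self._start) / float(self._stop - self._start)
        self._stop = position
        return position, fraction


class ToyUnsplittable(object):
    """Wraps a tracker, refusing splits and delegating everything else."""

    def __init__(self, tracker):
        self._tracker = tracker

    def try_split(self, position):
        # Ignore the request: the wrapped range is never modified.
        return None

    def __getattr__(self, name):
        # Only called for attributes not defined on the wrapper itself.
        return getattr(self._tracker, name)
```

Wrapping leaves the inner tracker untouched, so the same object still splits normally when it is used directly.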

apache_beam.io.source_test_utils module

Helper functions and test harnesses for source implementations.

This module contains helper functions and test harnesses for checking correctness of source (a subclass of iobase.BoundedSource) and range tracker (a subclass of iobase.RangeTracker) implementations.

Contains a few lightweight utilities (e.g. read_from_source() for reading items from a source), as well as heavyweight property-testing and stress-testing harnesses that help achieve a large amount of test coverage with little code.

The most notable ones are:
  • assert_sources_equal_reference_source() helps test that the data read by the union of sources produced by BoundedSource.split() is the same as the data read by the original source.
  • If your source implements dynamic work rebalancing, use the assert_split_at_fraction_*() family of functions. They test the behavior of RangeTracker.try_split(), in particular that various consistency properties are respected and that the total set of data read by the source is preserved when splits happen. Use assert_split_at_fraction_behavior() to test individual cases of RangeTracker.try_split(), and use assert_split_at_fraction_exhaustive() as a heavyweight stress test that includes concurrency. We strongly recommend using both.

For example usages, see the unit tests of modules such as
  • apache_beam.io.source_test_utils_test.py
  • apache_beam.io.avroio_test.py
apache_beam.io.source_test_utils.read_from_source(source, start_position=None, stop_position=None)[source]

Reads elements from the given `BoundedSource`.

Only reads elements within the given position range.

Parameters:
  • source – iobase.BoundedSource implementation.
  • start_position – start position for reading.
  • stop_position – stop position for reading.
Returns:the set of values read from the source.
apache_beam.io.source_test_utils.assert_sources_equal_reference_source(reference_source_info, sources_info)[source]

Tests if a reference source is equal to a given set of sources.

Given a reference source (a BoundedSource and a position range) and a list of sources, assert that the union of the records read from the list of sources is equal to the records read from the reference source.

Parameters:
  • reference_source_info – a three-tuple that gives the reference iobase.BoundedSource, position to start reading at, and position to stop reading at.
  • sources_info – a set of sources. Each source is a three-tuple that is of the same format described above.
Raises:

ValueError – if the set of data produced by the reference source and the given set of sources are not equivalent.
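
The check described above boils down to comparing two multisets of records. The sketch below illustrates the idea with hypothetical stand-ins: ToyListSource is a list-backed source whose positions are list indices, and check_sources_equal_reference is a simplified re-implementation written for this example, not the Beam function.

```python
class ToyListSource(object):
    """Hypothetical source whose positions are indices into a list."""

    def __init__(self, records):
        self._records = list(records)

    def read(self, start, stop):
        return self._records[start:stop]

    def split(self, bundle_size):
        # Each bundle is a (source, start, stop) three-tuple.
        size = len(self._records)
        return [(self, i, min(i + bundle_size, size))
                for i in range(0, size, bundle_size)]


def check_sources_equal_reference(reference_info, sources_info):
    ref_source, ref_start, ref_stop = reference_info
    expected = ref_source.read(ref_start, ref_stop)
    actual = []
    for source, start, stop in sources_info:
        actual.extend(source.read(start, stop))
    # Order across bundles is not significant, so compare sorted copies.
    if sorted(expected) != sorted(actual):
        raise ValueError('split sources do not match the reference source')


source = ToyListSource(range(10))
check_sources_equal_reference((source, 0, 10), source.split(3))
```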

apache_beam.io.source_test_utils.assert_reentrant_reads_succeed(source_info)[source]

Tests if a given source can be read in a reentrant manner.

Assume that given source produces the set of values {v1, v2, v3, ... vn}. For i in range [1, n-1] this method performs a reentrant read after reading i elements and verifies that both the original and reentrant read produce the expected set of values.

Parameters:source_info – a three-tuple that gives the reference iobase.BoundedSource, position to start reading at, and a position to stop reading at.
Raises:ValueError – if the source is too trivial or a reentrant read results in an incorrect read.
apache_beam.io.source_test_utils.assert_split_at_fraction_behavior(source, num_items_to_read_before_split, split_fraction, expected_outcome)[source]

Verifies the behaviour of splitting a source at a given fraction.

Asserts that splitting a BoundedSource either fails after reading num_items_to_read_before_split items, or succeeds in a way that is consistent according to assert_split_at_fraction_succeeds_and_consistent().

Parameters:
  • source – the source to perform dynamic splitting on.
  • num_items_to_read_before_split – number of items to read before splitting.
  • split_fraction – fraction to split at.
  • expected_outcome – a value from ‘ExpectedSplitOutcome’.
Returns:

a tuple that gives the number of items produced by reading the two ranges produced after dynamic splitting. If splitting did not occur, the first value of the tuple will represent the full set of records read by the source while the second value of the tuple will be ‘-1’.

apache_beam.io.source_test_utils.assert_split_at_fraction_binary(source, expected_items, num_items_to_read_before_split, left_fraction, left_result, right_fraction, right_result, stats, start_position=None, stop_position=None)[source]

Performs dynamic work rebalancing for fractions within a given range.

Asserts that given a start position, a source can be split at every interesting fraction (halfway between two fractions that differ by at least one item) and the results are consistent if a split succeeds.

Parameters:
  • source – source to perform dynamic splitting on.
  • expected_items – total set of items expected when reading the source.
  • num_items_to_read_before_split – number of items to read before splitting.
  • left_fraction – left fraction for binary splitting.
  • left_result – result received by splitting at left fraction.
  • right_fraction – right fraction for binary splitting.
  • right_result – result received by splitting at right fraction.
  • stats – a SplitFractionStatistics for storing results.
apache_beam.io.source_test_utils.assert_split_at_fraction_exhaustive(source, start_position=None, stop_position=None, perform_multi_threaded_test=True)[source]

Performs and tests dynamic work rebalancing exhaustively.

Asserts that for each possible start position, a source can be split at every interesting fraction (halfway between two fractions that differ by at least one item) and the results are consistent if a split succeeds. Verifies multi threaded splitting as well.

Parameters:
  • source – the source to perform dynamic splitting on.
  • perform_multi_threaded_test – if true, performs a multi-threaded test; otherwise this test is skipped.
Raises:

ValueError – if the exhaustive splitting test fails.

apache_beam.io.source_test_utils.assert_split_at_fraction_fails(source, num_items_to_read_before_split, split_fraction)[source]

Asserts that dynamic work rebalancing at a given fraction fails.

Asserts that trying to perform dynamic splitting after reading ‘num_items_to_read_before_split’ items from the source fails.

Parameters:
  • source – source to perform dynamic splitting on.
  • num_items_to_read_before_split – number of items to read before splitting.
  • split_fraction – fraction to split at.
apache_beam.io.source_test_utils.assert_split_at_fraction_succeeds_and_consistent(source, num_items_to_read_before_split, split_fraction)[source]

Verifies some consistency properties of dynamic work rebalancing.

Equivalent to the following pseudocode:

original_range_tracker = source.get_range_tracker(None, None)
original_reader = source.read(original_range_tracker)
items_before_split = read N items from original_reader
suggested_split_position = original_range_tracker.position_at_fraction(
  split_fraction)
original_stop_position = original_range_tracker.stop_position()
split_result = original_range_tracker.try_split(suggested_split_position)
split_position, split_fraction = split_result
primary_range_tracker = source.get_range_tracker(
  original_range_tracker.start_position(), split_position)
residual_range_tracker = source.get_range_tracker(split_position,
  original_stop_position)

assert that: items when reading source.read(primary_range_tracker) ==
  items_before_split + items from continuing to read 'original_reader'
assert that: items when reading source.read(original_range_tracker) ==
  items when reading source.read(primary_range_tracker) + items when reading
  source.read(residual_range_tracker)
Parameters:
  • source – source to perform dynamic work rebalancing on.
  • num_items_to_read_before_split – number of items to read before splitting.
  • split_fraction – fraction to split at.
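
The consistency properties above can be exercised end to end on a toy source. This is a sketch under stated assumptions, not the Beam harness: ToyTracker, ToySource, and check_split are hypothetical names, records are the integers 0 to n - 1, and each record's position equals its value.

```python
class ToyTracker(object):
    def __init__(self, start, stop):
        self._start, self._stop, self._last = start, stop, None

    def start_position(self):
        return self._start

    def stop_position(self):
        return self._stop

    def position_at_fraction(self, fraction):
        return int(self._start + fraction * (self._stop - self._start))

    def try_claim(self, position):
        if position >= self._stop:
            return False
        self._last = position
        return True

    def try_split(self, position):
        if self._last is not None and position <= self._last:
            return None
        if not self._start < position < self._stop:
            return None
        fraction = (position - self._start) / float(self._stop - self._start)
        self._stop = position
        return position, fraction


class ToySource(object):
    def __init__(self, n):
        self._n = n

    def get_range_tracker(self, start, stop):
        return ToyTracker(0 if start is None else start,
                          self._n if stop is None else stop)

    def read(self, tracker):
        position = tracker.start_position()
        while tracker.try_claim(position):
            yield position
            position += 1


def check_split(source, num_items_to_read_before_split, split_fraction):
    original = source.get_range_tracker(None, None)
    reader = source.read(original)
    items_before = [next(reader) for _ in range(num_items_to_read_before_split)]
    suggested = original.position_at_fraction(split_fraction)
    original_stop = original.stop_position()
    result = original.try_split(suggested)
    if result is None:
        return None  # The split was refused.
    split_position, _ = result
    primary = list(source.read(
        source.get_range_tracker(original.start_position(), split_position)))
    residual = list(source.read(
        source.get_range_tracker(split_position, original_stop)))
    # The reader that was split must produce exactly the primary range.
    assert primary == items_before + list(reader)
    # Primary plus residual must cover the unsplit source.
    assert list(source.read(source.get_range_tracker(None, None))) == (
        primary + residual)
    return len(primary), len(residual)
```

A split requested at a fraction the reader has already passed is refused, which the sketch reports by returning None instead of a pair of counts.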

apache_beam.io.textio module

A source and a sink for reading from and writing to text files.

class apache_beam.io.textio.ReadFromText(file_pattern=None, min_bundle_size=0, compression_type='auto', strip_trailing_newlines=True, coder=StrUtf8Coder, validate=True, skip_header_lines=0, **kwargs)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for reading text files.

Parses a text file as newline-delimited elements, by default assuming UTF-8 encoding. Supports newline delimiters ‘\n’ and ‘\r\n’.

This implementation only supports reading text encoded using UTF-8 or ASCII. This does not support other encodings such as UTF-16 or UTF-32.

expand(pvalue)[source]
class apache_beam.io.textio.WriteToText(file_path_prefix, file_name_suffix='', append_trailing_newlines=True, num_shards=0, shard_name_template=None, coder=ToStringCoder, compression_type='auto', header=None)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for writing to text files.

expand(pcoll)[source]

apache_beam.io.tfrecordio module

TFRecord sources and sinks.

class apache_beam.io.tfrecordio.ReadFromTFRecord(file_pattern, coder=BytesCoder, compression_type='auto', validate=True, **kwargs)[source]

Bases: apache_beam.transforms.ptransform.PTransform

Transform for reading TFRecord sources.

expand(pvalue)[source]
class apache_beam.io.tfrecordio.WriteToTFRecord(file_path_prefix, coder=BytesCoder, file_name_suffix='', num_shards=0, shard_name_template=None, compression_type='auto', **kwargs)[source]

Bases: apache_beam.transforms.ptransform.PTransform

Transform for writing to TFRecord sinks.

expand(pcoll)[source]

Module contents

A package defining several input sources and output sinks.