apache_beam.io.fileio module

PTransforms for manipulating files in Apache Beam.

Provides the reading PTransforms MatchFiles and MatchAll, which produce a PCollection of records representing a file and its metadata, and ReadMatches, which takes a PCollection of file metadata records and produces a PCollection of ReadableFile objects. These transforms currently do not support splitting by themselves.

Writing to Files

The transforms in this file include WriteToFiles, which allows you to write a beam.PCollection to files, and gives you many options to customize how to do this.

The WriteToFiles transform supports bounded and unbounded PCollections (i.e. it can be used in both batch and streaming pipelines). For streaming pipelines, it currently does not support multiple trigger firings on the same window.

File Naming

One of the parameters received by WriteToFiles is a function specifying how to name the files that are written. This is a function that takes in the following parameters:

  • window
  • pane
  • shard_index
  • total_shards
  • compression
  • destination

It should return a file name that is unique for a combination of these parameters.
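
As an illustration, a naming callable with the six parameters listed above might look like the following minimal sketch. The function name simple_file_naming, the "output" fallback prefix and the ".txt" extension are assumptions made for this example. The sketch ignores window and pane, so it only yields unique names for a single firing in the global window; a production callable should fold those two values into the name as well.

```python
def simple_file_naming(window, pane, shard_index, total_shards, compression, destination):
    """Hypothetical naming callable: one name per (destination, shard) pair."""
    # Fall back to a fixed prefix when no destination is in use.
    prefix = "output" if destination is None else str(destination)
    name = f"{prefix}-{shard_index:05d}-of-{total_shards:05d}.txt"
    # Treat compression as an optional extension string (an assumption of this sketch).
    if compression:
        name += f".{compression}"
    return name
```

Such a function would be passed as file_naming=simple_file_naming when constructing WriteToFiles.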

The default naming strategy is to name files in the format $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix, where:

  • $prefix is, by default, “output”.
  • $start and $end are the boundaries of the window for the data being written. These are omitted if we’re using the Global window.
  • $pane is the index of the firing for a window.
  • $shard and $numShards are the current shard number, and the total number of shards for this window firing.
  • $suffix is, by default, an empty string, but it can be set by the user via default_file_naming.

Dynamic Destinations

If the elements in the input beam.PCollection can be partitioned into groups that should be treated differently (e.g. some events are to be stored as CSV, while others are to be stored as Avro files), you can do this by passing a destination parameter to WriteToFiles. For example:

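import apache_beam as beam
from apache_beam.io import fileio

# AvroSink and CsvSink are placeholders for FileSink implementations that you provide.
my_pcollection | fileio.WriteToFiles(
      path='/my/file/path',
      destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
      sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
      file_naming=fileio.destination_prefix_naming())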

In this transform, each record is written to a destination named ‘avro’ or ‘csv’, depending on its type. The value returned by the destination call is then passed to the sink call, which determines the sink used for each destination. The return type of the destination parameter can be anything, as long as elements can be grouped by it.

class apache_beam.io.fileio.EmptyMatchTreatment

Bases: object

How to treat empty matches in MatchAll and MatchFiles transforms.

If empty matches are disallowed, an error will be thrown if a pattern does not match any files.

ALLOW = 'ALLOW'
DISALLOW = 'DISALLOW'
ALLOW_IF_WILDCARD = 'ALLOW_IF_WILDCARD'
static allow_empty_match(pattern, setting)
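
The three settings can be read as the decision rule sketched below. This is a stand-alone illustration, not the library's source. In particular, treating a pattern as a wildcard pattern whenever it contains an asterisk is an assumption of this sketch.

```python
ALLOW = "ALLOW"
DISALLOW = "DISALLOW"
ALLOW_IF_WILDCARD = "ALLOW_IF_WILDCARD"


def allow_empty_match(pattern, setting):
    """Return True if a pattern that matched no files should be tolerated."""
    if setting == ALLOW:
        return True
    if setting == ALLOW_IF_WILDCARD:
        # Assumption: a wildcard pattern is one that contains '*'.
        return "*" in pattern
    if setting == DISALLOW:
        return False
    raise ValueError(f"Unknown empty match treatment: {setting!r}")
```

Under this reading, a literal path that matches nothing is an error with ALLOW_IF_WILDCARD, while an unmatched glob is not.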

class apache_beam.io.fileio.MatchFiles(file_pattern: str, empty_match_treatment='ALLOW_IF_WILDCARD')

Bases: apache_beam.transforms.ptransform.PTransform

Matches a file pattern using FileSystems.match.

This PTransform returns a PCollection of matching files in the form of FileMetadata objects.

expand(pcoll) → apache_beam.pvalue.PCollection[apache_beam.io.filesystem.FileMetadata]

class apache_beam.io.fileio.MatchAll(empty_match_treatment='ALLOW')

Bases: apache_beam.transforms.ptransform.PTransform

Matches file patterns from the input PCollection via FileSystems.match.

This PTransform returns a PCollection of matching files in the form of FileMetadata objects.

expand(pcoll: apache_beam.pvalue.PCollection) → apache_beam.pvalue.PCollection[apache_beam.io.filesystem.FileMetadata]

class apache_beam.io.fileio.ReadableFile(metadata, compression=None)

Bases: object

A utility class for accessing files.

open(mime_type='text/plain', compression_type=None)
read(mime_type='application/octet-stream')
read_utf8()

class apache_beam.io.fileio.MatchContinuously(file_pattern, interval=360.0, has_deduplication=True, start_timestamp=Timestamp(1721920959.767408), stop_timestamp=Timestamp(9223372036854.775000), match_updated_files=False, apply_windowing=False, empty_match_treatment='ALLOW')

Bases: apache_beam.transforms.ptransform.PTransform

Checks for new files for a given pattern every interval.

This PTransform returns a PCollection of matching files in the form of FileMetadata objects.

MatchContinuously is experimental. No backwards-compatibility guarantees.

Matching continuously scales poorly, as it is stateful, and requires storing file ids in memory. In addition, because it is memory-only, if a pipeline is restarted, already processed files will be reprocessed. Consider an alternate technique, such as Pub/Sub Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) when using GCS if possible.

Initializes a MatchContinuously transform.

Parameters:
  • file_pattern – The file path to read from.
  • interval – Interval, in seconds, at which to check for files.
  • has_deduplication – Whether files that have already been read are discarded.
  • start_timestamp – Timestamp at which to start checking for files.
  • stop_timestamp – Timestamp after which no more files will be checked.
  • match_updated_files – (When has_deduplication is set to True) whether to match files whose timestamp has changed.
  • apply_windowing – Whether each element should be assigned to an individual window. If false, all elements reside in the global window.
expand(pbegin) → apache_beam.pvalue.PCollection[apache_beam.io.filesystem.FileMetadata]
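
The scaling and restart caveats above follow from keeping the set of already seen files in memory. The sketch below shows that idea in plain Python with the standard glob module. It is not how MatchContinuously is implemented, and the function name poll_new_files is hypothetical.

```python
import glob


def poll_new_files(file_pattern, seen):
    """Return paths matching file_pattern that are not yet in seen, and record them."""
    new_files = [path for path in sorted(glob.glob(file_pattern)) if path not in seen]
    # The seen set grows with every file ever matched and lives only in memory.
    seen.update(new_files)
    return new_files
```

Calling the function repeatedly with the same set reports each file once. Starting again with an empty set, as happens after a restart, reports every file again.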

class apache_beam.io.fileio.ReadMatches(compression=None, skip_directories=True)

Bases: apache_beam.transforms.ptransform.PTransform

Converts each result of MatchFiles() or MatchAll() to a ReadableFile.

This helps read in a file’s contents or obtain a file descriptor.

expand(pcoll: apache_beam.pvalue.PCollection[typing.Union[str, apache_beam.io.filesystem.FileMetadata]]) → apache_beam.pvalue.PCollection[apache_beam.io.fileio.ReadableFile]

class apache_beam.io.fileio.WriteToFiles(path, file_naming=None, destination=None, temp_directory=None, sink=None, shards=None, output_fn=None, max_writers_per_bundle=20)

Bases: apache_beam.transforms.ptransform.PTransform

Write the incoming PCollection to a set of output files.

The incoming PCollection may be bounded or unbounded.

Note: For unbounded PCollections, this transform does not support multiple firings per window, because files are currently named only by their destination and window.

Initializes a WriteToFiles transform.

Parameters:
  • path (str, ValueProvider) – The directory to write files into.
  • file_naming (callable) – A callable that takes in a window, pane, shard_index, total_shards, compression and destination, and returns a file name.
  • destination (callable) – A callable that maps each record to a destination. If this argument is provided, the sink parameter must also be a callable.
  • temp_directory (str, ValueProvider) – To ensure atomicity in the transform, the output is written into temporary files, which are written to a directory that is meant to be temporary as well. Once the whole output has been written, the files are moved into their final destination, and given their final names. By default, the temporary directory will be within the temp_location of your pipeline.
  • sink (callable, FileSink) – The sink to use to write into a file. It should implement the methods of a FileSink. Pass a class signature or an instance of FileSink to this parameter. If none is provided, a TextSink is used.
  • shards (int) – The number of shards per destination and trigger firing.
  • max_writers_per_bundle (int) – The number of writers that can be open concurrently in a single worker that’s processing one bundle.
MAX_NUM_WRITERS_PER_BUNDLE = 20
DEFAULT_SHARDING = 5
expand(pcoll)
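
The write-then-move behavior described for temp_directory can be illustrated outside Beam. The sketch below uses only the standard library and is not the transform's implementation. The function name write_shards_atomically and the choice to place the temporary directory inside the output directory are assumptions of this example.

```python
import os
import tempfile


def write_shards_atomically(final_dir, shards):
    """Write each shard to a temporary file, then move all of them into final_dir.

    shards maps final file names to lists of text lines.
    """
    os.makedirs(final_dir, exist_ok=True)
    # Keep the temporary directory on the same filesystem as the final files.
    with tempfile.TemporaryDirectory(dir=final_dir, prefix=".temp") as temp_dir:
        staged = []
        for name, lines in shards.items():
            temp_path = os.path.join(temp_dir, name)
            with open(temp_path, "w", encoding="utf-8") as fh:
                fh.writelines(line + "\n" for line in lines)
            staged.append((temp_path, os.path.join(final_dir, name)))
        # Files receive their final names only after every shard is fully written.
        for temp_path, final_path in staged:
            os.replace(temp_path, final_path)
    return sorted(final_path for _, final_path in staged)
```

A reader that lists the output directory therefore sees either no final files or complete ones, never a partially written shard under its final name.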