Developing I/O connectors for Python

IMPORTANT: Please use Splittable DoFn to develop your new I/O. For more details, please read the new I/O connector overview.

To connect to a data store that isn’t supported by Beam’s existing I/O connectors, you must create a custom I/O connector, which usually consists of a source and a sink. All Beam sources and sinks are composite transforms; however, the implementation of your custom I/O depends on your use case. Before you start, read the new I/O connector overview, which covers how to develop a new I/O connector, the available implementation options, and how to choose the right option for your use case.

This guide covers using the Source and FileBasedSink interfaces for Python. The Java SDK offers the same functionality, but uses a slightly different API. See Developing I/O connectors for Java for information specific to the Java SDK.

Basic code requirements

Beam runners use the classes you provide to read and/or write data using multiple worker instances in parallel. As such, the code you provide for Source and FileBasedSink subclasses must meet some basic requirements:

  1. Serializability: Your Source or FileBasedSink subclass must be serializable. The service may create multiple instances of your Source or FileBasedSink subclass to be sent to multiple remote workers to facilitate reading or writing in parallel. The way the source and sink objects are serialized is runner-specific.

  2. Immutability: Your Source or FileBasedSink subclass must be effectively immutable. You should only use mutable state in your Source or FileBasedSink subclass to lazily evaluate expensive computations that you need to implement the source.

  3. Thread-Safety: Your code must be thread-safe. The Beam SDK for Python provides the RangeTracker class to make this easier.

  4. Testability: It is critical to exhaustively unit-test all of your Source and FileBasedSink subclasses. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect. You can use the test harnesses and utility methods available in the source_test_utils module to develop tests for your source, as sketched below.

In addition, see the PTransform style guide for Beam’s transform style guidance.
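
For example, to exercise requirement 4 above, the source_test_utils harnesses can read a source end to end and exhaustively test dynamic splitting. A minimal sketch, assuming the CountingSource class developed later in this guide:

from apache_beam.io import source_test_utils

source = CountingSource(100)

# Read the source end to end and check the emitted records.
assert source_test_utils.read_from_source(source) == list(range(100))

# Exhaustively verify dynamic splitting at every claimed position and fraction.
source_test_utils.assert_split_at_fraction_exhaustive(source)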

Implementing the Source interface

To create a new data source for your pipeline, you’ll need to provide the format-specific logic that tells the service how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel.

Supply the logic for your new source by creating the following classes:

  * A subclass of BoundedSource, covered in Implementing the BoundedSource subclass below.
  * A subclass of RangeTracker, covered in Implementing the RangeTracker subclass below.

You can find these classes in the apache_beam.io.iobase module.

Implementing the BoundedSource subclass

BoundedSource represents a finite data set from which the service reads, possibly in parallel. BoundedSource contains a set of methods that the service uses to split the data set for reading by multiple remote workers.

To implement a BoundedSource, your subclass must override the following methods:

  * estimate_size: Services use this method to estimate the total size of your data, in bytes.
  * get_range_tracker: Services use this method to get the RangeTracker for a given position range, and use the information to report progress and perform dynamic splitting of sources.
  * read: This method returns an iterator that reads data from the source, with respect to the boundaries defined by the given RangeTracker object.
  * split: Services use this method to split your finite data into bundles of a given size.

Implementing the RangeTracker subclass

A RangeTracker is a thread-safe object used to manage the current range and current position of the reader of a BoundedSource and protect concurrent access to them.

To implement a RangeTracker, you should first familiarize yourself with the following definitions:

  * Position-based sources: A position-based source can be described by a range of positions of an ordered type, and the records returned by the reader can be described by positions of that type. For example, for a record within a file, the position can be the starting byte offset of the record.
  * Split points: A split point describes a record that is the first one returned when reading the range from and including a position A up to infinity.
  * Consumed positions: As the reader reads records, it claims their positions; a position is consumed once the reader has claimed it, and the range can only be split at a position that has not yet been consumed.

RangeTracker methods

To implement a RangeTracker, your subclass must override the following methods:

  * start_position: Returns the starting position of the current range, inclusive.
  * stop_position: Returns the ending position of the current range, exclusive.
  * try_claim: This method is used to determine if a record at a split point is within the range. It should modify the internal state of the RangeTracker by updating the last-consumed position to the given starting position of the record being read by the source. The method returns true if the given position falls within the current range.
  * set_current_position: This method updates the last-consumed position to the given starting position of a record being read by a source. You can invoke this method for records that do not start at split points, and this should modify the internal state of the RangeTracker. If the record starts at a split point, you must invoke try_claim instead of this method.
  * position_at_fraction: Given a fraction within the range [0.0, 1.0), this method returns the position at the given fraction compared to the position range [self.start_position, self.stop_position).
  * try_split: This method splits the current range [self.start_position, self.stop_position) into a “primary” part [self.start_position, split_position) and a “residual” part [split_position, self.stop_position), assuming that split_position has not been consumed yet. If split_position has already been consumed, the method returns None. Otherwise, it updates the current range to be the primary and returns a tuple (split_position, split_fraction), where split_fraction is the fraction of the size of the range [self.start_position, split_position) compared to the original (before split) range [self.start_position, self.stop_position).
  * fraction_consumed: Returns the approximate fraction of consumed positions in the source.

Note: Methods of class iobase.RangeTracker may be invoked by multiple threads, hence this class must be made thread-safe, for example, by using a single lock object.
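
To make these semantics concrete, the following snippet drives the SDK-provided OffsetRangeTracker (the same tracker used by the CountingSource example below) by hand; the offsets are arbitrary illustration values:

from apache_beam.io.range_trackers import OffsetRangeTracker

tracker = OffsetRangeTracker(0, 10)  # tracks the offset range [0, 10)

assert tracker.try_claim(0)  # the reader claims the record at offset 0
assert tracker.try_claim(3)  # ...then the record at offset 3

# Split off [7, 10) as a residual; the primary range shrinks to [0, 7).
# Returns (7, 0.7): the primary is 0.7 of the original range's size.
tracker.try_split(7)

assert not tracker.try_claim(8)  # offset 8 is now outside the shrunken range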

Convenience Source base classes

The Beam SDK for Python contains some convenient abstract base classes to help you easily create new sources.

FileBasedSource

FileBasedSource is a framework for developing sources for new file types. You can derive your BoundedSource class from the FileBasedSource class.

To create a source for a new file type, you need to create a subclass of FileBasedSource. Subclasses of FileBasedSource must implement the method FileBasedSource.read_records().

See AvroSource for an example implementation of FileBasedSource.
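
To illustrate the shape of such a subclass, here is a minimal, hypothetical line-reading source. It is marked non-splittable to keep the sketch short; a production implementation such as AvroSource also claims positions through the passed range_tracker so that individual files can be split:

from apache_beam.io import filebasedsource
from apache_beam.io.filesystems import FileSystems

class _MyLineSource(filebasedsource.FileBasedSource):
  """Hypothetical source that yields each line of the matched files."""

  def __init__(self, file_pattern):
    # splittable=False reads each file as a single bundle, so read_records
    # does not need to claim byte offsets through the range_tracker.
    super().__init__(file_pattern, splittable=False)

  def read_records(self, file_name, range_tracker):
    with FileSystems.open(file_name) as file_handle:
      while True:
        line = file_handle.readline()
        if not line:
          break
        yield line.rstrip(b'\n')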

Reading from a new Source

The following example, CountingSource, demonstrates an implementation of BoundedSource and uses the SDK-provided RangeTracker called OffsetRangeTracker.

from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker
from apache_beam.metrics import Metrics


class CountingSource(iobase.BoundedSource):
  def __init__(self, count):
    self.records_read = Metrics.counter(self.__class__, 'recordsRead')
    self._count = count

  def estimate_size(self):
    return self._count

  def get_range_tracker(self, start_position, stop_position):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    return OffsetRangeTracker(start_position, stop_position)

  def read(self, range_tracker):
    for i in range(range_tracker.start_position(),
                   range_tracker.stop_position()):
      # Claim each position before emitting it, so the runner can split
      # the remaining range dynamically while this bundle is being read.
      if not range_tracker.try_claim(i):
        return
      self.records_read.inc()
      yield i

  def split(self, desired_bundle_size, start_position=None, stop_position=None):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    # Partition [start_position, stop_position) into contiguous bundles of
    # at most desired_bundle_size positions each.
    bundle_start = start_position
    while bundle_start < stop_position:
      bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
      yield iobase.SourceBundle(
          weight=(bundle_stop - bundle_start),
          source=self,
          start_position=bundle_start,
          stop_position=bundle_stop)
      bundle_start = bundle_stop

To read data from the source in your pipeline, use the Read transform:

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))

Note: When you create a source that end users will use, we recommend that you do not expose the source class itself (as the example above does). Use a wrapping PTransform instead. PTransform wrappers discusses why you should avoid exposing your sources, and walks through how to create a wrapper.

Using the FileBasedSink abstraction

If your data source uses files, you can implement the FileBasedSink abstraction to create a file-based sink. For other sinks, use ParDo, GroupByKey, and other transforms offered by the Beam SDK for Python. See the developing I/O connectors overview for more details.

When using the FileBasedSink interface, you must provide the format-specific logic that tells the runner how to write bounded data from your pipeline’s PCollections to an output sink. The runner writes bundles of data in parallel using multiple workers.

Supply the logic for your file-based sink by subclassing the following class:

  * FileBasedSink: An abstract base class for file-based sinks, found in the apache_beam.io.filebasedsink module.

The FileBasedSink abstract base class implements code that is common to Beam sinks that interact with files, including:

  * Setting file headers and footers
  * Sequential record writing
  * Setting the output MIME type

FileBasedSink and its subclasses support writing files to any Beam-supported FileSystem implementation. See the text sink in the apache_beam.io.textio module for an example FileBasedSink implementation.
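
As an illustration of the pieces involved, here is a minimal, hypothetical sink that writes one UTF-8 encoded string record per line. Only the record encoding is custom; FileBasedSink handles temporary files, sharding, and finalization:

from apache_beam import coders
from apache_beam.io import filebasedsink

class _MyTextSink(filebasedsink.FileBasedSink):
  """Hypothetical sink that writes each string record on its own line."""

  def __init__(self, file_path_prefix):
    super().__init__(
        file_path_prefix,
        coder=coders.StrUtf8Coder(),  # encodes each str record to bytes
        file_name_suffix='.txt',
        mime_type='text/plain')

  def write_encoded_record(self, file_handle, encoded_value):
    # Receives each record already encoded by the coder above.
    file_handle.write(encoded_value)
    file_handle.write(b'\n')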

PTransform wrappers

When you create a source or sink that end users will use, avoid exposing your source or sink code. Give your new classes a _ prefix, then implement a user-facing wrapper PTransform. By exposing your source or sink as a transform, your implementation is hidden and can be arbitrarily complex or simple. The greatest benefit of hiding the implementation details is that you can later add additional functionality without breaking the existing implementation for users.

For example, if your users’ pipelines read from your source using beam.io.Read and you want to insert a reshard into the pipeline, all users would need to add the reshard themselves (using the GroupByKey transform). To solve this, we recommend that you expose the source as a composite PTransform that performs both the read operation and the reshard.

See Beam’s PTransform style guide for additional information about wrapping with a PTransform.

The following examples change the source and sink from the above sections so that they are not exposed to end-users. For the source, rename CountingSource to _CountingSource. Then, create the wrapper PTransform, called ReadFromCountingSource:

from apache_beam import PTransform
from apache_beam.io import iobase


class ReadFromCountingSource(PTransform):
  def __init__(self, count):
    super().__init__()
    self._count = count

  def expand(self, pcoll):
    return pcoll | iobase.Read(_CountingSource(self._count))

Finally, read from the source:

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> ReadFromCountingSource(count)
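
This wrapper is also where the reshard discussed above could later be added without breaking existing user pipelines. A minimal sketch, assuming the SDK's Reshuffle transform is an acceptable way to reshard:

import apache_beam as beam
from apache_beam.io import iobase

class ReadFromCountingSource(beam.PTransform):
  def __init__(self, count):
    super().__init__()
    self._count = count

  def expand(self, pcoll):
    return (
        pcoll
        | 'Read' >> iobase.Read(_CountingSource(self._count))
        # A later addition: rebalance elements across workers. Pipelines
        # that use ReadFromCountingSource pick this up unchanged.
        | 'Reshard' >> beam.Reshuffle())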

For the sink, rename SimpleKVSink to _SimpleKVSink. Then, create the wrapper PTransform, called WriteToKVSink:

from apache_beam import PTransform
from apache_beam.io import iobase


class WriteToKVSink(PTransform):
  def __init__(self, simplekv, url, final_table_name):
    super().__init__()
    self._simplekv = simplekv
    self._url = url
    self._final_table_name = final_table_name

  def expand(self, pcoll):
    return pcoll | iobase.Write(
        _SimpleKVSink(self._simplekv, self._url, self._final_table_name))

Finally, write to the sink:

with beam.Pipeline(options=PipelineOptions()) as pipeline:
  kvs = pipeline | 'CreateKVs' >> beam.Create(KVs)
  kvs | 'WriteToSimpleKV' >> WriteToKVSink(
      simplekv, 'http://url_to_simple_kv/', final_table_name)