Creating New Sources and Sinks with the Python SDK

The Apache Beam SDK for Python provides an extensible API that you can use to create new data sources and sinks. This tutorial shows how to create new sources and sinks using Beam’s Source and Sink API.

Why Create a New Source or Sink

You’ll need to create a new source or sink if you want your pipeline to read data from (or write data to) a storage system for which the Beam SDK for Python does not provide native support.

In simple cases, you may not need to create a new source or sink. For example, if you need to read data from an SQL database using an arbitrary query, none of the advanced Source API features would benefit you. Likewise, if you’d like to write data to a third-party API via a protocol that lacks deduplication support, the Sink API wouldn’t benefit you. In such cases it makes more sense to use a ParDo.
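For instance, a write to such an API can be expressed as an ordinary DoFn applied with ParDo. In the following sketch, thirdparty_client and its connect and post methods are hypothetical stand-ins for whatever client library your storage system provides:

import apache_beam as beam

# 'thirdparty_client' is a hypothetical module standing in for your
# storage system's client library.
import thirdparty_client


class _WriteToThirdPartyAPI(beam.DoFn):

  def setup(self):
    # Open one connection per DoFn instance rather than per element.
    self._client = thirdparty_client.connect('https://api.example.com')

  def process(self, element):
    self._client.post(element)


# In a pipeline:
#   records | 'WriteViaParDo' >> beam.ParDo(_WriteToThirdPartyAPI())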

However, if you’d like to use advanced features such as dynamic splitting and size estimation, you should use Beam’s APIs and create a new source or sink.

Basic Code Requirements for New Sources and Sinks

Services use the classes you provide to read and/or write data using multiple worker instances in parallel. As such, the code you provide for Source and Sink subclasses must meet some basic requirements:

Serializability

Your Source or Sink subclass must be serializable. The service may create multiple instances of your Source or Sink subclass to be sent to multiple remote workers to facilitate reading or writing in parallel. Note that the way the source and sink objects are serialized is runner specific.

Immutability

Your Source or Sink subclass must be effectively immutable. You should only use mutable state in your Source or Sink subclass for the lazy evaluation of expensive computations that you need in order to implement the source or sink.
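For example, caching an expensive size estimate is an acceptable use of mutable state because it only memoizes a deterministic computation. In this sketch, the _compute_total_size helper is hypothetical:

from apache_beam.io import iobase


class _MyFilesSource(iobase.BoundedSource):

  def __init__(self, file_pattern):
    self._file_pattern = file_pattern
    # The only mutable state: a lazily computed, cached size estimate.
    self._size_estimate = None

  def estimate_size(self):
    if self._size_estimate is None:
      # _compute_total_size (hypothetical) is expensive, so run it at most
      # once; caching its result does not make the source observably mutable.
      self._size_estimate = self._compute_total_size()
    return self._size_estimate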

Thread-Safety

Your code must be thread-safe. The Beam SDK for Python provides the RangeTracker class to make this easier.

Testability

It is critical to exhaustively unit-test all of your Source and Sink subclasses. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.

You can use test harnesses and utility methods available in the source_test_utils module to develop tests for your source.
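For example, a test for the CountingSource class shown later in this guide could use the read_from_source and assert_split_at_fraction_exhaustive utilities (a minimal sketch):

import unittest

from apache_beam.io import source_test_utils


class CountingSourceTest(unittest.TestCase):

  def test_read_and_dynamic_splitting(self):
    source = CountingSource(100)
    # Reads every record from the unsplit source.
    self.assertEqual(
        list(range(100)), source_test_utils.read_from_source(source))
    # Exhaustively verifies dynamic splitting at each claimed position.
    source_test_utils.assert_split_at_fraction_exhaustive(source)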

Creating a New Source

You should create a new source if you’d like to use the advanced features that the Source API provides, such as dynamic splitting and size estimation.

For example, you might want to read from a new file format that contains many records per file, or read from a key-value store that supports read operations in sorted key order.

To create a new data source for your pipeline, you’ll need to provide the format-specific logic that tells the service how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel.

You supply the logic for your new source by creating two classes: a subclass of BoundedSource, which represents the data set to be read and defines how to split it for parallel reading, and a subclass of RangeTracker, which manages the range and current position of an individual reader.

Implementing the BoundedSource Subclass

BoundedSource represents a finite data set from which the service reads, possibly in parallel. BoundedSource contains a set of methods that the service uses to split the data set for reading by multiple remote workers.

To implement a BoundedSource, your subclass must override the following methods, each of which is demonstrated by the CountingSource example later in this guide:

  estimate_size: returns an estimate of the total size of the source’s data, in bytes.
  split: splits the source into a set of bundles (SourceBundle objects) of approximately the desired bundle size, so that the data can be read by multiple workers in parallel.
  get_range_tracker: returns a RangeTracker for the given start and stop positions.
  read: reads and yields the source’s records, claiming positions through the supplied RangeTracker.

Implementing the RangeTracker Subclass

A RangeTracker is a thread-safe object used to manage the current range and current position of the reader of a BoundedSource and protect concurrent access to them.

To implement a RangeTracker, you should first familiarize yourself with the following definitions:

RangeTracker Methods

To implement a RangeTracker, your subclass must override the following methods: start_position, stop_position, try_claim, set_current_position, position_at_fraction, try_split, and fraction_consumed.

The try_split(split_position) method splits the current range [self.start_position, self.stop_position) into a “primary” part [self.start_position, split_position), and a “residual” part [split_position, self.stop_position), assuming that split_position has not been consumed yet.

If split_position has already been consumed, the method returns None. Otherwise, it updates the current range to be the primary part and returns a tuple (split_position, split_fraction). split_fraction should be the fraction of the size of the range [self.start_position, split_position) compared to the original (pre-split) range [self.start_position, self.stop_position).

Note: Methods of class iobase.RangeTracker may be invoked by multiple threads, hence this class must be made thread-safe, for example, by using a single lock object.
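To make the semantics concrete, here is how the SDK’s OffsetRangeTracker (used in the example later in this guide) behaves when a reader that has claimed offset 10 in the range [0, 100) is asked to split at offset 50:

from apache_beam.io.range_trackers import OffsetRangeTracker

tracker = OffsetRangeTracker(0, 100)
tracker.try_claim(10)   # The reader has started and claimed offset 10.
tracker.try_split(50)   # Returns (50, 0.5); the current range becomes [0, 50).
tracker.try_claim(60)   # Returns False; offset 60 now belongs to the residual.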

Convenience Source Base Classes

The Beam SDK for Python contains some convenient abstract base classes to help you easily create new sources.

FileBasedSource

FileBasedSource is a framework for developing sources for new file types; you can derive your BoundedSource class from it.

To create a source for a new file type, create a subclass of FileBasedSource and implement the FileBasedSource.read_records() method.

See AvroSource for an example implementation of FileBasedSource.
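As a rough sketch of the pattern (the line-oriented format and the choice to mark the source as not splittable within a file are assumptions made to keep the example short):

from apache_beam.io import filebasedsource
from apache_beam.io.filesystems import FileSystems


class LineSource(filebasedsource.FileBasedSource):

  def __init__(self, file_pattern):
    # splittable=False reads each matched file as a single bundle, so this
    # example does not have to honor intra-file offsets.
    super(LineSource, self).__init__(file_pattern, splittable=False)

  def read_records(self, file_name, offset_range_tracker):
    with FileSystems.open(file_name) as f:
      for line in f:
        yield line.rstrip(b'\n')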

Reading from a New Source

The following example, CountingSource, demonstrates an implementation of BoundedSource and uses the SDK-provided RangeTracker called OffsetRangeTracker.

from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker
from apache_beam.metrics import Metrics


class CountingSource(iobase.BoundedSource):

  def __init__(self, count):
    self.records_read = Metrics.counter(self.__class__, 'recordsRead')
    self._count = count

  def estimate_size(self):
    return self._count

  def get_range_tracker(self, start_position, stop_position):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    return OffsetRangeTracker(start_position, stop_position)

  def read(self, range_tracker):
    for i in range(self._count):
      if not range_tracker.try_claim(i):
        return
      self.records_read.inc()
      yield i

  def split(self, desired_bundle_size, start_position=None,
            stop_position=None):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    bundle_start = start_position
    while bundle_start < stop_position:
      bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
      yield iobase.SourceBundle(weight=(bundle_stop - bundle_start),
                                source=self,
                                start_position=bundle_start,
                                stop_position=bundle_stop)
      bundle_start = bundle_stop

To read data from the source in your pipeline, use the Read transform:

with beam.Pipeline(options=PipelineOptions()) as p:
  numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))

Note: When you create a source that end-users are going to use, it’s recommended that you do not expose the code for the source itself as demonstrated in the example above, but rather use a wrapping PTransform instead. See PTransform wrappers to see how and why to avoid exposing your sources.

Creating a New Sink

You should create a new sink if you’d like to use the advanced features that the Sink API provides, such as global initialization and finalization that allow the write operation to appear “atomic” (i.e. either all data is written or none is).

A sink represents a resource that can be written to using the Write transform. A parallel write to a sink consists of three phases:

  1. A sequential initialization phase. For example, creating a temporary output directory.
  2. A parallel write phase where workers write bundles of records.
  3. A sequential finalization phase. For example, merging output files.

For example, if you’d like to write to a new table in a database, you should use the Sink API. In this case, the initializer will create a temporary table, the writer will write rows to it, and the finalizer will rename the table to a final location.

To create a new data sink for your pipeline, you’ll need to provide the format-specific logic that tells the service how to write bounded data from your pipeline’s PCollections to your output sink. The sink writes bundles of data in parallel using multiple workers.

You supply the writing logic by creating two classes: a subclass of Sink, which describes the resource to write to and coordinates the write operation, and a subclass of Writer, which writes a single bundle of records.

Implementing the Sink Subclass

Your Sink subclass describes the location or resource to which your pipeline writes its output. This might include a file system location, the name of a database table or dataset, etc.

To implement a Sink, your subclass must override the following methods:

  initialize_write: performs the sequential initialization phase and returns a value (for example, an access token or a temporary directory) that the service passes to the other methods.
  open_writer: creates and returns a Writer for one bundle, given the initialization result and a unique bundle ID.
  finalize_write: performs the sequential finalization phase, given the initialization result and the results returned by the Writers’ close methods.

Caution: initialize_write and finalize_write are conceptually called once: at the beginning and end of a Write transform. However, when you implement these methods, you must ensure that they are idempotent, as they may be called multiple times on different machines in the case of failure, retry, or for redundancy.
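For example, a file-based sink could make initialize_write idempotent by deriving its temporary directory deterministically from the sink’s configuration rather than from a random value. The following is only a sketch; base_dir and write_label are hypothetical configuration values:

from apache_beam.io import iobase
from apache_beam.io.filesystems import FileSystems


class _MyFileSink(iobase.Sink):

  def __init__(self, base_dir, write_label):
    self._base_dir = base_dir
    self._write_label = write_label   # hypothetical stable label for this write

  def initialize_write(self):
    # The temporary location depends only on the sink's configuration, so a
    # retried call finds the directory created by an earlier attempt instead
    # of creating a second one.
    tmp_dir = FileSystems.join(self._base_dir, '.temp-' + self._write_label)
    if not FileSystems.exists(tmp_dir):
      FileSystems.mkdirs(tmp_dir)
    return tmp_dir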

Implementing the Writer Subclass

Your Writer subclass implements the logic for writing a bundle of elements from a PCollection to the output location defined in your Sink. Services may instantiate multiple instances of your Writer in different threads on the same worker, so access to any static members or methods must be thread-safe.

To implement a Writer, your subclass must override the following abstract methods:

  write: writes a single record to the bundle’s output.
  close: finishes the bundle’s write and returns a value (for example, the name of the file or table the bundle was written to) that the service passes to Sink.finalize_write.

Handling Bundle IDs

When the service calls Sink.open_writer, it will pass a unique bundle ID for the records to be written. Your Writer must use this bundle ID to ensure that its output does not interfere with that of other Writer instances that might have been created in parallel. This is particularly important as the service may retry write operations multiple times in case of failure.

For example, if your Sink’s output is file-based, your Writer class might use the bundle ID as a filename suffix to ensure that your Writer writes its records to a unique output file not used by other Writers. You can then have your Writer’s close method return that file location as part of the write result.
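The following sketch illustrates that pattern; it assumes the value returned by initialize_write is a temporary directory and that records are byte strings:

from apache_beam.io import iobase
from apache_beam.io.filesystems import FileSystems


class _FileBundleWriter(iobase.Writer):

  def __init__(self, temp_dir, bundle_id):
    # The bundle ID keeps this writer's file distinct from files written by
    # other bundles and by retried attempts of this bundle.
    self._path = FileSystems.join(temp_dir, 'bundle-%s' % bundle_id)
    self._file = FileSystems.create(self._path)

  def write(self, record):
    self._file.write(record + b'\n')

  def close(self):
    self._file.close()
    # The returned path becomes part of the write results that the service
    # passes to Sink.finalize_write.
    return self._path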

Convenience Sink and Writer Base Classes

The Beam SDK for Python contains some convenient abstract base classes to help you create Sink and Writer classes that work with common data storage formats, like files.

FileBasedSink

If your sink writes to files, you can derive your Sink and Writer classes from the FileBasedSink and FileBasedSinkWriter classes, which can be found in the filebasedsink.py module. These classes implement code common to sinks that interact with files, including setting file headers and footers, sequential record writing, and setting the output MIME type.
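For example, a sink that writes one record per line might be sketched as follows (the ToBytesCoder and the “.txt” suffix are simply illustrative choices):

from apache_beam.coders import coders
from apache_beam.io import filebasedsink


class _SimpleTextSink(filebasedsink.FileBasedSink):

  def __init__(self, file_path_prefix):
    super(_SimpleTextSink, self).__init__(
        file_path_prefix,
        coder=coders.ToBytesCoder(),
        file_name_suffix='.txt')

  def write_encoded_record(self, file_handle, encoded_value):
    # Write each encoded record on its own line.
    file_handle.write(encoded_value)
    file_handle.write(b'\n')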

Writing to a New Sink

Consider a simple key-value storage system that writes a given set of key-value pairs to a set of tables. The following is the key-value storage’s API:

  simplekv.connect(url): connects to the storage system and returns an access token.
  simplekv.open_table(access_token, table_name): creates or opens a table and returns a handle to it.
  simplekv.write_to_table(access_token, table, key, value): writes a key-value pair to the given table.
  simplekv.rename_table(access_token, old_name, new_name): renames a table.

The following code demonstrates how to implement the Sink class for this key-value storage.

class SimpleKVSink(iobase.Sink):

  def __init__(self, url, final_table_name):
    self._url = url
    self._final_table_name = final_table_name

  def initialize_write(self):
    access_token = simplekv.connect(self._url)
    return access_token

  def open_writer(self, access_token, uid):
    table_name = 'table' + uid
    return SimpleKVWriter(access_token, table_name)

  def finalize_write(self, access_token, table_names):
    for i, table_name in enumerate(table_names):
      simplekv.rename_table(
          access_token, table_name, self._final_table_name + str(i))

The following code demonstrates how to implement the Writer class for this key-value storage.

class SimpleKVWriter(iobase.Writer):

  def __init__(self, access_token, table_name):
    self._access_token = access_token
    self._table_name = table_name
    self._table = simplekv.open_table(access_token, table_name)

  def write(self, record):
    key, value = record

    simplekv.write_to_table(self._access_token, self._table, key, value)

  def close(self):
    return self._table_name

The following code demonstrates how to write to the sink using the Write transform.

with beam.Pipeline(options=PipelineOptions()) as p:
  kvs = p | 'CreateKVs' >> beam.Create(KVs)

  kvs | 'WriteToSimpleKV' >> beam.io.Write(
      SimpleKVSink('http://url_to_simple_kv/', final_table_name))

Note: When you create a sink that end-users are going to use, it’s recommended that you do not expose the code for the sink itself as demonstrated in the example above, but rather use a wrapping PTransform instead. See PTransform wrappers to see how and why to avoid exposing your sinks.

PTransform Wrappers

If you create a new source or sink for your own use, such as for learning purposes, you should create them as explained in the sections above and use them as demonstrated in the examples.

However, when you create a source or sink that end-users are going to use, instead of exposing the source or sink itself, you should create a wrapper PTransform. Ideally, a source or sink should be exposed to users simply as “something that can be applied in a pipeline”, which is actually a PTransform. That way, its implementation can be hidden and arbitrarily complex or simple.

The greatest benefit of not exposing the implementation details is that later on you will be able to add additional functionality without breaking the existing implementation for users. For example, if your users’ pipelines read from your source using beam.io.Read(...) and you want to insert a reshard into the pipeline, all of your users would need to add the reshard themselves (using the GroupByKey transform). To solve this, it’s recommended that you expose your source as a composite PTransform that performs both the read operation and the reshard.

To avoid exposing your sources and sinks to end-users, it’s recommended that you use the _ prefix when creating your new source and sink classes. Then, create a wrapper PTransform.

The following examples change the source and sink from the above sections so that they are not exposed to end-users. For the source, rename CountingSource to _CountingSource. Then, create the wrapper PTransform, called ReadFromCountingSource:

class ReadFromCountingSource(PTransform):

  def __init__(self, count, **kwargs):
    super(ReadFromCountingSource, self).__init__(**kwargs)
    self._count = count

  def expand(self, pcoll):
    return pcoll | iobase.Read(_CountingSource(self._count))

Finally, read from the source:

p = beam.Pipeline(options=PipelineOptions())
numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count)
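If you later decide that the transform should reshard its output, as discussed above, only the wrapper’s expand method has to change; users keep applying ReadFromCountingSource exactly as before. A sketch of such a change, using beam.Reshuffle as the resharding step:

class ReadFromCountingSource(PTransform):

  def __init__(self, count, **kwargs):
    super(ReadFromCountingSource, self).__init__(**kwargs)
    self._count = count

  def expand(self, pbegin):
    return (
        pbegin
        | 'Read' >> iobase.Read(_CountingSource(self._count))
        # Added later without breaking existing pipelines: redistribute the
        # records across workers before downstream processing.
        | 'Reshard' >> beam.Reshuffle())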

For the sink, rename SimpleKVSink to _SimpleKVSink. Then, create the wrapper PTransform, called WriteToKVSink:

class WriteToKVSink(PTransform):

  def __init__(self, url, final_table_name, **kwargs):
    super(WriteToKVSink, self).__init__(**kwargs)
    self._url = url
    self._final_table_name = final_table_name

  def expand(self, pcoll):
    return pcoll | iobase.Write(_SimpleKVSink(self._url,
                                              self._final_table_name))

Finally, write to the sink:

with beam.Pipeline(options=PipelineOptions()) as p:
  kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
  kvs | 'WriteToSimpleKV' >> WriteToKVSink(
      'http://url_to_simple_kv/', final_table_name)