Developing I/O connectors for Java
To connect to a data store that isn’t supported by Beam’s existing I/O connectors, you must create a custom I/O connector that usually consists of a source and a sink. All Beam sources and sinks are composite transforms; however, the implementation of your custom I/O depends on your use case. Before you start, read the new I/O connector overview for an overview of developing a new I/O connector, the available implementation options, and how to choose the right option for your use case.
This guide covers using the Source and FileBasedSink interfaces in Java.
The Python SDK offers the same functionality, but uses a slightly different API.
See Developing I/O connectors for Python
for information specific to the Python SDK.
Basic code requirements
Beam runners use the classes you provide to read and/or write data using
multiple worker instances in parallel. As such, the code you provide for
Source and FileBasedSink subclasses must meet some basic requirements:
Serializability: Your Source or FileBasedSink subclass, whether bounded or unbounded, must be Serializable. A runner might create multiple instances of your Source or FileBasedSink subclass to be sent to multiple remote workers to facilitate reading or writing in parallel.
Immutability: Your Source or FileBasedSink subclass must be effectively immutable. All private fields must be declared final, and all private variables of collection type must be effectively immutable. If your class has setter methods, those methods must return an independent copy of the object with the relevant field modified.
You should only use mutable state in your Source or FileBasedSink subclass if you are using lazy evaluation of expensive computations that you need to implement the source or sink; in that case, you must declare all mutable instance variables transient.
Thread-Safety: Your code must be thread-safe. If you build your source to work with dynamic work rebalancing, it is critical that you make your code thread-safe. The Beam SDK provides a helper class to make this easier. See Using Your BoundedSource with dynamic work rebalancing for more details.
Testability: It is critical to exhaustively unit test all of your Source and FileBasedSink subclasses, especially if you build your classes to work with advanced features such as dynamic work rebalancing. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.
To assist in testing BoundedSource implementations, you can use the SourceTestUtils class. SourceTestUtils contains utilities for automatically verifying some of the properties of your BoundedSource implementation. You can use SourceTestUtils to increase your implementation’s test coverage using a wide range of inputs with relatively few lines of code. For examples that use SourceTestUtils, see the AvroSourceTest and TextIOReadTest source code.
In addition, see the PTransform style guide for Beam’s transform style guidance.
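The serializability and immutability requirements above can be sketched in plain Java. The class below is a hypothetical stand-in for the configuration a source or sink subclass carries; ExampleSourceConfig, its fields, and the roundTrip helper are illustrative names, not part of the Beam SDK, and a real connector would extend the appropriate Beam base class instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.regex.Pattern;

// Hypothetical stand-in for the state held by a source or sink subclass.
final class ExampleSourceConfig implements Serializable {
  private static final long serialVersionUID = 1L;

  // Configuration fields are final, so an instance never changes after construction.
  private final String location;
  private final String recordFilterRegex;

  // Lazily computed state is transient: it is not serialized and is rebuilt on demand.
  private transient Pattern compiledFilter;

  ExampleSourceConfig(String location, String recordFilterRegex) {
    this.location = location;
    this.recordFilterRegex = recordFilterRegex;
  }

  // "Setter" that returns an independent copy instead of mutating this instance.
  ExampleSourceConfig withLocation(String newLocation) {
    return new ExampleSourceConfig(newLocation, recordFilterRegex);
  }

  String getLocation() {
    return location;
  }

  // synchronized keeps the lazy initialization safe when called from several threads.
  synchronized Pattern getCompiledFilter() {
    if (compiledFilter == null) {
      compiledFilter = Pattern.compile(recordFilterRegex);
    }
    return compiledFilter;
  }

  // Round-trips the object through Java serialization, roughly what happens
  // when an instance is shipped to a remote worker.
  static ExampleSourceConfig roundTrip(ExampleSourceConfig config) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(config);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ExampleSourceConfig) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }
}
```

A round-trip check like roundTrip is also a cheap unit test to run against a real subclass, since a non-serializable field typically only fails once the pipeline is submitted to a runner.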
Implementing the Source interface
To create a data source for your pipeline, you must provide the format-specific logic that tells a runner how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel. If you’re creating a data source that reads unbounded data, you must provide additional logic for managing your source’s watermark and optional checkpointing.
Supply the logic for your source by creating the following classes:
A subclass of BoundedSource if you want to read a finite (batch) data set, or a subclass of UnboundedSource if you want to read an infinite (streaming) data set. These subclasses describe the data you want to read, including the data’s location and parameters (such as how much data to read).
A subclass of Source.Reader. Each Source must have an associated Reader that captures all the state involved in reading from that Source. This can include things like file handles, RPC connections, and other parameters that depend on the specific requirements of the data format you want to read.
The Reader class hierarchy mirrors the Source hierarchy. If you’re extending BoundedSource, you’ll need to provide an associated BoundedReader. If you’re extending UnboundedSource, you’ll need to provide an associated UnboundedReader.
One or more user-facing wrapper composite transforms (PTransform) that wrap read operations. The PTransform wrappers section discusses why you should avoid exposing your sources.
Implementing the Source subclass
You must create a subclass of either BoundedSource or UnboundedSource,
depending on whether your data is a finite batch or an infinite stream. In
either case, your
Source subclass must override the abstract methods in the
superclass. A runner might call these methods when using your data source. For
example, when reading from a bounded source, a runner uses these methods to
estimate the size of your data set and to split it up for parallel reading.
Your Source subclass should also manage basic information about your data
source, such as the location. For example, the example Source implementation
in Beam’s DatastoreIO
class takes host, datasetID, and query as arguments. The connector uses these
values to obtain data from Cloud Datastore.
BoundedSource represents a finite data set from which a Beam runner may read,
possibly in parallel.
BoundedSource contains a set of abstract methods that
the runner uses to split the data set for reading by multiple workers.
To implement a BoundedSource, your subclass must override the following abstract methods:
split: The runner uses this method to split your finite data into bundles of a given size.
getEstimatedSizeBytes: The runner uses this method to estimate the total size of your data, in bytes.
createReader: Creates the associated BoundedReader for this BoundedSource.
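To make the roles of split and getEstimatedSizeBytes concrete, here is a minimal plain-Java model over a byte range. OffsetRange and its method names are hypothetical and do not extend any Beam class; the real Beam methods also receive a PipelineOptions argument, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a bounded source that covers the byte range [start, end).
final class OffsetRange {
  final long start; // inclusive
  final long end;   // exclusive

  OffsetRange(long start, long end) {
    if (start > end) {
      throw new IllegalArgumentException("start must not exceed end");
    }
    this.start = start;
    this.end = end;
  }

  // Plays the role of getEstimatedSizeBytes: the size of this range in bytes.
  long estimatedSizeBytes() {
    return end - start;
  }

  // Plays the role of split: cuts the range into consecutive, non-overlapping
  // sub-ranges of at most desiredBundleSizeBytes each.
  List<OffsetRange> split(long desiredBundleSizeBytes) {
    if (desiredBundleSizeBytes <= 0) {
      throw new IllegalArgumentException("bundle size must be positive");
    }
    List<OffsetRange> bundles = new ArrayList<>();
    for (long s = start; s < end; s += desiredBundleSizeBytes) {
      bundles.add(new OffsetRange(s, Math.min(s + desiredBundleSizeBytes, end)));
    }
    return bundles;
  }
}
```

Each sub-range begins exactly where the previous one ends, so the bundles neither overlap nor leave gaps. That is the property a real split implementation has to preserve to avoid duplicated or lost records.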
UnboundedSource represents an infinite data stream from which the runner may
read, possibly in parallel.
UnboundedSource contains a set of abstract methods
that the runner uses to support streaming reads in parallel; these include
checkpointing for failure recovery, record IDs to prevent data duplication,
and watermarking for estimating data completeness in downstream parts of your
pipeline.
To implement an UnboundedSource, your subclass must override the following methods:
split: The runner uses this method to generate a list of UnboundedSource objects which represent the number of sub-stream instances from which the service should read in parallel.
getCheckpointMarkCoder: The runner uses this method to obtain the Coder for the checkpoints for your source (if any).
requiresDeduping: The runner uses this method to determine whether the data requires explicit removal of duplicate records. If this method returns true, the runner will automatically insert a step to remove duplicates from your source’s output. This should return true if and only if your source provides record IDs for each record. See UnboundedReader.getCurrentRecordId for when this should be done.
createReader: Creates the associated UnboundedReader for this UnboundedSource.
Implementing the Reader subclass
You must create a subclass of either BoundedReader or UnboundedReader to be
returned by your source subclass’s
createReader method. The runner uses the
methods in your
Reader (whether bounded or unbounded) to do the actual reading
of your dataset.
BoundedReader and UnboundedReader have similar basic interfaces, which
you’ll need to define. In addition, UnboundedReader has some methods of its own
that you’ll need to implement for working with unbounded data, and BoundedReader
has an optional method you can implement if you want it to
take advantage of dynamic work rebalancing. There are also minor differences in
the semantics of the start() and advance() methods when using UnboundedReader.
Reader methods common to both BoundedReader and UnboundedReader
A runner uses the following methods to read data using BoundedReader or UnboundedReader:
start: Initializes the Reader and advances to the first record to be read. This method is called exactly once when the runner begins reading your data, and is a good place to put expensive operations needed for initialization.
advance: Advances the reader to the next valid record. This method must return false if there is no more input available. A BoundedReader should stop reading once advance returns false, but an UnboundedReader can return true in future calls once more data is available from your stream.
getCurrent: Returns the data record at the current position, last read by start or advance.
getCurrentTimestamp: Returns the timestamp for the current data record. You only need to override getCurrentTimestamp if your source reads data that has intrinsic timestamps. The runner uses this value to set the intrinsic timestamp for each element in the resulting output PCollection.
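The start, advance, and getCurrent contract described above can be illustrated with a small stand-alone model. ListBackedReader is a hypothetical class that only mimics the method names; it does not extend BoundedReader, and readAll is an assumed helper showing how a caller might drive a bounded reader.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, simplified reader that mirrors the start/advance/getCurrent contract.
final class ListBackedReader<T> {
  private final Iterator<T> records;
  private T current;
  private boolean hasCurrent;

  ListBackedReader(List<T> records) {
    this.records = records.iterator();
  }

  // Called exactly once; positions the reader on the first record, if there is one.
  boolean start() {
    return advance();
  }

  // Moves to the next record; returns false once the input is exhausted.
  boolean advance() {
    hasCurrent = records.hasNext();
    current = hasCurrent ? records.next() : null;
    return hasCurrent;
  }

  // Returns the record last positioned on by start or advance.
  T getCurrent() {
    if (!hasCurrent) {
      throw new NoSuchElementException("No current record");
    }
    return current;
  }

  // Drives the reader the way a caller of a bounded reader would:
  // one start() call, then advance() until it returns false.
  static <T> List<T> readAll(ListBackedReader<T> reader) {
    List<T> out = new ArrayList<>();
    for (boolean more = reader.start(); more; more = reader.advance()) {
      out.add(reader.getCurrent());
    }
    return out;
  }
}
```

For an unbounded reader the loop would differ: a false return from advance only means that no record is available right now, so the caller would try again later instead of stopping.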
Reader methods unique to UnboundedReader
In addition to the basic Reader interface, UnboundedReader has some
additional methods for managing reads from an unbounded data source:
getCurrentRecordId: Returns a unique identifier for the current record. The runner uses these record IDs to filter out duplicate records. If your data has logical IDs present in each record, you can have this method return them; otherwise, you can return a hash of the record contents, using at least a 128-bit hash. It is incorrect to use Java’s Object.hashCode(), as a 32-bit hash is generally insufficient for preventing collisions, and hashCode() is not guaranteed to be stable across processes.
getCurrentRecordId is optional if your source uses a checkpointing scheme that uniquely identifies each record. For example, if your splits are files and the checkpoints are file positions up to which all data has been read, you do not need record IDs. However, record IDs can still be useful if upstream systems writing data to your source occasionally produce duplicate records that your source might then read.
getWatermark: Returns a watermark that your Reader provides. The watermark is the approximate lower bound on timestamps of future elements to be read by your Reader. The runner uses the watermark as an estimate of data completeness. Watermarks are used in windowing and triggers.
getCheckpointMark: The runner uses this method to create a checkpoint in your data stream. The checkpoint represents the progress of the UnboundedReader, which can be used for failure recovery. Different data streams may use different checkpointing methods; some sources might require received records to be acknowledged, while others might use positional checkpointing. You’ll need to tailor this method to the most appropriate checkpointing scheme. For example, you might have this method return the most recently acknowledged record(s).
getCheckpointMark is optional; you don’t need to implement it if your data does not have meaningful checkpoints. However, if you choose not to implement checkpointing in your source, you may encounter duplicate data or data loss in your pipeline, depending on whether your data source tries to re-send records in case of errors.
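For sources without logical record IDs, the advice on getCurrentRecordId above amounts to hashing the record contents with a digest of at least 128 bits. The following is a minimal sketch under the assumption that a record can be represented as a String; RecordIds and recordIdFor are hypothetical names. SHA-256 is used here because it is 256 bits wide and available on every Java platform.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper that derives a stable record ID from record contents.
final class RecordIds {
  private RecordIds() {}

  // Returns the 32-byte (256-bit) SHA-256 digest of the record contents.
  // Unlike Object.hashCode(), the result is the same in every process.
  static byte[] recordIdFor(String recordContents) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      return digest.digest(recordContents.getBytes(StandardCharsets.UTF_8));
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is a required algorithm on every Java platform implementation.
      throw new IllegalStateException(e);
    }
  }
}
```

A reader could return such a byte array from getCurrentRecordId. Note that a content hash treats two records with identical contents as duplicates, so this approach only fits sources where identical contents really do mean the same record.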
You can read a bounded PCollection from an UnboundedSource by specifying either
.withMaxNumRecords or .withMaxReadTime when you read from your source.
.withMaxNumRecords reads a fixed maximum number of records from your
unbounded source, while .withMaxReadTime reads from your unbounded source for
a fixed maximum time duration.
Using your BoundedSource with dynamic work rebalancing
If your source provides bounded data, you can have your BoundedReader work
with dynamic work rebalancing by implementing the method splitAtFraction. The
runner may call splitAtFraction concurrently with start or advance on a given
reader so that the remaining data in your Source can be split and
redistributed to other workers.
When you implement
splitAtFraction, your code must produce a
mutually-exclusive set of splits where the union of those splits matches the
total data set.
If you implement splitAtFraction, you must implement both splitAtFraction and
getFractionConsumed in a thread-safe manner, or data loss is possible. You
should also unit-test your implementation exhaustively to avoid data duplication
or data loss.
To ensure that your code is thread-safe, use the RangeTracker thread-safe
helper object to manage positions in your data source when implementing
splitAtFraction and getFractionConsumed.
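The sketch below shows, in plain Java, the kind of bookkeeping such a thread-safe position helper performs for an offset-based source. SimpleOffsetTracker is a hypothetical, simplified stand-in and not Beam's implementation; the method names and the split rules are assumptions chosen to illustrate why the reading thread and the splitting thread must share one lock.

```java
// Hypothetical, simplified tracker for a reader over the offsets [start, stop).
final class SimpleOffsetTracker {
  private final long start;
  private long stop;              // exclusive; shrinks when a split succeeds
  private long lastReturned = -1; // offset of the last record returned, or -1 if none yet

  SimpleOffsetTracker(long start, long stop) {
    if (start < 0 || start > stop) {
      throw new IllegalArgumentException("require 0 <= start <= stop");
    }
    this.start = start;
    this.stop = stop;
  }

  // The reading thread calls this before returning the record at the given offset.
  // A false result means the record now belongs to a split handed to another worker.
  synchronized boolean tryReturnRecordAt(long offset) {
    if (offset >= stop) {
      return false;
    }
    lastReturned = offset;
    return true;
  }

  // The splitting thread calls this to shrink the range to [start, splitOffset).
  // On success the caller owns [splitOffset, old stop) and can give it to another worker.
  // The split is refused before reading starts, or if it would cut into offsets
  // that were already returned or lie outside the current range.
  synchronized boolean trySplitAt(long splitOffset) {
    if (lastReturned < 0 || splitOffset <= lastReturned || splitOffset >= stop) {
      return false;
    }
    stop = splitOffset;
    return true;
  }

  // Fraction of the current range that has been returned so far, in [0, 1].
  synchronized double getFractionConsumed() {
    if (lastReturned < 0) {
      return 0.0;
    }
    if (stop == start) {
      return 1.0;
    }
    return Math.min(1.0, (double) (lastReturned - start + 1) / (stop - start));
  }

  synchronized long getStop() {
    return stop;
  }
}
```

Because every method synchronizes on the same object, a split can never succeed for an offset the reader has already returned, and the reader can never return an offset that a split has given away. Those two guarantees are what keep the resulting splits mutually exclusive while still covering the whole range.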
We highly recommend that you unit test your implementations of
splitAtFraction using the SourceTestUtils class. SourceTestUtils contains
a number of methods for testing your implementation of splitAtFraction,
including exhaustive automatic testing.
Convenience Source and Reader base classes
The Beam SDK contains some convenient abstract base classes to help you create
Source and Reader classes that work with common data storage formats, like
files.
If your data source uses files, you can derive your Source and Reader
classes from the FileBasedSource and
FileBasedReader abstract base classes.
FileBasedSource is a bounded source subclass that implements code common to
Beam sources that interact with files, including:
- File pattern expansion
- Sequential record reading
- Split points
Using the FileBasedSink abstraction
If your data source uses files, you can implement the FileBasedSink
abstraction to create a file-based sink. For other sinks, use ParDo,
GroupByKey, and other transforms offered by the Beam SDK for Java. See the
developing I/O connectors overview
for more details.
When using the
FileBasedSink interface, you must provide the format-specific
logic that tells the runner how to write bounded data from your pipeline’s
PCollections to an output sink. The runner writes bundles of data in parallel
using multiple workers.
Supply the logic for your file-based sink by implementing the following classes:
A subclass of the abstract base class FileBasedSink. FileBasedSink describes a location or resource that your pipeline can write to in parallel. To avoid exposing your sink to end-users, your FileBasedSink subclass should be protected or private.
A user-facing wrapper PTransform that, as part of the logic, calls WriteFiles and passes your FileBasedSink as a parameter. A user should not need to call WriteFiles directly.
The FileBasedSink abstract base class implements code that is common to Beam
sinks that interact with files, including:
- Setting file headers and footers
- Sequential record writing
- Setting the output MIME type
FileBasedSink and its subclasses support writing files to any Beam-supported
FileSystem implementation. See the Beam-provided FileBasedSink
implementations for examples.
PTransform wrappers
When you create a source or sink that end-users will use, avoid exposing your
source or sink code. To avoid exposing your sources and sinks to end-users, your
new classes should be protected or private. Then, implement a user-facing
PTransform. By exposing your source or sink as a transform, your
implementation is hidden and can be arbitrarily complex or simple. The greatest
benefit of not exposing implementation details is that later on, you can add
additional functionality without breaking the existing implementation for users.
For example, if your users’ pipelines read from your source using
read and you want to insert a reshard into the pipeline, all
users would need to add the reshard themselves (using the GroupByKey
transform). To solve this, we recommend that you expose the source as a
composite PTransform that performs both the read operation and the reshard.
See Beam’s PTransform style guide
for additional information about wrapping with a PTransform.