Developing I/O connectors for Java
IMPORTANT: Use Splittable DoFn
to develop your new I/O. For more details, read the
new I/O connector overview.
To connect to a data store that isn’t supported by Beam’s existing I/O connectors, you must create a custom I/O connector that usually consist of a source and a sink. All Beam sources and sinks are composite transforms; however, the implementation of your custom I/O depends on your use case. Before you start, read the new I/O connector overview for an overview of developing a new I/O connector, the available implementation options, and how to choose the right option for your use case.
This guide covers using the Source
and FileBasedSink
interfaces using Java.
The Python SDK offers the same functionality, but uses a slightly different API.
See Developing I/O connectors for Python
for information specific to the Python SDK.
Basic code requirements
Beam runners use the classes you provide to read and/or write data using
multiple worker instances in parallel. As such, the code you provide for
Source
and FileBasedSink
subclasses must meet some basic requirements:
Serializability: Your
Source
orFileBasedSink
subclass, whether bounded or unbounded, must be Serializable. A runner might create multiple instances of yourSource
orFileBasedSink
subclass to be sent to multiple remote workers to facilitate reading or writing in parallel.Immutability: Your
Source
orFileBasedSink
subclass must be effectively immutable. All private fields must be declared final, and all private variables of collection type must be effectively immutable. If your class has setter methods, those methods must return an independent copy of the object with the relevant field modified.You should only use mutable state in your
Source
orFileBasedSink
subclass if you are using lazy evaluation of expensive computations that you need to implement the source or sink; in that case, you must declare all mutable instance variables transient.Thread-Safety: Your code must be thread-safe. If you build your source to work with dynamic work rebalancing, it is critical that you make your code thread-safe. The Beam SDK provides a helper class to make this easier. See Using Your BoundedSource with dynamic work rebalancing for more details.
Testability: It is critical to exhaustively unit test all of your
Source
andFileBasedSink
subclasses, especially if you build your classes to work with advanced features such as dynamic work rebalancing. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.To assist in testing
BoundedSource
implementations, you can use the SourceTestUtils class.SourceTestUtils
contains utilities for automatically verifying some of the properties of yourBoundedSource
implementation. You can useSourceTestUtils
to increase your implementation’s test coverage using a wide range of inputs with relatively few lines of code. For examples that useSourceTestUtils
, see the AvroSourceTest and TextIOReadTest source code.
In addition, see the PTransform style guide for Beam’s transform style guidance.
Implementing the Source interface
To create a data source for your pipeline, you must provide the format-specific logic that tells a runner how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel. If you’re creating a data source that reads unbounded data, you must provide additional logic for managing your source’s watermark and optional checkpointing.
Supply the logic for your source by creating the following classes:
A subclass of
BoundedSource
if you want to read a finite (batch) data set, or a subclass ofUnboundedSource
if you want to read an infinite (streaming) data set. These subclasses describe the data you want to read, including the data’s location and parameters (such as how much data to read).A subclass of
Source.Reader
. Each Source must have an associated Reader that captures all the state involved in reading from thatSource
. This can include things like file handles, RPC connections, and other parameters that depend on the specific requirements of the data format you want to read.The
Reader
class hierarchy mirrors the Source hierarchy. If you’re extendingBoundedSource
, you’ll need to provide an associatedBoundedReader
. if you’re extendingUnboundedSource
, you’ll need to provide an associatedUnboundedReader
.One or more user-facing wrapper composite transforms (
PTransform
) that wrap read operations. PTransform wrappers discusses why you should avoid exposing your sources.
Implementing the Source subclass
You must create a subclass of either BoundedSource
or UnboundedSource
,
depending on whether your data is a finite batch or an infinite stream. In
either case, your Source
subclass must override the abstract methods in the
superclass. A runner might call these methods when using your data source. For
example, when reading from a bounded source, a runner uses these methods to
estimate the size of your data set and to split it up for parallel reading.
Your Source
subclass should also manage basic information about your data
source, such as the location. For example, the example Source
implementation
in Beam’s DatastoreIO
class takes host, datasetID, and query as arguments. The connector uses these
values to obtain data from Cloud Datastore.
BoundedSource
BoundedSource
represents a finite data set from which a Beam runner may read,
possibly in parallel. BoundedSource
contains a set of abstract methods that
the runner uses to split the data set for reading by multiple workers.
To implement a BoundedSource
, your subclass must override the following
abstract methods:
split
: The runner uses this method to split your finite data into bundles of a given size.getEstimatedSizeBytes
: The runner uses this method to estimate the total size of your data, in bytes.createReader
: Creates the associatedBoundedReader
for thisBoundedSource
.
You can see a model of how to implement BoundedSource
and the required
abstract methods in Beam’s implementations for Cloud BigTable
(BigtableIO.java)
and BigQuery (BigQuerySourceBase.java).
UnboundedSource
UnboundedSource
represents an infinite data stream from which the runner may
read, possibly in parallel. UnboundedSource
contains a set of abstract methods
that the runner uses to support streaming reads in parallel; these include
checkpointing for failure recovery, record IDs to prevent data duplication,
and watermarking for estimating data completeness in downstream parts of your
pipeline.
To implement an UnboundedSource
, your subclass must override the following
abstract methods:
split
: The runner uses this method to generate a list ofUnboundedSource
objects which represent the number of sub-stream instances from which the service should read in parallel.getCheckpointMarkCoder
: The runner uses this method to obtain the Coder for the checkpoints for your source (if any).requiresDeduping
: The runner uses this method to determine whether the data requires explicit removal of duplicate records. If this method returns true, the runner will automatically insert a step to remove duplicates from your source’s output. This should return true if and only if your source provides record IDs for each record. SeeUnboundedReader.getCurrentRecordId
for when this should be done.createReader
: Creates the associatedUnboundedReader
for thisUnboundedSource
.
Implementing the Reader subclass
You must create a subclass of either BoundedReader
or UnboundedReader
to be
returned by your source subclass’s createReader
method. The runner uses the
methods in your Reader
(whether bounded or unbounded) to do the actual reading
of your dataset.
BoundedReader
and UnboundedReader
have similar basic interfaces, which
you’ll need to define. In addition, there are some additional methods unique to
UnboundedReader
that you’ll need to implement for working with unbounded data,
and an optional method you can implement if you want your BoundedReader
to
take advantage of dynamic work rebalancing. There are also minor differences in
the semantics for the start()
and advance()
methods when using
UnboundedReader
.
Reader methods common to both BoundedReader and UnboundedReader
A runner uses the following methods to read data using BoundedReader
or
UnboundedReader
:
start
: Initializes theReader
and advances to the first record to be read. This method is called exactly once when the runner begins reading your data, and is a good place to put expensive operations needed for initialization.advance
: Advances the reader to the next valid record. This method must return false if there is no more input available.BoundedReader
should stop reading once advance returns false, butUnboundedReader
can return true in future calls once more data is available from your stream.getCurrent
: Returns the data record at the current position, last read by start or advance.getCurrentTimestamp
: Returns the timestamp for the current data record. You only need to overridegetCurrentTimestamp
if your source reads data that has intrinsic timestamps. The runner uses this value to set the intrinsic timestamp for each element in the resulting outputPCollection
.
Reader methods unique to UnboundedReader
In addition to the basic Reader
interface, UnboundedReader
has some
additional methods for managing reads from an unbounded data source:
getCurrentRecordId
: Returns a unique identifier for the current record. The runner uses these record IDs to filter out duplicate records. If your data has logical IDs present in each record, you can have this method return them; otherwise, you can return a hash of the record contents, using at least a 128-bit hash. It is incorrect to use Java’sObject.hashCode()
, as a 32-bit hash is generally insufficient for preventing collisions, andhasCode()
is not guaranteed to be stable across processes.Implementing
getCurrentRecordId
is optional if your source uses a checkpointing scheme that uniquely identifies each record. For example, if your splits are files and the checkpoints are file positions up to which all data has been read, you do not need record IDs. However, record IDs can still be useful if upstream systems writing data to your source occasionally produce duplicate records that your source might then read.getWatermark
: Returns a watermark that yourReader
provides. The watermark is the approximate lower bound on timestamps of future elements to be read by yourReader
. The runner uses the watermark as an estimate of data completeness. Watermarks are used in windowing and triggers.getCheckpointMark
: The runner uses this method to create a checkpoint in your data stream. The checkpoint represents the progress of theUnboundedReader
, which can be used for failure recovery. Different data streams may use different checkpointing methods; some sources might require received records to be acknowledged, while others might use positional checkpointing. You’ll need to tailor this method to the most appropriate checkpointing scheme. For example, you might have this method return the most recently acked record(s).getCheckpointMark
is optional; you don’t need to implement it if your data does not have meaningful checkpoints. However, if you choose not to implement checkpointing in your source, you may encounter duplicate data or data loss in your pipeline, depending on whether your data source tries to re-send records in case of errors.
You can read a bounded PCollection
from an UnboundedSource
by specifying
either .withMaxNumRecords
or .withMaxReadTime
when you read from your
source. .withMaxNumRecords
reads a fixed maximum number of records from your
unbounded source, while .withMaxReadTime
reads from your unbounded source for
a fixed maximum time duration.
Using your BoundedSource with dynamic work rebalancing
If your source provides bounded data, you can have your BoundedReader
work
with dynamic work rebalancing by implementing the method splitAtFraction
. The
runner may call splitAtFraction
concurrently with start or advance on a given
reader so that the remaining data in your Source
can be split and
redistributed to other workers.
When you implement splitAtFraction
, your code must produce a
mutually-exclusive set of splits where the union of those splits matches the
total data set.
If you implement splitAtFraction
, you must implement both splitAtFraction
and getFractionConsumed
in a thread-safe manner, or data loss is possible. You
should also unit-test your implementation exhaustively to avoid data duplication
or data loss.
To ensure that your code is thread-safe, use the RangeTracker
thread-safe
helper object to manage positions in your data source when implementing
splitAtFraction
and getFractionConsumed
.
We highly recommended that you unit test your implementations of
splitAtFraction
using the SourceTestUtils
class. SourceTestUtils
contains
a number of methods for testing your implementation of splitAtFraction
,
including exhaustive automatic testing.
Convenience Source and Reader base classes
The Beam SDK contains some convenient abstract base classes to help you create
Source
and Reader
classes that work with common data storage formats, like
files.
FileBasedSource
If your data source uses files, you can derive your Source
and Reader
classes from the FileBasedSource
and FileBasedReader
abstract base classes.
FileBasedSource
is a bounded source subclass that implements code common to
Beam sources that interact with files, including:
- File pattern expansion
- Sequential record reading
- Split points
Using the FileBasedSink abstraction
If your data source uses files, you can implement the FileBasedSink
abstraction to create a file-based sink. For other sinks, use ParDo
,
GroupByKey
, and other transforms offered by the Beam SDK for Java. See the
developing I/O connectors overview
for more details.
When using the FileBasedSink
interface, you must provide the format-specific
logic that tells the runner how to write bounded data from your pipeline’s
PCollection
s to an output sink. The runner writes bundles of data in parallel
using multiple workers.
Supply the logic for your file-based sink by implementing the following classes:
A subclass of the abstract base class
FileBasedSink
.FileBasedSink
describes a location or resource that your pipeline can write to in parallel. To avoid exposing your sink to end-users, yourFileBasedSink
subclass should be protected or private.A user-facing wrapper
PTransform
that, as part of the logic, calls WriteFiles and passes yourFileBasedSink
as a parameter. A user should not need to callWriteFiles
directly.
The FileBasedSink
abstract base class implements code that is common to Beam
sinks that interact with files, including:
- Setting file headers and footers
- Sequential record writing
- Setting the output MIME type
FileBasedSink
and its subclasses support writing files to any Beam-supported
FileSystem
implementations. See the following Beam-provided FileBasedSink
implementations for examples:
PTransform wrappers
When you create a source or sink that end-users will use, avoid exposing your
source or sink code. To avoid exposing your sources and sinks to end-users, your
new classes should be protected or private. Then, implement a user-facing
wrapper PTransform
. By exposing your source or sink as a transform, your
implementation is hidden and can be arbitrarily complex or simple. The greatest
benefit of not exposing implementation details is that later on, you can add
additional functionality without breaking the existing implementation for users.
For example, if your users’ pipelines read from your source using
read
and you want to insert a reshard into the pipeline, all
users would need to add the reshard themselves (using the GroupByKey
transform). To solve this, we recommended that you expose the source as a
composite PTransform
that performs both the read operation and the reshard.
See Beam’s PTransform style guide
for additional information about wrapping with a PTransform
.
Last updated on 2025/01/19
Have you found everything you were looking for?
Was it all useful and clear? Is there anything that you would like to change? Let us know!