Class FileIO
Matching filepatterns
match() and matchAll() match filepatterns (respectively either a single filepattern or a PCollection thereof) and return the files that match them as PCollections of MatchResult.Metadata. Configuration options for them are in FileIO.MatchConfiguration and include features such as treatment of filepatterns that don't match anything and continuous incremental matching of filepatterns (watching for new files).
Example: Watching a single filepattern for new files
This example matches a single filepattern repeatedly every 30 seconds, continuously returns new matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for 1 hour.
// afterTimeSinceNewOutput is a termination condition from Watch.Growth, statically imported.
PCollection<Metadata> matches = p.apply(FileIO.match()
    .filepattern("...")
    .continuously(
        Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
Example: Matching a PCollection of filepatterns arriving from Kafka
This example reads filepatterns from Kafka and matches each one as it arrives, again producing an unbounded PCollection<Metadata>, and failing if a filepattern matches nothing.
// DISALLOW is EmptyMatchTreatment.DISALLOW, statically imported.
PCollection<String> filepatterns = p.apply(KafkaIO.read()...);
PCollection<Metadata> matches = filepatterns.apply(FileIO.matchAll()
    .withEmptyMatchTreatment(DISALLOW));
Reading files
readMatches() converts each result of match() or matchAll() to a FileIO.ReadableFile that is convenient for reading a file's contents, optionally decompressing it.
Example: Returning filenames and contents of compressed files matching a filepattern
This example matches a single filepattern and returns KVs of filenames and their contents as String, decompressing each file with GZIP.
PCollection<KV<String, String>> filesAndContents = p
    .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
    // withCompression can be omitted - by default compression is detected from the filename.
    .apply(FileIO.readMatches().withCompression(GZIP))
    .apply(MapElements
        // uses imports from TypeDescriptors
        .into(kvs(strings(), strings()))
        .via((ReadableFile f) -> {
          try {
            return KV.of(
                f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
          } catch (IOException ex) {
            throw new RuntimeException("Failed to read the file", ex);
          }
        }));
Writing files
write() and writeDynamic() write elements from a PCollection of a given type to files, using a given FileIO.Sink to write a set of elements to each file. The collection can be bounded or unbounded; in either case, writing happens by default per window and pane, and the amount of data in each window and pane is finite, so a finite number of files ("shards") is written for each window and pane. There are several aspects to this process (a minimal sketch follows the list below):
- How many shards are generated per pane: This is controlled by sharding, using FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(PTransform). The default is runner-specific, so the number of shards will vary based on runner behavior, though at least 1 shard will always be produced for every non-empty pane. Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.
- How the shards are named: This is controlled by a FileIO.Write.FileNaming: filenames can depend on a variety of inputs, e.g. the window, the pane, the total number of shards, the current file's shard index, and compression. Controlling the file naming is described in the section File naming below.
- Which elements go into which shard: Elements within a pane get distributed into different shards created for that pane arbitrarily, though FileIO.Write attempts to make shards approximately evenly sized. For more control over which elements go into which files, consider using dynamic destinations (see below).
- How a given set of elements is written to a shard: This is controlled by the FileIO.Sink, e.g. AvroIO.sink. FileIO.Sink controls the format of a single file: how to open a file, how to write each element to it, and how to close the file; it does not control the set of files or which elements go where. Elements are written to a shard in an arbitrary order. FileIO.Write can additionally compress the generated files using FileIO.Write.withCompression(Compression).
- How all of the above can be element-dependent: This is controlled by dynamic destinations. It is possible to have different groups of elements use different policies for naming files and for configuring the FileIO.Sink. See "dynamic destinations" below.
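For instance, the sketch below (not part of the original documentation) writes a PCollection<String> as compressed text files with a fixed number of shards; the input collection and output path are placeholders, and TextIO.sink() is used as a simple line-oriented FileIO.Sink.
// Sketch only: the output path is a placeholder; TextIO.sink() writes each String as one line.
PCollection<String> lines = ...;
lines.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("/path/to/output/")
    .withNumShards(10)                    // fixed sharding; required for unbounded input
    .withCompression(Compression.GZIP));  // compress each generated file with GZIP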
File naming
The names of generated files are produced by a FileIO.Write.FileNaming. The default naming strategy is to name files in the format: $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix, where:
- $prefix is set by FileIO.Write.withPrefix(String); the default is "output".
- $start and $end are the boundaries of the window of data being written, formatted in ISO 8601 format (YYYY-mm-ddTHH:MM:SSZZZ). The window is omitted in case this is the global window.
- $pane is the index of the pane within the window. The pane is omitted in case it is known to be the only pane for this window.
- $shard is the index of the current shard being written, out of the $numShards total shards written for the current pane. Both are formatted using 5 digits (or more if necessary according to $numShards) and zero-padded.
- $suffix is set by FileIO.Write.withSuffix(String); the default is empty.
- $compressionSuffix is based on the default extension for the chosen compression type.
For example: data-2017-12-01T19:00:00Z-2017-12-01T20:00:00Z-2-00010-of-00050.txt.gz
Alternatively, one can specify a custom naming strategy using FileIO.Write.withNaming(Write.FileNaming).

If FileIO.Write.to(String) is specified, then the filenames produced by the FileIO.Write.FileNaming are resolved relative to that directory.
When using dynamic destinations via writeDynamic() (see below), specifying a custom naming strategy is required, using FileIO.Write.withNaming(SerializableFunction) or FileIO.Write.withNaming(Contextful). In those, pass a function that creates a FileIO.Write.FileNaming for the requested group ("destination"). You can either implement a custom FileIO.Write.FileNaming, or use FileIO.Write.defaultNaming(String, String) to configure the default naming strategy with a prefix and suffix as per above.
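As an illustration (not part of the original documentation), the sketch below supplies a custom FileIO.Write.FileNaming as a lambda over its single getFilename(window, pane, numShards, shardIndex, compression) method; the filename layout, collection, and path are placeholder assumptions.
// Sketch only: a custom FileNaming with an illustrative "part-<shard>-of-<numShards>.txt" layout.
PCollection<String> lines = ...;
lines.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("/path/to/output/")  // filenames are resolved relative to this directory
    .withNaming((window, pane, numShards, shardIndex, compression) ->
        String.format("part-%05d-of-%05d.txt", shardIndex, numShards)));
For the common case, FileIO.Write.defaultNaming(prefix, suffix) avoids writing a FileNaming by hand.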
Dynamic destinations
If the elements in the input collection can be partitioned into groups that should be treated differently, FileIO.Write supports different treatment per group ("destination"). It can use different file naming strategies for different groups, and can configure the FileIO.Sink differently per group, e.g. write different elements to Avro files in different directories with different schemas.
This feature is supported by writeDynamic(). Use FileIO.Write.by(SerializableFunction) to specify how to partition the elements into groups ("destinations"). Then elements will be grouped by destination, and FileIO.Write.withNaming(Contextful) and FileIO.Write.via(Contextful) will be applied separately within each group, i.e. different groups will be written using the file naming strategies returned by FileIO.Write.withNaming(Contextful) and using sinks returned by FileIO.Write.via(Contextful) for the respective destinations. Note that currently sharding cannot be destination-dependent: every window/pane for every destination will use the same number of shards specified via FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(PTransform).
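A minimal sketch (not part of the original documentation, and simpler than the CSV example further below): here Event and its getCountry() accessor are hypothetical, and each country's elements are written to their own text files.
// Sketch only: Event and getCountry() are placeholders; the destination is the country code.
PCollection<Event> events = ...;
events.apply(FileIO.<String, Event>writeDynamic()
    .by(Event::getCountry)                               // destination = country code
    .via(Contextful.fn(Event::toString), TextIO.sink())  // format each element, then write as text
    .to("/path/to/events/")
    .withDestinationCoder(StringUtf8Coder.of())
    .withNumShards(1)
    .withNaming(country -> FileIO.Write.defaultNaming(country, ".txt")));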
Handling Errors
When using dynamic destinations, or when using a formatting function to format a record for writing, it's possible for an individual record to be malformed, causing an exception. By default, these exceptions are propagated to the runner, causing the bundle to fail. These are usually retried, though this depends on the runner. Alternatively, these errors can be routed to another PTransform by using FileIO.Write.withBadRecordErrorHandler(ErrorHandler). The ErrorHandler is registered with the pipeline (see below). See ErrorHandler for more documentation. Note that this error handling only covers errors related to specific records. It does not handle errors related to connectivity, authorization, etc., as those should be retried by the runner.
PCollection<> records = ...;
PTransform<PCollection<BadRecord>, ?> alternateSink = ...;
// The handler must be closed (here via try-with-resources) before running the pipeline.
try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink)) {
  records.apply("Write", FileIO.writeDynamic().otherConfigs()
      .withBadRecordErrorHandler(handler));
}
Writing custom types to sinks
Normally, when writing a collection of a custom type using a FileIO.Sink that takes a different type (for example, writing a PCollection<Event> to a text-based Sink<String>), one can simply apply a ParDo or MapElements to convert the custom type to the sink's output type.
However, when using dynamic destinations, in many such cases the destination needs to be extracted from the original type, so such a conversion is not possible. For example, one might write events of a custom class Event to a text sink, using the event's "type" as a destination. In that case, specify an output function in FileIO.Write.via(Contextful, Contextful) or FileIO.Write.via(Contextful, Sink).
Example: Writing CSV files
class CSVSink implements FileIO.Sink<List<String>> {
  private String header;
  private PrintWriter writer;

  public CSVSink(List<String> colNames) {
    this.header = Joiner.on(",").join(colNames);
  }

  public void open(WritableByteChannel channel) throws IOException {
    writer = new PrintWriter(Channels.newOutputStream(channel));
    writer.println(header);
  }

  public void write(List<String> element) throws IOException {
    writer.println(Joiner.on(",").join(element));
  }

  public void flush() throws IOException {
    writer.flush();
  }
}
PCollection<BankTransaction> transactions = ...;
// Convert transactions to strings before writing them to the CSV sink.
transactions.apply(MapElements
        .into(TypeDescriptors.lists(TypeDescriptors.strings()))
        .via(tx -> Arrays.asList(tx.getUser(), tx.getAmount())))
    .apply(FileIO.<List<String>>write()
        .via(new CSVSink(Arrays.asList("user", "amount")))
        .to(".../path/to/")
        .withPrefix("transactions")
        .withSuffix(".csv"));
Example: Writing CSV files to different directories and with different headers
enum TransactionType {
  DEPOSIT,
  WITHDRAWAL,
  TRANSFER,
  ...

  List<String> getFieldNames();
  List<String> getAllFields(BankTransaction tx);
}

PCollection<BankTransaction> transactions = ...;
// BankTransaction is assumed to expose its TransactionType via getType().
transactions.apply(FileIO.<TransactionType, BankTransaction>writeDynamic()
    .by(BankTransaction::getType)
    .via(tx -> tx.getType().getAllFields(tx),  // convert each transaction to the fields of its CSV row
         type -> new CSVSink(type.getFieldNames()))
    .to(".../path/to/")
    .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
Nested Class Summary

Nested Classes (Modifier and Type / Class / Description):
- static class FileIO.Match: Implementation of match().
- static class FileIO.MatchAll: Implementation of matchAll().
- static class FileIO.MatchConfiguration: Describes configuration for matching filepatterns, such as EmptyMatchTreatment and continuous watching for matching files.
- static final class FileIO.ReadableFile: A utility class for accessing a potentially compressed file.
- static class FileIO.ReadMatches: Implementation of readMatches().
- static interface FileIO.Sink: Specifies how to write elements to individual files in write() and writeDynamic().
- static class FileIO.Write: Implementation of write() and writeDynamic().
Constructor Summary

Constructors:
- FileIO()
Method Summary

Modifier and Type / Method / Description:
- static FileIO.Match match(): Matches a filepattern using FileSystems.match(List<String>) and produces a collection of matched resources (both files and directories) as MatchResult.Metadata.
- static FileIO.MatchAll matchAll(): Like match(), but matches each filepattern in a collection of filepatterns.
- static FileIO.ReadMatches readMatches(): Converts each result of match() or matchAll() to a FileIO.ReadableFile which can be used to read the contents of each file, optionally decompressing it.
- static <InputT> FileIO.Write<Void, InputT> write(): Writes elements to files using a FileIO.Sink.
- static <DestT, InputT> FileIO.Write<DestT, InputT> writeDynamic(): Writes elements to files using a FileIO.Sink and grouping the elements using "dynamic destinations".
Constructor Details

FileIO
public FileIO()
Method Details

match

Matches a filepattern using FileSystems.match(List<String>) and produces a collection of matched resources (both files and directories) as MatchResult.Metadata.

By default, matches the filepattern once and produces a bounded PCollection. To continuously watch the filepattern for new matches, use FileIO.MatchAll.continuously(Duration, TerminationCondition); this will produce an unbounded PCollection.

By default, a filepattern matching no resources is treated according to EmptyMatchTreatment.DISALLOW. To configure this behavior, use FileIO.Match.withEmptyMatchTreatment(EmptyMatchTreatment).

Returned MatchResult.Metadata are deduplicated by filename. For example, if this transform observes a file with the same name several times with different metadata (e.g. because the file is growing), it will emit the metadata the first time this file is observed, and will ignore future changes to this file.
matchAll

Like match(), but matches each filepattern in a collection of filepatterns.

Resources are not deduplicated between filepatterns, i.e. if the same resource matches multiple filepatterns, it will be produced multiple times.

By default, a filepattern matching no resources is treated according to EmptyMatchTreatment.ALLOW_IF_WILDCARD. To configure this behavior, use FileIO.MatchAll.withEmptyMatchTreatment(EmptyMatchTreatment).
readMatches

Converts each result of match() or matchAll() to a FileIO.ReadableFile which can be used to read the contents of each file, optionally decompressing it.
write

Writes elements to files using a FileIO.Sink. See class-level documentation.
writeDynamic

Writes elements to files using a FileIO.Sink and grouping the elements using "dynamic destinations". See class-level documentation.