public class FileIO
extends java.lang.Object
match() and matchAll() match filepatterns (respectively either a single filepattern or a PCollection thereof) and return the files that match them as PCollections of MatchResult.Metadata. Configuration options for them are in FileIO.MatchConfiguration and include features such as treatment of filepatterns that don't match anything and continuous incremental matching of filepatterns (watching for new files).
This example matches a single filepattern repeatedly every 30 seconds, continuously returns new matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for 1 hour.
PCollection<Metadata> matches = p.apply(FileIO.match()
    .filepattern("...")
    .continuously(
        Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
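The polling behaviour described above can be approximated with a small stdlib-only simulation. This is a rough sketch under stated assumptions, not Beam's implementation: the class `ContinuousMatchSketch`, the method `watch`, and the simulated clock are all hypothetical, and the listing function stands in for whatever the filesystem would return at a given time.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongFunction;

/** Hypothetical illustration of "poll every interval, stop after an idle period". */
final class ContinuousMatchSketch {
    private ContinuousMatchSketch() {}

    /**
     * Polls the listing at simulated times 0, interval, 2 * interval, ... (in seconds)
     * and returns the newly seen names in the order they were first observed.
     * Polling stops once idleTimeout has elapsed since the last new name appeared.
     */
    static List<String> watch(
            LongFunction<List<String>> listingAtSecond, Duration interval, Duration idleTimeout) {
        if (interval.getSeconds() <= 0) {
            throw new IllegalArgumentException("interval must be at least one second");
        }
        Set<String> seen = new HashSet<>();
        List<String> emitted = new ArrayList<>();
        long lastNewOutput = 0;
        for (long now = 0; now - lastNewOutput < idleTimeout.getSeconds(); now += interval.getSeconds()) {
            for (String name : listingAtSecond.apply(now)) {
                // Only names that were not seen in an earlier poll count as new output.
                if (seen.add(name)) {
                    emitted.add(name);
                    lastNewOutput = now;
                }
            }
        }
        return emitted;
    }
}
```

With a 30-second interval and a 1-hour idle timeout, a file that first appears two hours after the previous new file is never observed, because polling has already stopped by then.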
This example reads filepatterns from Kafka and matches each one as it arrives, again producing an unbounded PCollection<Metadata>, and fails if a filepattern doesn't match anything.
PCollection<String> filepatterns = p.apply(KafkaIO.read()...);
PCollection<Metadata> matches = filepatterns.apply(FileIO.matchAll()
    .withEmptyMatchTreatment(DISALLOW));
readMatches() converts each result of match() or matchAll() to a FileIO.ReadableFile that is convenient for reading a file's contents, optionally decompressing it.
This example matches a single filepattern and returns KVs of filenames and their contents as String, decompressing each file with GZIP.
PCollection<KV<String, String>> filesAndContents = p
    .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
    // withCompression can be omitted - by default compression is detected from the filename.
    .apply(FileIO.readMatches().withCompression(GZIP))
    .apply(MapElements
        // uses imports from TypeDescriptors
        .into(kvs(strings(), strings()))
        .via((ReadableFile f) -> {
          try {
            return KV.of(
                f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
          } catch (IOException ex) {
            throw new RuntimeException("Failed to read the file", ex);
          }
        }));
write() and writeDynamic() write elements from a PCollection of a given type to files, using a given FileIO.Sink to write a set of elements to each file. The collection can be bounded or unbounded. In either case, writing happens by default per window and pane, and the amount of data in each window and pane is finite, so a finite number of files ("shards") are written for each window and pane. There are several aspects to this process:
- How many shards are generated per pane: this is controlled by FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>). The default is runner-specific, so the number of shards will vary based on runner behavior, though at least 1 shard will always be produced for every non-empty pane. Runner-determined sharding is available for both bounded and unbounded data; support for unbounded data is limited (BEAM-12040) and depends on the runner.
- How the shards are named: this is controlled by a FileIO.Write.FileNaming. Filenames can depend on a variety of inputs, e.g. the window, the pane, total number of shards, the current file's shard index, and compression. Controlling the file naming is described in the section File naming below.
- Which elements go into which shard: FileIO.Write attempts to make shards approximately evenly sized. For more control over which elements go into which files, consider using dynamic destinations (see below).
- How a given set of elements is written to a shard: this is controlled by the FileIO.Sink, e.g. AvroIO.sink(java.lang.Class<ElementT>) will generate Avro files. The FileIO.Sink controls the format of a single file: how to open a file, how to write each element to it, and how to close the file. It does not control the set of files or which elements go where. Elements are written to a shard in an arbitrary order. FileIO.Write can additionally compress the generated files using FileIO.Write.withCompression(org.apache.beam.sdk.io.Compression).
- How the above can differ per element: this includes the choice of FileIO.Sink. See "dynamic destinations" below.
The names of generated files are produced by a FileIO.Write.FileNaming. The default naming strategy is to name files in the format: $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix, where:
- $prefix is set by FileIO.Write.withPrefix(java.lang.String), the default is "output".
- $suffix is set by FileIO.Write.withSuffix(java.lang.String), the default is empty.
- $compressionSuffix is based on the chosen compression type.
For example: data-2017-12-01T19:00:00Z-2017-12-01T20:00:00Z-2-00010-of-00050.txt.gz
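To make the template concrete, here is a minimal sketch that assembles a name from the components listed in the format string. It is an illustration only: `DefaultNamingSketch` and `defaultName` are hypothetical names, the sketch always emits every component, and the five-digit zero padding is inferred from the example filename above rather than taken from Beam's code.

```java
import java.util.Locale;

/** Hypothetical helper that mirrors the documented template; not part of Beam. */
final class DefaultNamingSketch {
    private DefaultNamingSketch() {}

    static String defaultName(
            String prefix, String start, String end, int pane,
            int shard, int numShards, String suffix, String compressionSuffix) {
        // $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix,
        // with shard index and shard count zero-padded to five digits.
        return String.format(
                Locale.ROOT,
                "%s-%s-%s-%d-%05d-of-%05d%s%s",
                prefix, start, end, pane, shard, numShards, suffix, compressionSuffix);
    }

    public static void main(String[] args) {
        System.out.println(defaultName(
                "data", "2017-12-01T19:00:00Z", "2017-12-01T20:00:00Z",
                2, 10, 50, ".txt", ".gz"));
    }
}
```

Called with the values from the example (prefix "data", pane 2, shard 10 of 50, suffix ".txt", compression suffix ".gz"), the helper reproduces the example filename.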
Alternatively, one can specify a custom naming strategy using Write#withNaming(Write.FileNaming).
If FileIO.Write.to(java.lang.String) is specified, then the filenames produced by the FileIO.Write.FileNaming are resolved relative to that directory.
When using dynamic destinations via writeDynamic() (see below), specifying a custom naming strategy is required, using FileIO.Write.withNaming(SerializableFunction) or FileIO.Write.withNaming(Contextful). In those, pass a function that creates a FileIO.Write.FileNaming for the requested group ("destination"). You can either implement a custom FileIO.Write.FileNaming, or use FileIO.Write.defaultNaming(java.lang.String, java.lang.String) to configure the default naming strategy with a prefix and suffix as per above.
If the elements in the input collection can be partitioned into groups that should be treated differently, FileIO.Write supports different treatment per group ("destination"). It can use different file naming strategies for different groups, and can configure the FileIO.Sink differently, e.g. write different elements to Avro files in different directories with different schemas.
This feature is supported by writeDynamic(). Use FileIO.Write.by(org.apache.beam.sdk.transforms.SerializableFunction<UserT, DestinationT>) to specify how to partition the elements into groups ("destinations"). Elements are then grouped by destination, and FileIO.Write.withNaming(Contextful) and FileIO.Write.via(Contextful) are applied separately within each group, i.e. different groups are written using the file naming strategies returned by FileIO.Write.withNaming(Contextful) and the sinks returned by FileIO.Write.via(Contextful) for the respective destinations. Note that sharding currently cannot be destination-dependent: every window/pane for every destination uses the same number of shards, specified via FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>).
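The partition-then-name flow can be sketched without Beam as follows. This is a simplified model under stated assumptions: `DynamicDestinationsSketch` and `plan` are hypothetical names, each destination is reduced to a single file (windows, panes and shards are ignored), and the naming function is assumed to return a distinct name per destination.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration; none of these names are Beam APIs. */
final class DynamicDestinationsSketch {
    private DynamicDestinationsSketch() {}

    /**
     * Groups elements by destination, then resolves one file name per destination.
     * The "by" function plays the role of the destination function and "naming"
     * the role of the per-destination naming function.
     */
    static <T, D> Map<String, List<T>> plan(
            List<T> elements, Function<T, D> by, Function<D, String> naming) {
        // Step 1: partition the elements by destination, keeping first-seen order.
        Map<D, List<T>> groups = new LinkedHashMap<>();
        for (T element : elements) {
            groups.computeIfAbsent(by.apply(element), d -> new ArrayList<>()).add(element);
        }
        // Step 2: apply the naming function separately for each destination.
        // Assumes distinct destinations yield distinct names; otherwise a later group
        // would replace an earlier one in this map.
        Map<String, List<T>> files = new LinkedHashMap<>();
        for (Map.Entry<D, List<T>> group : groups.entrySet()) {
            files.put(naming.apply(group.getKey()), group.getValue());
        }
        return files;
    }
}
```

The point of the sketch is that naming is chosen per destination, not per element: every element with the same destination ends up under the same naming strategy.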
Normally, when writing a collection of a custom type using a FileIO.Sink that takes a different type (for example, writing a PCollection<Event> to a text-based Sink<String>), one can simply apply a ParDo or MapElements to convert the custom type to the sink's output type.
However, when using dynamic destinations, in many such cases the destination needs to be extracted from the original type, so such a conversion is not possible. For example, one might write events of a custom class Event to a text sink, using the event's "type" as a destination. In that case, specify an output function in FileIO.Write.via(Contextful, Contextful) or Write#via(Contextful, Sink).
class CSVSink implements FileIO.Sink<List<String>> {
  private String header;
  private PrintWriter writer;

  public CSVSink(List<String> colNames) {
    this.header = Joiner.on(",").join(colNames);
  }

  public void open(WritableByteChannel channel) throws IOException {
    writer = new PrintWriter(Channels.newOutputStream(channel));
    writer.println(header);
  }

  public void write(List<String> element) throws IOException {
    writer.println(Joiner.on(",").join(element));
  }

  public void flush() throws IOException {
    writer.flush();
  }
}

PCollection<BankTransaction> transactions = ...;
// Convert transactions to strings before writing them to the CSV sink.
transactions.apply(MapElements
        .into(TypeDescriptors.lists(TypeDescriptors.strings()))
        .via(tx -> Arrays.asList(tx.getUser(), tx.getAmount())))
    .apply(FileIO.<List<String>>write()
        .via(new CSVSink(Arrays.asList("user", "amount")))
        .to(".../path/to/")
        .withPrefix("transactions")
        .withSuffix(".csv"));
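The order in which a sink's methods are used for a single file (open, then write for each element, then flush) can be exercised in isolation. The sketch below is an assumption-laden stand-in: the nested `Sink` interface only mirrors the three methods that CSVSink above implements and is not Beam's interface, and `writeOneFile` is a hypothetical driver that writes to an in-memory buffer instead of a real file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class SinkLifecycleSketch {
    private SinkLifecycleSketch() {}

    /** Local stand-in with the same three methods as the CSVSink example; not Beam's type. */
    interface Sink<T> {
        void open(WritableByteChannel channel) throws IOException;
        void write(T element) throws IOException;
        void flush() throws IOException;
    }

    /** CSV sink that writes a header on open and one comma-joined row per element. */
    static final class CsvSink implements Sink<List<String>> {
        private final String header;
        private PrintWriter writer;

        CsvSink(List<String> colNames) {
            this.header = String.join(",", colNames);
        }

        @Override
        public void open(WritableByteChannel channel) {
            writer = new PrintWriter(
                    new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8));
            writer.print(header + "\n");
        }

        @Override
        public void write(List<String> element) {
            writer.print(String.join(",", element) + "\n");
        }

        @Override
        public void flush() {
            writer.flush();
        }
    }

    /** Hypothetical driver: opens the sink, writes every element, flushes, returns the bytes as text. */
    static <T> String writeOneFile(Sink<T> sink, List<T> elements) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (WritableByteChannel channel = Channels.newChannel(bytes)) {
            sink.open(channel);
            for (T element : elements) {
                sink.write(element);
            }
            sink.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

The sink decides only what the bytes of one file look like; which elements reach it, and how many files exist, is decided by the caller, here the hypothetical driver.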
enum TransactionType {
  DEPOSIT,
  WITHDRAWAL,
  TRANSFER,
  ...

  List<String> getFieldNames();
  List<String> getAllFields(BankTransaction tx);
}

PCollection<BankTransaction> transactions = ...;
transactions.apply(FileIO.<TransactionType, BankTransaction>writeDynamic()
    .by(BankTransaction::getTypeName)
    // via(Contextful, Contextful) takes Contextful arguments, so the lambdas are wrapped with Contextful.fn.
    .via(Contextful.fn(tx -> tx.getTypeName().getAllFields(tx)), // Convert the data to be written to CSVSink
        Contextful.fn(type -> new CSVSink(type.getFieldNames())))
    .to(".../path/to/")
    .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
Modifier and Type | Class and Description |
---|---|
static class | FileIO.Match: Implementation of match(). |
static class | FileIO.MatchAll: Implementation of matchAll(). |
static class | FileIO.MatchConfiguration: Describes configuration for matching filepatterns, such as EmptyMatchTreatment and continuous watching for matching files. |
static class | FileIO.ReadableFile: A utility class for accessing a potentially compressed file. |
static class | FileIO.ReadMatches: Implementation of readMatches(). |
static interface | FileIO.Sink<ElementT>: Specifies how to write elements to individual files in write() and writeDynamic(). |
static class | FileIO.Write<DestinationT,UserT>: Implementation of write() and writeDynamic(). |
Constructor and Description |
---|
FileIO() |
Modifier and Type | Method and Description |
---|---|
static FileIO.Match | match(): Matches a filepattern using FileSystems.match(java.util.List<java.lang.String>) and produces a collection of matched resources (both files and directories) as MatchResult.Metadata. |
static FileIO.MatchAll | matchAll(): Like match(), but matches each filepattern in a collection of filepatterns. |
static FileIO.ReadMatches | readMatches(): Converts each result of match() or matchAll() to a FileIO.ReadableFile which can be used to read the contents of each file, optionally decompressing it. |
static <InputT> FileIO.Write<java.lang.Void,InputT> | write(): Writes elements to files using a FileIO.Sink. |
static <DestT,InputT> FileIO.Write<DestT,InputT> | writeDynamic(): Writes elements to files using a FileIO.Sink and grouping the elements using "dynamic destinations". |
public static FileIO.Match match()
Matches a filepattern using FileSystems.match(java.util.List<java.lang.String>) and produces a collection of matched resources (both files and directories) as MatchResult.Metadata.
By default, matches the filepattern once and produces a bounded PCollection. To continuously watch the filepattern for new matches, use Match#continuously(Duration, TerminationCondition); this will produce an unbounded PCollection.
By default, a filepattern matching no resources is treated according to EmptyMatchTreatment.DISALLOW. To configure this behavior, use FileIO.Match.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment).
Returned MatchResult.Metadata are deduplicated by filename. For example, if this transform observes a file with the same name several times with different metadata (e.g. because the file is growing), it will emit the metadata the first time this file is observed, and will ignore future changes to this file.
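The "first observation wins" rule can be pictured with a small stdlib-only sketch. Everything here is hypothetical: `Observed` is a stand-in for MatchResult.Metadata that carries only a filename and a size, and `firstObservationPerFilename` is an illustrative helper, not Beam's deduplication code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of deduplication by filename. */
final class DedupByFilenameSketch {
    private DedupByFilenameSketch() {}

    /** Simplified stand-in for file metadata: a name plus the size seen at observation time. */
    static final class Observed {
        final String filename;
        final long sizeBytes;

        Observed(String filename, long sizeBytes) {
            this.filename = filename;
            this.sizeBytes = sizeBytes;
        }
    }

    /** Keeps the first observation of each filename and drops every later one. */
    static List<Observed> firstObservationPerFilename(List<Observed> observations) {
        Set<String> seen = new HashSet<>();
        List<Observed> emitted = new ArrayList<>();
        for (Observed observation : observations) {
            // Set.add returns false for a filename that was already seen,
            // so later metadata for a growing file is ignored.
            if (seen.add(observation.filename)) {
                emitted.add(observation);
            }
        }
        return emitted;
    }
}
```

In this model a file observed first at 10 bytes and later at 20 bytes is emitted once, with the 10-byte metadata.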
public static FileIO.MatchAll matchAll()
Like match(), but matches each filepattern in a collection of filepatterns.
Resources are not deduplicated between filepatterns, i.e. if the same resource matches multiple filepatterns, it will be produced multiple times.
By default, a filepattern matching no resources is treated according to EmptyMatchTreatment.ALLOW_IF_WILDCARD. To configure this behavior, use FileIO.MatchAll.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment).
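The difference between the two defaults (DISALLOW for match(), ALLOW_IF_WILDCARD for matchAll()) can be summarized as a decision function. This is a sketch under assumptions, not Beam's logic: `Treatment` is a local mirror of the enum values rather than the Beam enum, `emptyMatchAllowed` is a hypothetical helper, and treating "contains a `*`" as the wildcard test is an assumption made for illustration.

```java
/** Hypothetical illustration of how an empty match might be judged under each treatment. */
final class EmptyMatchSketch {
    private EmptyMatchSketch() {}

    /** Local mirror of the EmptyMatchTreatment value names; not the Beam enum itself. */
    enum Treatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

    /** Returns true if a filepattern that matched nothing is accepted, false if it is an error. */
    static boolean emptyMatchAllowed(Treatment treatment, String filepattern) {
        switch (treatment) {
            case ALLOW:
                return true;
            case DISALLOW:
                return false;
            case ALLOW_IF_WILDCARD:
                // Assumption for this sketch: a pattern is a wildcard pattern if it contains '*'.
                return filepattern.contains("*");
            default:
                throw new IllegalArgumentException("Unknown treatment: " + treatment);
        }
    }
}
```

Under this model, an empty result for a glob such as a directory listing is tolerated by ALLOW_IF_WILDCARD, while an empty result for one exact filename is treated as a failure.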
public static FileIO.ReadMatches readMatches()
Converts each result of match() or matchAll() to a FileIO.ReadableFile which can be used to read the contents of each file, optionally decompressing it.
public static <InputT> FileIO.Write<java.lang.Void,InputT> write()
Writes elements to files using a FileIO.Sink. See class-level documentation.
public static <DestT,InputT> FileIO.Write<DestT,InputT> writeDynamic()
Writes elements to files using a FileIO.Sink and grouping the elements using "dynamic destinations". See class-level documentation.