Class TextIO
PTransforms for reading and writing text files.
Reading text files
To read a PCollection from one or more text files, use TextIO.read() to instantiate a transform and use TextIO.Read.from(String) to specify the path of the file(s) to be read. Alternatively, if the filenames to be read are themselves in a PCollection, you can use FileIO to match them and readFiles() to read them.
read() returns a PCollection of Strings, each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n', or by a delimiter specified via TextIO.Read.withDelimiter(byte[])).
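For instance, a custom delimiter can be supplied as a byte array. The snippet below is a minimal sketch; the path and the '|' delimiter are hypothetical:
Pipeline p = ...;
// Read records separated by '|' instead of newlines:
PCollection<String> records =
    p.apply(TextIO.read()
        .from("/path/to/records.psv")
        .withDelimiter(new byte[] {'|'}));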
Filepattern expansion and watching
By default, the filepatterns are expanded only once. TextIO.Read.watchForNewFiles(org.joda.time.Duration, org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition<java.lang.String, ?>, boolean) or the combination of FileIO.Match.continuously(Duration, TerminationCondition) and readFiles() allows streaming of new files matching the filepattern(s).
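The FileIO-based combination looks like the following minimal sketch; the path and polling interval are hypothetical, and Watch.Growth.never() keeps watching for the lifetime of the pipeline:
Pipeline p = ...;
PCollection<String> lines = p
    .apply(FileIO.match()
        .filepattern("/path/to/files/*")
        // Poll for new matching files every 30 seconds, and never stop watching.
        .continuously(Duration.standardSeconds(30), Watch.Growth.never()))
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles());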
By default, read() prohibits filepatterns that match no files, and readFiles() allows them if the filepattern contains a glob wildcard character. Use TextIO.Read.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment) or FileIO.Match.withEmptyMatchTreatment(EmptyMatchTreatment) plus readFiles() to configure this behavior.
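For example, the following minimal sketch (the filepattern is hypothetical) allows a filepattern that currently matches no files, e.g. while waiting for files to arrive:
Pipeline p = ...;
PCollection<String> lines =
    p.apply(TextIO.read()
        .from("/path/to/not-yet-created/*.txt")
        // Do not fail if the pattern matches nothing at expansion time.
        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));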
Example 1: reading a file or filepattern.
Pipeline p = ...;
// A simple Read of a local file (only runs locally):
PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
Example 2: reading a PCollection of filenames.
Pipeline p = ...;
// E.g. the filenames might be computed from other data in the pipeline, or
// read from a data source.
PCollection<String> filenames = ...;
// Read all files in the collection.
PCollection<String> lines =
    filenames
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles());
Example 3: streaming new files matching a filepattern.
Pipeline p = ...;
PCollection<String> lines = p.apply(TextIO.read()
    .from("/local/path/to/files/*")
    .watchForNewFiles(
        // Check for new files every minute
        Duration.standardMinutes(1),
        // Stop watching the filepattern if no new files appear within an hour
        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
Reading a very large number of files
If it is known that the filepattern will match a very large number of files (e.g. tens of
thousands or more), use TextIO.Read.withHintMatchesManyFiles()
for better performance and
scalability. Note that it may decrease performance if the filepattern matches only a small number
of files.
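For example, a minimal sketch (the filepattern is hypothetical):
Pipeline p = ...;
PCollection<String> lines =
    p.apply(TextIO.read()
        .from("/path/to/many/files/*.txt")
        // Hint that the pattern is expected to match tens of thousands of files or more.
        .withHintMatchesManyFiles());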
Writing text files
To write a PCollection to one or more text files, use TextIO.write(), using TextIO.Write.to(String) to specify the output prefix of the files to write.
For example:
// A simple Write to a local file (only runs locally):
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file.txt"));
// Same as above, only with Gzip compression:
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file.txt"))
.withSuffix(".txt")
.withCompression(Compression.GZIP));
Any existing files with the same names as generated output files will be overwritten.
If you want better control over how filenames are generated than the default policy allows, a custom FileBasedSink.FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy).
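The sketch below illustrates the shape of such a policy. The class name, naming scheme, and paths are hypothetical assumptions, and production policies usually also account for compression suffixes and the window/pane information exposed through the parameters:
// A hypothetical policy that names shards as "<prefix>-SSSSS-of-NNNNN.txt".
class PerShardFilenamePolicy extends FileBasedSink.FilenamePolicy {
  private final ResourceId prefix;

  PerShardFilenamePolicy(ResourceId prefix) {
    this.prefix = prefix;
  }

  @Override
  public ResourceId windowedFilename(
      int shardNumber,
      int numShards,
      BoundedWindow window,
      PaneInfo paneInfo,
      FileBasedSink.OutputFileHints outputFileHints) {
    // For windowed writes, also encode the window's end timestamp in the name.
    String name = String.format(
        "%s-%s-%05d-of-%05d.txt",
        prefix.getFilename(), window.maxTimestamp(), shardNumber, numShards);
    return prefix.getCurrentDirectory()
        .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
    String name = String.format(
        "%s-%05d-of-%05d.txt", prefix.getFilename(), shardNumber, numShards);
    return prefix.getCurrentDirectory()
        .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
  }
}

// Passing a FilenamePolicy directly also requires an explicit temp directory:
PCollection<String> lines = ...;
lines.apply(TextIO.write()
    .to(new PerShardFilenamePolicy(FileSystems.matchNewResource("/path/to/output/part", false)))
    .withTempDirectory(FileSystems.matchNewResource("/path/to/temp", true)));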
Advanced features
TextIO supports all features of FileIO.write() and FileIO.writeDynamic(), such as writing windowed/unbounded data, writing data to multiple destinations, and so on, by providing a TextIO.Sink via sink().
For example, to write events of different types to different filenames:
PCollection<Event> events = ...;
events.apply(FileIO.<EventType, Event>writeDynamic()
    .by(Event::getTypeName)
    .via(TextIO.sink(), Event::toString)
    .to(type -> nameFilesUsingWindowPaneAndShard(".../events/" + type + "/data", ".txt")));
For backwards compatibility, TextIO also supports the legacy FileBasedSink.DynamicDestinations interface for advanced features via TextIO.Write.to(DynamicDestinations).
Malformed records can be handled using TextIO.TypedWrite.withBadRecordErrorHandler(ErrorHandler); see the documentation in FileIO for details on usage.
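A minimal sketch of wiring up such a handler is shown below. Constructing and closing the ErrorHandler itself (described in FileIO) is elided, and the identity format function and output prefix are assumptions:
// Obtaining the handler is elided here; see FileIO for how to register and close one.
ErrorHandler<BadRecord, ?> badRecordErrorHandler = ...;
PCollection<String> lines = ...;
lines.apply(TextIO.<String>writeCustomType()
    .withFormatFunction(line -> line)
    .to("/path/to/output")
    .withBadRecordErrorHandler(badRecordErrorHandler));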
Nested Class Summary
static enum TextIO.CompressionType
    Deprecated.
static class TextIO.Read
    Implementation of read().
static class TextIO.ReadAll
    Deprecated. See readAll() for details.
static class TextIO.ReadFiles
    Implementation of readFiles().
static class TextIO.Sink
    Implementation of sink().
static class TextIO.TypedWrite<UserT, DestinationT>
    Implementation of write().
static class TextIO.Write
    This class is used as the default return value of write().
Method Summary
static TextIO.Read read()
    A PTransform that reads from one or more text files and returns a bounded PCollection containing one element for each line of the input files.
static TextIO.ReadAll readAll()
    Deprecated.
static TextIO.ReadFiles readFiles()
    Like read(), but reads each file in a PCollection of FileIO.ReadableFile, returned by FileIO.readMatches().
static TextIO.Sink sink()
    Creates a TextIO.Sink that writes newline-delimited strings in UTF-8, for use with FileIO.write().
static TextIO.Write write()
    A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding pattern), with each element of the input collection encoded into its own line.
static <UserT> TextIO.TypedWrite<UserT, Void> writeCustomType()
    A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding pattern), with each element of the input collection encoded into its own line.
Method Details
read
A PTransform that reads from one or more text files and returns a bounded PCollection containing one element for each line of the input files.
readAll
Deprecated. You can achieve the functionality of readAll() using FileIO matching plus readFiles(). This is the preferred method to make composition explicit. TextIO.ReadAll will not receive upgrades and will be removed in a future version of Beam.
A PTransform that works like read(), but reads each file in a PCollection of filepatterns.
Can be applied to both bounded and unbounded PCollections, so this is suitable for reading a PCollection of filepatterns arriving as a stream. However, every filepattern is expanded once at the moment it is processed, rather than watched for new files matching the filepattern to appear. Likewise, every file is read once, rather than watched for new entries.
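A minimal sketch of that replacement for a PCollection of filepatterns:
PCollection<String> filepatterns = ...;
PCollection<String> lines = filepatterns
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles());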
readFiles
Like read(), but reads each file in a PCollection of FileIO.ReadableFile, returned by FileIO.readMatches().
write
A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding pattern), with each element of the input collection encoded into its own line.
writeCustomType
A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding pattern), with each element of the input collection encoded into its own line.
This version allows you to apply TextIO writes to a PCollection of a custom type UserT. A format mechanism that converts the input type UserT to the String that will be written to the file must be specified. If using a custom FileBasedSink.DynamicDestinations object, this is done using FileBasedSink.DynamicDestinations.formatRecord(UserT); otherwise, TextIO.TypedWrite.withFormatFunction(org.apache.beam.sdk.transforms.SerializableFunction<UserT, java.lang.String>) can be used to specify a format function.
The advantage of using a custom type is that it allows a user-provided FileBasedSink.DynamicDestinations object, set via TextIO.Write.to(DynamicDestinations), to examine the custom type when choosing a destination.
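For example, a minimal sketch using withFormatFunction rather than a custom DynamicDestinations; the Event type and its toLine() method are hypothetical:
PCollection<Event> events = ...;
// Event and Event::toLine are hypothetical; any UserT -> String function works here.
events.apply(TextIO.<Event>writeCustomType()
    .withFormatFunction(Event::toLine)
    .to("/path/to/events")
    .withSuffix(".txt"));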
sink
Creates a TextIO.Sink that writes newline-delimited strings in UTF-8, for use with FileIO.write().
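For example, a minimal sketch (the output directory is hypothetical):
PCollection<String> lines = ...;
lines.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("/path/to/output")
    .withSuffix(".txt"));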