public class TextIO
extends java.lang.Object

PTransforms for reading and writing text files.
To read a PCollection from one or more text files, use TextIO.read() to instantiate a transform
and use TextIO.Read.from(String) to specify the path of the file(s) to be read. Alternatively, if
the filenames to be read are themselves in a PCollection, apply readAll() or readFiles().
read() returns a PCollection of Strings, each corresponding to one line of an input UTF-8 text
file. Lines are delimited by '\n', '\r', or '\r\n', or by a delimiter specified with
TextIO.Read.withDelimiter(byte[]).
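The splitting rule can be modeled in a few lines of plain Java. This is an illustrative sketch, not Beam's reader. The class `LineSplitter` and its methods are hypothetical, and the sketch assumes the whole input fits in memory. It also assumes that a delimiter at the very end of the input does not produce an extra empty line.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of TextIO-style line splitting; not Beam's implementation. */
final class LineSplitter {

  /** Splits on '\n', '\r', or "\r\n" (the pair counts as a single separator). */
  static List<String> splitDefault(byte[] data) {
    List<String> lines = new ArrayList<>();
    ByteArrayOutputStream current = new ByteArrayOutputStream();
    for (int i = 0; i < data.length; i++) {
      byte b = data[i];
      if (b == '\n' || b == '\r') {
        lines.add(new String(current.toByteArray(), StandardCharsets.UTF_8));
        current.reset();
        // Treat "\r\n" as one separator by skipping the '\n' that follows '\r'.
        if (b == '\r' && i + 1 < data.length && data[i + 1] == '\n') {
          i++;
        }
      } else {
        current.write(b);
      }
    }
    // Assumption: emit a final record only if bytes follow the last separator.
    if (current.size() > 0) {
      lines.add(new String(current.toByteArray(), StandardCharsets.UTF_8));
    }
    return lines;
  }

  /** Splits only on an exact, non-empty custom delimiter byte sequence. */
  static List<String> splitOn(byte[] data, byte[] delimiter) {
    if (delimiter.length == 0) {
      throw new IllegalArgumentException("delimiter must be non-empty");
    }
    List<String> records = new ArrayList<>();
    int start = 0;
    int i = 0;
    while (i <= data.length - delimiter.length) {
      if (Arrays.equals(data, i, i + delimiter.length, delimiter, 0, delimiter.length)) {
        records.add(new String(data, start, i - start, StandardCharsets.UTF_8));
        i += delimiter.length;
        start = i;
      } else {
        i++;
      }
    }
    if (start < data.length) {
      records.add(new String(data, start, data.length - start, StandardCharsets.UTF_8));
    }
    return records;
  }
}
```

In a pipeline, the custom-delimiter case corresponds to passing a byte array such as `new byte[] {'|'}` to `withDelimiter`. With that delimiter, embedded '\n' characters no longer end a record.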
By default, the filepatterns are expanded only once. TextIO.Read.watchForNewFiles(org.joda.time.Duration, org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition<java.lang.String, ?>)
and TextIO.ReadAll.watchForNewFiles(org.joda.time.Duration, org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition<java.lang.String, ?>)
allow streaming of new files matching the filepattern(s).
By default, read() prohibits filepatterns that match no files, while readAll() allows them if the
filepattern contains a glob wildcard character. Use
TextIO.Read.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment) and
TextIO.ReadAll.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment) to configure
this behavior.
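The three behaviors can be summarized with a small model. The local enum only mirrors the constant names of `EmptyMatchTreatment` (`ALLOW`, `DISALLOW`, `ALLOW_IF_WILDCARD`). The class `EmptyMatchModel` is hypothetical. Treating `*`, `?` and `[` as the glob wildcard characters is an assumption of this sketch.

```java
/** Hypothetical model of empty-match handling; not Beam's implementation. */
final class EmptyMatchModel {

  // Mirrors the constant names of org.apache.beam.sdk.io.fs.EmptyMatchTreatment.
  enum Treatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

  // Assumption: '*', '?' and '[' are the glob wildcard characters.
  static boolean hasWildcard(String spec) {
    return spec.indexOf('*') >= 0 || spec.indexOf('?') >= 0 || spec.indexOf('[') >= 0;
  }

  /** Returns normally if the match result is acceptable, otherwise throws. */
  static void checkMatch(String spec, int matchCount, Treatment treatment) {
    if (matchCount > 0) {
      return; // Non-empty matches are always acceptable.
    }
    boolean allowed;
    switch (treatment) {
      case ALLOW:
        allowed = true;
        break;
      case ALLOW_IF_WILDCARD:
        allowed = hasWildcard(spec);
        break;
      default: // DISALLOW
        allowed = false;
    }
    if (!allowed) {
      throw new IllegalStateException("No files matched spec: " + spec);
    }
  }

  /** Convenience wrapper: true if checkMatch would not throw. */
  static boolean accepts(String spec, int matchCount, Treatment treatment) {
    try {
      checkMatch(spec, matchCount, treatment);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }
}
```

By the description above, read() behaves like `DISALLOW` by default and readAll() like `ALLOW_IF_WILDCARD`. Passing, for example, `EmptyMatchTreatment.ALLOW` to `withEmptyMatchTreatment` relaxes the check.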
Example 1: reading a file or filepattern.
Pipeline p = ...;
// A simple Read of a local file (only runs locally):
PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
Example 2: reading a PCollection of filenames.
Pipeline p = ...;
// E.g. the filenames might be computed from other data in the pipeline, or
// read from a data source.
PCollection<String> filenames = ...;
// Read all files in the collection.
PCollection<String> lines = filenames.apply(TextIO.readAll());
Example 3: streaming new files matching a filepattern.
Pipeline p = ...;
PCollection<String> lines = p.apply(TextIO.read()
.from("/local/path/to/files/*")
.watchForNewFiles(
// Check for new files every minute
Duration.standardMinutes(1),
// Stop watching the filepattern if no new files appear within an hour
afterTimeSinceNewOutput(Duration.standardHours(1))));
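To see how the two durations in Example 3 interact, the following sketch simulates polling on a virtual clock measured in minutes. `WatchModel` is hypothetical and does not reflect how Beam schedules polls. It assumes two things:

- Each poll picks up every file that has appeared since the previous poll.
- Watching stops once no new file has been seen for the given duration, as the `afterTimeSinceNewOutput` comment describes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical simulation of watching a filepattern on a virtual clock (minutes).
 * Not Beam's Watch transform: it only models "poll every X, stop after Y without new files".
 */
final class WatchModel {

  /** Returns files in the order in which a poll first observed them. */
  static List<String> watch(Map<String, Integer> arrivalMinute, int pollEvery, int stopAfter) {
    if (pollEvery <= 0) {
      throw new IllegalArgumentException("pollEvery must be positive");
    }
    Set<String> seen = new LinkedHashSet<>();
    int lastNewOutput = 0; // Assumption: the idle timer starts at the first poll.
    for (int now = 0; ; now += pollEvery) {
      boolean sawNew = false;
      // Iterate names in sorted order so the result does not depend on map ordering.
      for (String file : new TreeSet<>(arrivalMinute.keySet())) {
        if (arrivalMinute.get(file) <= now && seen.add(file)) {
          sawNew = true;
        }
      }
      if (sawNew) {
        lastNewOutput = now;
      }
      // Termination condition: no new file for at least stopAfter minutes.
      if (now - lastNewOutput >= stopAfter) {
        return new ArrayList<>(seen);
      }
    }
  }
}
```

Take a one-minute poll interval and a 60-minute limit, with the last new file seen at minute 5. Watching stops at minute 65, so a file that arrives at minute 200 is never read.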
If it is known that the filepattern will match a very large number of files (e.g. tens of
thousands or more), use TextIO.Read.withHintMatchesManyFiles()
for better performance and
scalability. Note that it may decrease performance if the filepattern matches only a small number
of files.
To write a PCollection to one or more text files, use TextIO.write() and specify the output prefix
of the files to write with TextIO.Write.to(String).
For example:
// A simple Write to a local file (only runs locally):
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file.txt"));
// Same as above, only with Gzip compression and an explicit ".txt" suffix:
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file")
    .withSuffix(".txt")
    .withCompression(Compression.GZIP));
Any existing files with the same names as generated output files will be overwritten.
If you want better control over how filenames are generated than the default policy allows, a
custom FileBasedSink.FilenamePolicy can also be set using TextIO.Write#to(FilenamePolicy).
By default, all input is put into the global window before writing. If per-window writes are
desired (for example, when using a streaming runner), TextIO.Write.withWindowedWrites() will cause
windowing and triggering to be preserved. When producing windowed writes with a streaming runner
that supports triggers, the number of output shards must be set explicitly using
TextIO.Write.withNumShards(int). Some runners may set this for you to a runner-chosen value, so you
may not need to set it yourself. If you set an explicit template using
TextIO.Write.withShardNameTemplate(String), make sure that the template contains placeholders for
the window and the pane: W is expanded into the window text, and P into the pane. The default
template includes both the window and the pane in the filename.
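The template placeholders can be illustrated with a small expansion function. This is a hypothetical model, not Beam's code. It assumes the following expansion rules:

- Each run of `S` becomes the shard index, zero-padded to the run's length.
- Each run of `N` becomes the shard count, padded the same way.
- `W` becomes the window text and `P` the pane text.
- The final name is the output prefix, then the expanded template, then the suffix.

The template `-SSSSS-of-NNNNN` used in the usage note is only an example; check the Beam version you use for its actual defaults.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical model of shard-name template expansion; not Beam's implementation.
 * Runs of 'S' -> padded shard index, runs of 'N' -> padded shard count,
 * 'W' -> window text, 'P' -> pane text.
 */
final class ShardTemplate {
  private static final Pattern TOKEN = Pattern.compile("S+|N+|W|P");

  static String expand(String template, int shard, int numShards, String window, String pane) {
    Matcher m = TOKEN.matcher(template);
    StringBuilder out = new StringBuilder();
    while (m.find()) {
      String token = m.group();
      String replacement;
      switch (token.charAt(0)) {
        case 'S':
          replacement = String.format("%0" + token.length() + "d", shard);
          break;
        case 'N':
          replacement = String.format("%0" + token.length() + "d", numShards);
          break;
        case 'W':
          replacement = window;
          break;
        default: // 'P'
          replacement = pane;
      }
      // quoteReplacement keeps '$' or '\' in window/pane text literal.
      m.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(out);
    return out.toString();
  }

  /** Assumed naming scheme: prefix + expanded template + suffix. */
  static String fileName(String prefix, String template, String suffix,
                         int shard, int numShards, String window, String pane) {
    return prefix + expand(template, shard, numShards, window, pane) + suffix;
  }
}
```

Under these assumptions, shard 3 of 10 with prefix `/path/to/file`, template `-SSSSS-of-NNNNN` and suffix `.txt` is named `/path/to/file-00003-of-00010.txt`. Dropping `W` or `P` from a windowed template would make files from different windows or panes collide.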
TextIO also supports dynamic, value-dependent file destinations. The most general form of this is
done via TextIO.Write#to(DynamicDestinations). A FileBasedSink.DynamicDestinations class allows you
to convert any input value into a custom destination object, and to map that destination object to
a FileBasedSink.FilenamePolicy. This allows using different filename policies (or, more commonly,
differently-configured instances of the same policy) based on the input record. This is often used
in conjunction with writeCustomType(). Your FileBasedSink.DynamicDestinations object can then
examine the custom input type, and a format function converts that type to a string for writing.
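The routing described above can be sketched without Beam in three steps: map each element to a destination, map each destination to a filename prefix, and format each element into one line. `DynamicRouting`, its `route` method and the `UserEvent` type are hypothetical stand-ins for the roles played by `FileBasedSink.DynamicDestinations` and the format function. The sketch does not model real writes, sharding or temporary files.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical model of value-dependent destinations: element -> destination,
 * destination -> filename prefix, element -> formatted line. Not Beam's API.
 */
final class DynamicRouting {

  static <T, D> Map<String, List<String>> route(
      List<T> elements,
      Function<T, D> destinationOf,      // role of the destination function
      Function<D, String> prefixFor,     // role of the per-destination filename policy
      Function<T, String> formatRecord) { // role of the format function
    Map<String, List<String>> linesByPrefix = new LinkedHashMap<>();
    for (T element : elements) {
      String prefix = prefixFor.apply(destinationOf.apply(element));
      linesByPrefix.computeIfAbsent(prefix, k -> new ArrayList<>())
          .add(formatRecord.apply(element));
    }
    return linesByPrefix;
  }

  /** Hypothetical user type used only for this illustration. */
  static final class UserEvent {
    final String country;
    final String payload;

    UserEvent(String country, String payload) {
      this.country = country;
      this.payload = payload;
    }
  }
}
```

Suppose events come from `de` and `fr` and are routed by country. The result is one group of lines per prefix (for example `/out/de` and `/out/fr`), and a sink would write each group under its own filename policy.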
A convenience shortcut is provided for the case where the default naming policy is used, but
different configurations of this policy are wanted based on the input record. Default naming
policies can be configured using the DefaultFilenamePolicy.Params object.
// FormatEvent is a user-supplied SerializableFunction<UserEvent, String>.
PCollection<UserEvent> events = ...;
events.apply(TextIO.<UserEvent>writeCustomType()
    .withFormatFunction(new FormatEvent())
    .to(new SerializableFunction<UserEvent, Params>() {
          public Params apply(UserEvent value) {
            return new Params().withBaseFilename(baseDirectory + "/" + value.country());
          }
        },
        new Params().withBaseFilename(baseDirectory + "/empty")));
Modifier and Type | Class | Description |
---|---|---|
static class | TextIO.CompressionType | Deprecated. Use Compression. |
static class | TextIO.Read | Implementation of read(). |
static class | TextIO.ReadAll | Implementation of readAll(). |
static class | TextIO.ReadFiles | Implementation of readFiles(). |
static class | TextIO.TypedWrite<UserT,DestinationT> | Implementation of write(). |
static class | TextIO.Write | This class is used as the default return value of write(). |
Modifier and Type | Method | Description |
---|---|---|
static TextIO.Read | read() | A PTransform that reads from one or more text files and returns a bounded PCollection containing one element for each line of the input files. |
static TextIO.ReadAll | readAll() | A PTransform that works like read(), but reads each file in a PCollection of filepatterns. |
static TextIO.ReadFiles | readFiles() | Like read(), but reads each file in a PCollection of FileIO.ReadableFile, returned by FileIO.readMatches(). |
static TextIO.Write | write() | A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding pattern), with each element of the input collection encoded into its own line. |
static <UserT> TextIO.TypedWrite<UserT,java.lang.Void> | writeCustomType() | A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding pattern), with each element of the input collection encoded into its own line. |
public static TextIO.Read read()

A PTransform that reads from one or more text files and returns a bounded PCollection containing
one element for each line of the input files.

public static TextIO.ReadAll readAll()

A PTransform that works like read(), but reads each file in a PCollection of filepatterns.

Can be applied to both bounded and unbounded PCollections, so this is suitable for reading a
PCollection of filepatterns arriving as a stream. However, every filepattern is expanded once at
the moment it is processed, rather than watched for new files matching the filepattern to appear.
Likewise, every file is read once, rather than watched for new entries.
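The expand-once behavior can be illustrated by matching a pattern against a snapshot of the files present when it is processed. `ExpandOnce` is hypothetical. It uses `java.nio.file` glob matching (`java.nio.file.FileSystems`, not Beam's class of the same name) as a stand-in for Beam's own filepattern matching, whose syntax may differ. It also assumes Unix-style paths.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical model of readAll()-style expansion: a filepattern is matched once,
 * against whatever files exist at the moment it is processed.
 */
final class ExpandOnce {

  static List<String> expand(String pattern, Set<String> existingFiles) {
    // java.nio glob syntax stands in for Beam's filepattern matching here.
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
    List<String> matches = new ArrayList<>();
    for (String file : new TreeSet<>(existingFiles)) { // sorted for determinism
      if (matcher.matches(Paths.get(file))) {
        matches.add(file);
      }
    }
    return matches;
  }
}
```

A file created after a pattern was expanded is not part of that expansion's result. Only a later expansion of the same pattern, or a watching read, would see it.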
public static TextIO.ReadFiles readFiles()

Like read(), but reads each file in a PCollection of FileIO.ReadableFile, returned by
FileIO.readMatches().

public static TextIO.Write write()

A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding
pattern), with each element of the input collection encoded into its own line.

public static <UserT> TextIO.TypedWrite<UserT,java.lang.Void> writeCustomType()

A PTransform that writes a PCollection to a text file (or multiple text files matching a sharding
pattern), with each element of the input collection encoded into its own line.
This version allows you to apply TextIO writes to a PCollection of a custom type UserT. You must
specify a format mechanism that converts the input type UserT to the String that will be written to
the file. If you use a custom FileBasedSink.DynamicDestinations object, this is done with
FileBasedSink.DynamicDestinations.formatRecord(UserT). Otherwise, use
TextIO.TypedWrite.withFormatFunction(org.apache.beam.sdk.transforms.SerializableFunction<UserT, java.lang.String>)
to specify a format function.
The advantage of using a custom type is that it allows a user-provided
FileBasedSink.DynamicDestinations object, set via Write#to(DynamicDestinations), to examine the
custom type when choosing a destination.