@Experimental(value=SOURCE_SINK) public class ParquetIO extends java.lang.Object
ParquetIO
source returns a PCollection
for Parquet files. The elements in the
PCollection
are Avro GenericRecord
.
To configure the ParquetIO.Read
, you have to provide the file patterns (from) of the Parquet
files and the schema.
For example:
PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar"));
...
As ParquetIO.Read
is based on FileIO
, it supports any filesystem (hdfs, ...).
When using schemas created via reflection, it may be useful to generate GenericRecord
instances rather than instances of the class associated with the schema. ParquetIO.Read
and ParquetIO.ReadFiles
provide ParquetIO.Read.withAvroDataModel(GenericData)
allowing implementations
to set the data model associated with the AvroParquetReader
For more advanced use cases, like reading each file in a PCollection
of FileIO.ReadableFile
, use the ParquetIO.ReadFiles
transform.
For example:
PCollection<FileIO.ReadableFile> files = pipeline
.apply(FileIO.match().filepattern(options.getInputFilepattern())
.apply(FileIO.readMatches());
PCollection<GenericRecord> output = files.apply(ParquetIO.readFiles(SCHEMA));
Splittable reading can be enabled by allowing the use of Splittable DoFn. It initially split
the files into blocks of 64MB and may dynamically split further for higher read efficiency. It
can be enabled by using ParquetIO.Read.withSplit()
.
For example:
PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar").withSplit());
...
Reading with projection can be enabled with the projection schema as following. Splittable reading is enabled when reading with projection. The projection_schema contains only the column that we would like to read and encoder_schema contains the schema to encode the output with the unwanted columns changed to nullable. Partial reading provide decrease of reading time due to partial processing of the data and partial encoding. The decrease in the reading time depends on the relative position of the columns. Memory allocation is optimised depending on the encoding schema. Note that the improvement is not as significant comparing to the proportion of the data requested, since the processing time saved is only the time to read the unwanted columns, the reader will still go over the data set according to the encoding schema since data for each column in a row is stored interleaved.
PCollection<GenericRecord> records =
pipeline
.apply(
ParquetIO.read(SCHEMA).from("/foo/bar").withProjection(Projection_schema,Encoder_Schema));
To read records from files whose schema is unknown at pipeline construction time or differs
between files, use parseGenericRecords(SerializableFunction)
- in this case, you will
need to specify a parsing function for converting each GenericRecord
into a value of your
custom type.
For example:
Pipeline p = ...;
PCollection<Foo> records =
p.apply(
ParquetIO.parseGenericRecords(
new SerializableFunction<GenericRecord, Foo>() {
public Foo apply(GenericRecord record) {
// If needed, access the schema of the record using record.getSchema()
return ...;
}
})
.from(...));
// For reading from files
PCollection<FileIO.ReadableFile> files = p.apply(...);
PCollection<Foo> records =
files
.apply(
ParquetIO.parseFilesGenericRecords(
new SerializableFunction<GenericRecord, Foo>() {
public Foo apply(GenericRecord record) {
// If needed, access the schema of the record using record.getSchema()
return ...;
}
}));
If you want to use SQL or schema based operations on an Parquet-based PCollection, you must configure the read transform to infer the Beam schema and automatically setup the Beam related coders by doing:
PCollection<GenericRecord> parquetRecords =
p.apply(ParquetIO.read(...).from(...).withBeamSchemas(true));
You can also use it when reading a list of filenams from a PCollection<String>
:
PCollection<String> filePatterns = p.apply(...);
PCollection<GenericRecord> parquetRecords =
filePatterns
.apply(ParquetIO.readFiles(...).withBeamSchemas(true));
ParquetIO.Sink
allows you to write a PCollection
of GenericRecord
into
a Parquet file. It can be used with the general-purpose FileIO
transforms with
FileIO.write/writeDynamic specifically.
By default, ParquetIO.Sink
produces output files that are compressed using the CompressionCodec.SNAPPY
. This default can be changed or overridden
using ParquetIO.Sink.withCompressionCodec(CompressionCodecName)
.
For example:
pipeline
.apply(...) // PCollection<GenericRecord>
.apply(FileIO
.<GenericRecord>write()
.via(ParquetIO.sink(SCHEMA)
.withCompressionCodec(CompressionCodecName.SNAPPY))
.to("destination/path")
.withSuffix(".parquet"));
This IO API is considered experimental and may break or receive backwards-incompatible changes in future versions of the Apache Beam SDK.
Modifier and Type | Class and Description |
---|---|
static class |
ParquetIO.Parse<T>
Implementation of
parseGenericRecords(SerializableFunction) . |
static class |
ParquetIO.ParseFiles<T>
Implementation of
parseFilesGenericRecords(SerializableFunction) . |
static class |
ParquetIO.Read
Implementation of
read(Schema) . |
static class |
ParquetIO.ReadFiles
Implementation of
readFiles(Schema) . |
static class |
ParquetIO.Sink
Implementation of
sink(org.apache.avro.Schema) . |
Modifier and Type | Method and Description |
---|---|
static <T> ParquetIO.ParseFiles<T> |
parseFilesGenericRecords(SerializableFunction<GenericRecord,T> parseFn)
Reads
GenericRecord from Parquet files and converts to user defined type using provided
parseFn . |
static <T> ParquetIO.Parse<T> |
parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn)
Reads
GenericRecord from a Parquet file (or multiple Parquet files matching the
pattern) and converts to user defined type using provided parseFn. |
static ParquetIO.Read |
read(Schema schema)
Reads
GenericRecord from a Parquet file (or multiple Parquet files matching the
pattern). |
static ParquetIO.ReadFiles |
readFiles(Schema schema)
Like
read(Schema) , but reads each file in a PCollection of FileIO.ReadableFile , which allows more flexible usage. |
static ParquetIO.Sink |
sink(Schema schema)
Creates a
ParquetIO.Sink that, for use with FileIO.write() . |
public static ParquetIO.Read read(Schema schema)
GenericRecord
from a Parquet file (or multiple Parquet files matching the
pattern).public static ParquetIO.ReadFiles readFiles(Schema schema)
read(Schema)
, but reads each file in a PCollection
of FileIO.ReadableFile
, which allows more flexible usage.public static <T> ParquetIO.Parse<T> parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn)
GenericRecord
from a Parquet file (or multiple Parquet files matching the
pattern) and converts to user defined type using provided parseFn.public static <T> ParquetIO.ParseFiles<T> parseFilesGenericRecords(SerializableFunction<GenericRecord,T> parseFn)
GenericRecord
from Parquet files and converts to user defined type using provided
parseFn
.public static ParquetIO.Sink sink(Schema schema)
ParquetIO.Sink
that, for use with FileIO.write()
.