Class ParquetIO

java.lang.Object
org.apache.beam.sdk.io.parquet.ParquetIO

public class ParquetIO extends Object
IO to read and write Parquet files.

Reading Parquet files

A ParquetIO source returns a PCollection of records read from Parquet files. The elements of the PCollection are Avro GenericRecord instances.

To configure ParquetIO.Read, you have to provide the file pattern (from) of the Parquet files to read and the Avro schema.

For example:


 PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar"));
 ...
 

As ParquetIO.Read is based on FileIO, it supports any filesystem (HDFS, ...).

When using schemas created via reflection, it may be useful to generate GenericRecord instances rather than instances of the class associated with the schema. ParquetIO.Read and ParquetIO.ReadFiles provide ParquetIO.Read.withAvroDataModel(GenericData), which lets you set the data model used by the underlying AvroParquetReader.
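For example, a minimal sketch (the schema, the path, and the use of GenericData.get() as the data model are placeholders, not prescribed values):


 PCollection<GenericRecord> records =
   pipeline.apply(
     ParquetIO.read(SCHEMA)
       .from("hdfs://namenode/foo/bar")
       .withAvroDataModel(GenericData.get())); // emit plain GenericRecord instances
 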

For more advanced use cases, like reading each file in a PCollection of FileIO.ReadableFile, use the ParquetIO.ReadFiles transform.

For example:


 PCollection<FileIO.ReadableFile> files = pipeline
   .apply(FileIO.match().filepattern(options.getInputFilepattern()))
   .apply(FileIO.readMatches());

 PCollection<GenericRecord> output = files.apply(ParquetIO.readFiles(SCHEMA));
 

ParquetIO leverages splittable reading by using a Splittable DoFn. It initially splits each file into blocks of 64 MB and may dynamically split further for higher read efficiency.

Reading with projection can be enabled by supplying a projection schema, as shown below; splittable reading is always enabled when reading with projection. The projection schema contains only the columns you want to read, while the encoder schema is the schema used to encode the output, with the unwanted columns changed to nullable. Projected reading reduces reading time because only part of the data is processed and encoded, and memory allocation is optimised according to the encoder schema; the size of the reduction depends on the relative position of the projected columns. Note that the improvement is not as significant as the proportion of data skipped, because the time saved is only the time needed to read the unwanted columns: the reader still walks the whole data set according to the encoder schema, since the data for each column of a row is stored interleaved.


 PCollection<GenericRecord> records =
   pipeline
     .apply(
       ParquetIO.read(SCHEMA).from("/foo/bar").withProjection(PROJECTION_SCHEMA, ENCODER_SCHEMA));
 

Reading records of an unknown schema

To read records from files whose schema is unknown at pipeline construction time or differs between files, use parseGenericRecords(SerializableFunction) - in this case, you will need to specify a parsing function for converting each GenericRecord into a value of your custom type.

For example:


 Pipeline p = ...;

 PCollection<Foo> records =
     p.apply(
       ParquetIO.parseGenericRecords(
           new SerializableFunction<GenericRecord, Foo>() {
               public Foo apply(GenericRecord record) {
                   // If needed, access the schema of the record using record.getSchema()
                   return ...;
               }
           })
           .from(...));

 // For reading from files
  PCollection<FileIO.ReadableFile> files = p.apply(...);

  PCollection<Foo> records =
     files
       .apply(
           ParquetIO.parseFilesGenericRecords(
               new SerializableFunction<GenericRecord, Foo>() {
                   public Foo apply(GenericRecord record) {
                       // If needed, access the schema of the record using record.getSchema()
                       return ...;
                   }
           }));
 

Inferring Beam schemas from Parquet files

If you want to use SQL or schema-based operations on a Parquet-based PCollection, you must configure the read transform to infer the Beam schema and automatically set up the Beam-related coders by doing:


 PCollection<GenericRecord> parquetRecords =
   p.apply(ParquetIO.read(...).from(...).withBeamSchemas(true));
 
You can also use it when reading from a PCollection<String> of file patterns, matching and reading the files with FileIO:

 PCollection<String> filePatterns = p.apply(...);

 PCollection<GenericRecord> parquetRecords =
   filePatterns
     .apply(FileIO.matchAll())
     .apply(FileIO.readMatches())
     .apply(ParquetIO.readFiles(...).withBeamSchemas(true));
 
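Once the Beam schema has been inferred, schema-aware transforms can be applied directly. As a sketch, assuming the records contain a hypothetical field named "name", Select can project it into Rows:


 // Sketch only: "name" is a hypothetical field of the inferred schema.
 PCollection<Row> names =
   parquetRecords.apply(Select.fieldNames("name"));
 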

Writing Parquet files

ParquetIO.Sink allows you to write a PCollection of GenericRecord into a Parquet file. It is designed to be used with the general-purpose FileIO transforms, specifically FileIO.write and FileIO.writeDynamic.

By default, ParquetIO.Sink produces output files that are compressed with CompressionCodecName.SNAPPY. This default can be changed or overridden using ParquetIO.Sink.withCompressionCodec(CompressionCodecName).

For example:


 pipeline
   .apply(...) // PCollection<GenericRecord>
   .apply(FileIO
     .<GenericRecord>write()
     .via(ParquetIO.sink(SCHEMA)
       .withCompressionCodec(CompressionCodecName.SNAPPY))
     .to("destination/path")
     .withSuffix(".parquet"));
 
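With FileIO.writeDynamic, output files can be partitioned by a per-record destination. The following is only a sketch, assuming each GenericRecord carries a hypothetical "type" field used to route it into a per-type output directory:


 // Sketch only: records is a PCollection<GenericRecord>; the "type" field
 // and the destination path are assumptions for illustration.
 records.apply(FileIO.<String, GenericRecord>writeDynamic()
   .by(record -> record.get("type").toString())
   .withDestinationCoder(StringUtf8Coder.of())
   .via(ParquetIO.sink(SCHEMA))
   .to("destination/path")
   .withNaming(type -> FileIO.Write.defaultNaming(type, ".parquet")));
 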