Class CsvIO

java.lang.Object
org.apache.beam.sdk.io.csv.CsvIO

public class CsvIO extends Object
PTransforms for reading and writing CSV files.

Reading CSV files

Reading from CSV files is not yet implemented. Please see https://github.com/apache/beam/issues/24552.

Valid CSVFormat Configuration

A CSVFormat must meet the following conditions to be considered valid when reading CSV:

Ignored CSVFormat parameters

The following CSVFormat parameters are either not relevant for parsing CSV or are validated satisfactorily by the Apache Commons CSV library.

Writing CSV files

To write a PCollection to one or more CSV files, use CsvIO.Write via write(java.lang.String, org.apache.commons.csv.CSVFormat) or writeRows(java.lang.String, org.apache.commons.csv.CSVFormat). CsvIO.Write supports writing Row or custom Java types using an inferred Schema. The examples below show both scenarios. See the Beam Programming Guide on inferring schemas for more information on how to enable Beam to infer a Schema from a custom Java type.

CsvIO.Write only supports writing the parts of Schema-aware types that do not contain any nested Schema.FieldTypes, such as Schema.TypeName.ROW or repeated Schema.TypeName.ARRAY types. See VALID_FIELD_TYPE_SET for valid Schema.FieldTypes. A hypothetical example of an unsupported nested field is sketched below.
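
For illustration, consider the hypothetical Customer type below (not part of the Beam documentation), where Address is assumed to be another @DefaultSchema-annotated class. Its address field would map to a nested Schema.TypeName.ROW, which falls outside VALID_FIELD_TYPE_SET and therefore cannot be written as a CSV column:

 // Hypothetical example: the "address" field infers to a nested
 // Schema.TypeName.ROW, which is not in VALID_FIELD_TYPE_SET and therefore
 // cannot be written as a CSV column by CsvIO.Write.
 @DefaultSchema(JavaBeanSchema.class)
 public class Customer {
   public Customer() { … }
   public String getName() { … }
   public void setName(String name) { … }
   public Address getAddress() { … }
   public void setAddress(Address address) { … }
 }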

Example usage:

Suppose we have the following Transaction class annotated with @DefaultSchema(JavaBeanSchema.class) so that Beam can infer its Schema:

@DefaultSchema(JavaBeanSchema.class)
 public class Transaction {
   public Transaction() { … }
   public Long getTransactionId() { … }
   public void setTransactionId(Long transactionId) { … }
   public String getBank() { … }
   public void setBank(String bank) { … }
   public double getPurchaseAmount() { … }
   public void setPurchaseAmount(double purchaseAmount) { … }
 }
 

From a PCollection<Transaction>, CsvIO.Write can write one or many CSV files, automatically creating the header based on the inferred Schema.


 PCollection<Transaction> transactions = ...
 transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
 

The resulting CSV files will look like the following, where the header is repeated for every file. By default, CsvIO.Write writes all fields in sorted order of the field names.


 bank,purchaseAmount,transactionId
 A,10.23,12345
 B,54.65,54321
 C,11.76,98765
 

To control the order and subset of fields that CsvIO.Write writes, use CSVFormat.withHeader(java.lang.String...). Note, however, the following constraints:

  1. Each header column must match a field name in the Schema; matching is case sensitive.
  2. Matching header columns must match Schema fields that are valid Schema.FieldTypes; see VALID_FIELD_TYPE_SET.
  3. CSVFormat only allows repeated header columns when CSVFormat.withAllowDuplicateHeaderNames() is set; see the sketch below.
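
For constraint 3, a hedged sketch (reusing the Transaction class above, and assuming CsvIO.Write otherwise behaves as in the earlier examples): the repeated transactionId column is accepted only because duplicate header names are enabled first.

 PCollection<Transaction> transactions = ...
 transactions.apply(
  CsvIO.<Transaction>write("path/to/folder/prefix",
    CSVFormat.DEFAULT
      .withAllowDuplicateHeaderNames(true)
      .withHeader("transactionId", "transactionId", "bank"))
 );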

The following example shows the use of CSVFormat.withHeader(java.lang.String...) to control the order and subset of Transaction fields.


 PCollection<Transaction> transactions = ...
 transactions.apply(
  CsvIO
    .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
 );
 

The resulting CSV files will look like the following, where the header is repeated for every file but includes only the listed subset of fields in the specified order.


 transactionId,purchaseAmount
 12345,10.23
 54321,54.65
 98765,11.76
 

In addition to header customization, CsvIO.Write supports CSVFormat.withHeaderComments(java.lang.Object...) as shown below. Note that CSVFormat.withCommentMarker(char) is required when specifying header comments.


 PCollection<Transaction> transactions = ...
 transactions
    .apply(
        CsvIO.<Transaction>write("path/to/folder/prefix",
        CSVFormat.DEFAULT
          .withCommentMarker('#')
          .withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
    );
 

The resulting CSV files will look like the following where the header and header comments are repeated for every shard file.


 # Bank Report
 # 1970-01-01
 # Operator: John Doe
 bank,purchaseAmount,transactionId
 A,10.23,12345
 B,54.65,54321
 C,11.76,98765
 

A PCollection of Rows works just like the custom Java types illustrated above, except we use writeRows(java.lang.String, org.apache.commons.csv.CSVFormat), as shown below for the same Transaction class. We derive Transaction's Schema using a DefaultSchema.DefaultSchemaProvider. Note that hard-coding the Rows below is for illustration purposes only; developers are instead encouraged to take advantage of DefaultSchema.DefaultSchemaProvider.toRowFunction(org.apache.beam.sdk.values.TypeDescriptor<T>), sketched after the example output below.


 DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
 Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
 PCollection<Row> transactions = pipeline.apply(Create.of(
  Row
    .withSchema(schema)
    .withFieldValue("bank", "A")
    .withFieldValue("purchaseAmount", 10.23)
    .withFieldValue("transactionId", "12345")
    .build(),
  Row
    .withSchema(schema)
    .withFieldValue("bank", "B")
    .withFieldValue("purchaseAmount", 54.65)
    .withFieldValue("transactionId", "54321")
    .build(),
  Row
    .withSchema(schema)
    .withFieldValue("bank", "C")
    .withFieldValue("purchaseAmount", 11.76)
    .withFieldValue("transactionId", "98765")
    .build()
 ));

 transactions.apply(
  CsvIO
    .writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
 );
 

Writing the transactions PCollection of Rows would yield the following CSV file content.


 bank,purchaseAmount,transactionId
 A,10.23,12345
 B,54.65,54321
 C,11.76,98765
 
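As mentioned above, the recommended alternative to hard-coding Rows is DefaultSchema.DefaultSchemaProvider.toRowFunction(org.apache.beam.sdk.values.TypeDescriptor<T>). The following is a minimal sketch (not from the Beam documentation), assuming the Transaction class from the earlier examples and hypothetical transaction1, transaction2, and transaction3 instances:

 // Minimal sketch: derive the Transaction-to-Row conversion from the
 // DefaultSchemaProvider instead of hard-coding Rows.
 DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
 TypeDescriptor<Transaction> typeDescriptor = TypeDescriptor.of(Transaction.class);
 Schema schema = defaultSchemaProvider.schemaFor(typeDescriptor);
 SerializableFunction<Transaction, Row> toRow =
     defaultSchemaProvider.toRowFunction(typeDescriptor);

 // transaction1, transaction2, transaction3 are hypothetical Transaction instances.
 PCollection<Row> transactions = pipeline
  .apply(Create.of(transaction1, transaction2, transaction3))
  .apply(MapElements.into(TypeDescriptors.rows()).via(toRow))
  .setRowSchema(schema);

 transactions.apply(
  CsvIO.writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
 );
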
CsvIO.Write does not support the following CSVFormat properties and will throw an IllegalArgumentException.
  • Field Details

  • Constructor Details

    • CsvIO

      public CsvIO()
  • Method Details

    • write

      public static <T> CsvIO.Write<T> write(String to, CSVFormat csvFormat)
      Instantiates a CsvIO.Write for writing user types in the given CSVFormat.
    • writeRows

      public static CsvIO.Write<Row> writeRows(String to, CSVFormat csvFormat)
      Instantiates a CsvIO.Write for writing Rows in the given CSVFormat.
    • parse

      public static <T> CsvIOParse<T> parse(Class<T> klass, CSVFormat csvFormat)
      Instantiates a CsvIOParse for parsing CSV string records into a custom Schema-mapped Class<T>, based on the records' assumed CSVFormat. See the Beam Programming Guide on how to configure your custom Class<T> so that Beam can infer its Schema using a SchemaProvider annotation such as AutoValueSchema or JavaBeanSchema.

      Example usage

      The example below illustrates parsing CSVFormat.DEFAULT formatted CSV string records, read using TextIO.Read, into a PCollection of an AutoValueSchema-annotated AutoValue data class.
      
       // SomeDataClass is a data class configured for Beam to automatically infer its Schema.
       @DefaultSchema(AutoValueSchema.class)
       @AutoValue
       abstract class SomeDataClass {
      
          abstract String getSomeString();
          abstract Integer getSomeInteger();
      
          @AutoValue.Builder
          abstract static class Builder {
            abstract Builder setSomeString(String value);
            abstract Builder setSomeInteger(Integer value);
      
            abstract SomeDataClass build();
          }
       }
      
       // Pipeline example reads CSV string records from Google Cloud storage and writes to BigQuery.
       Pipeline pipeline = Pipeline.create();
      
       // Read CSV records from Google Cloud storage using TextIO.
       PCollection<String> csvRecords = pipeline
        .apply(TextIO.read().from("gs://bucket/folder/*.csv"));
      
       // Apply the CSV records PCollection<String> to the CsvIOParse transform instantiated using CsvIO.parse.
       CsvIOParseResult<SomeDataClass> result = csvRecords.apply(CsvIO.parse(
            SomeDataClass.class,
            CSVFormat.DEFAULT.withHeader("someString", "someInteger")
       ));
      
       // Acquire any processing errors to either write to logs or apply to a downstream dead letter queue such as BigQuery.
       result.getErrors().apply(BigQueryIO.<CsvIOParseError>write()
        .to("project:dataset.table_of_errors")
        .useBeamSchema()
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
      
       // Acquire the successful PCollection<SomeDataClass> output.
       PCollection<SomeDataClass> output = result.getOutput();
      
       // Do something with the output such as write to BigQuery.
       output.apply(BigQueryIO.<SomeDataClass>write()
        .to("project:dataset.table_of_output")
        .useBeamSchema()
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
       
    • parseRows

      public static CsvIOParse<Row> parseRows(Schema schema, CSVFormat csvFormat)
      Instantiates a CsvIOParse for parsing CSV string records into Rows, based on the records' assumed CSVFormat and the expected Schema.

      Example usage

      The example below illustrates parsing CSVFormat.DEFAULT formatted CSV string records, read using TextIO.Read, into a Row PCollection.
      
       // Define the expected Schema.
       Schema schema = Schema.of(
        Schema.Field.of("someString", FieldType.STRING),
        Schema.Field.of("someInteger", FieldType.INT32)
       );
      
       // Pipeline example reads CSV string records from Google Cloud storage and writes to BigQuery.
       Pipeline pipeline = Pipeline.create();
      
       // Read CSV records from Google Cloud storage using TextIO.
       PCollection<String> csvRecords = pipeline
        .apply(TextIO.read().from("gs://bucket/folder/*.csv"));
      
       // Apply the CSV records PCollection<String> to the CsvIOParse transform instantiated using CsvIO.parseRows.
       CsvIOParseResult<Row> result = csvRecords.apply(CsvIO.parseRows(
            schema,
            CSVFormat.DEFAULT.withHeader("someString", "someInteger")
       ));
      
       // Acquire any processing errors to either write to logs or apply to a downstream dead letter queue such as BigQuery.
       result.getErrors().apply(BigQueryIO.<CsvIOParseError>write()
        .to("project:dataset.table_of_errors")
        .useBeamSchema()
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
      
       // Acquire the successful PCollection<Row> output.
       PCollection<Row> output = result.getOutput();
      
       // Do something with the output such as write to BigQuery.
       output.apply(BigQueryIO.<Row>write()
        .to("project:dataset.table_of_output")
        .useBeamSchema()
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));