Class CsvIO
PTransforms for reading and writing CSV files.
Reading CSV files
Reading from CSV files is not yet implemented. Please see https://github.com/apache/beam/issues/24552.
Valid CSVFormat Configuration
A CSVFormat must meet the following conditions to be considered valid when reading CSV:
- String[] header - must contain at least one column name, and all column names must be non-empty.
- boolean allowDuplicateHeaderNames - must be false.
- boolean allowMissingColumnNames - must be false.
- boolean ignoreHeaderCase - must be false.
- boolean skipHeaderRecord - must be false. The header is already accounted for during parsing.
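For illustration, a minimal sketch (not part of this Javadoc) of building a CSVFormat that satisfies these conditions with Apache Commons CSV; the column names are placeholders:
// Sketch only: a CSVFormat meeting the reading conditions listed above.
CSVFormat readFormat =
    CSVFormat.DEFAULT
        .withHeader("bank", "purchaseAmount", "transactionId") // at least one non-empty column name
        .withAllowDuplicateHeaderNames(false)
        .withAllowMissingColumnNames(false)
        .withIgnoreHeaderCase(false)
        .withSkipHeaderRecord(false); // the header is already accounted for during parsing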
Ignored CSVFormat parameters
The following CSVFormat parameters are either not relevant for parsing CSV or are
validated satisfactorily by the Apache Commons CSV
library.
- boolean autoFlush
- char commentMarker
- char delimiter
- char escape
- char quote
- org.apache.commons.csv.QuoteMode quoteMode
- String nullString
- char recordSeparator
- systemRecordSeparator
- firstRecordAsHeader
- java.lang.Object... headerComments
- boolean ignoreEmptyLines
- boolean ignoreSurroundingSpaces
- boolean trim
- boolean skipHeaderRecord
- boolean trailingDelimiter
Writing CSV files
To write a PCollection to one or more CSV files, use CsvIO.Write via writeRows(java.lang.String, org.apache.commons.csv.CSVFormat) or write(java.lang.String, org.apache.commons.csv.CSVFormat). CsvIO.Write supports writing Row or custom Java types using an inferred Schema. Examples below show both scenarios. See the Beam Programming Guide on inferring schemas for more information on how to enable Beam to infer a Schema from a custom Java type.
CsvIO.Write only supports writing the parts of Schema-aware types that do not contain any nested Schema.FieldTypes such as Schema.TypeName.ROW or repeated Schema.TypeName.ARRAY types. See VALID_FIELD_TYPE_SET for valid Schema.FieldTypes.
Example usage:
Suppose we have the following Transaction class annotated with
@DefaultSchema(JavaBeanSchema.class) so that Beam can infer its Schema:
@DefaultSchema(JavaBeanSchema.class)
public class Transaction {
public Transaction() { … }
public Long getTransactionId() { … }
public void setTransactionId(Long transactionId) { … }
public String getBank() { … }
public void setBank(String bank) { … }
public double getPurchaseAmount() { … }
public void setPurchaseAmount(double purchaseAmount) { … }
}
From a PCollection<Transaction>, CsvIO.Write can write one or many CSV files, automatically creating the header based on its inferred Schema.
PCollection<Transaction> transactions = ...
transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
The resulting CSV files will look like the following, where the header is repeated for every file. Note that, by default, CsvIO.Write writes all fields in sorted order of the field names.
bank,purchaseAmount,transactionId
A,10.23,12345
B,54.65,54321
C,11.76,98765
To control the order and subset of fields that CsvIO.Write writes, use CSVFormat.withHeader(java.lang.String...). Note, however, the following constraints:
- Each header column must match a field name in the Schema; matching is case sensitive.
- Matching header columns must match Schema fields that are valid Schema.FieldTypes; see VALID_FIELD_TYPE_SET.
- CSVFormat only allows repeated header columns when CSVFormat.withAllowDuplicateHeaderNames() is set.
The following example shows the use of CSVFormat.withHeader(java.lang.String...) to control the order and subset of Transaction fields.
PCollection<Transaction> transactions = ...
transactions.apply(
CsvIO
.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
);
The resulting CSV files will look like the following, where the header is repeated for every file but only includes the subset of fields in their listed order.
transactionId,purchaseAmount
12345,10.23
54321,54.65
98765,11.76
In addition to header customization, CsvIO.Write supports CSVFormat.withHeaderComments(java.lang.Object...) as shown below. Note that CSVFormat.withCommentMarker(char) is
required when specifying header comments.
PCollection<Transaction> transactions = ...
transactions
  .apply(
    CsvIO.<Transaction>write("path/to/folder/prefix",
      CSVFormat.DEFAULT
        .withCommentMarker('#')
        .withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
  );
The resulting CSV files will look like the following where the header and header comments are repeated for every shard file.
# Bank Report
# 1970-01-01
# Operator: John Doe
bank,purchaseAmount,transactionId
A,10.23,12345
B,54.65,54321
C,11.76,98765
A PCollection of Rows works just like custom Java types illustrated above,
except we use writeRows(java.lang.String, org.apache.commons.csv.CSVFormat) as shown below for the same Transaction class. We
derive Transaction's Schema using a DefaultSchema.DefaultSchemaProvider. Note that
hard-coding the Rows below is for illustration purposes. Developers are instead encouraged to take advantage of DefaultSchema.DefaultSchemaProvider.toRowFunction(org.apache.beam.sdk.values.TypeDescriptor<T>); a sketch using it follows the example output below.
DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
PCollection<Row> transactions = pipeline.apply(Create.of(
Row
.withSchema(schema)
.withFieldValue("bank", "A")
.withFieldValue("purchaseAmount", 10.23)
.withFieldValue("transactionId", "12345")
.build(),
Row
.withSchema(schema)
.withFieldValue("bank", "B")
.withFieldValue("purchaseAmount", 54.65)
.withFieldValue("transactionId", "54321")
.build(),
Row
.withSchema(schema)
.withFieldValue("bank", "C")
.withFieldValue("purchaseAmount", 11.76)
.withFieldValue("transactionId", "98765")
.build()
).withRowSchema(schema));
transactions.apply(
CsvIO
.writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
);
Writing the transactions PCollection of Rows would yield the following CSV
file content.
bank,purchaseAmount,transactionId
A,10.23,12345
B,54.65,54321
C,11.76,98765
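As noted above, rather than hard-coding Rows, the conversion can be driven by DefaultSchema.DefaultSchemaProvider.toRowFunction(org.apache.beam.sdk.values.TypeDescriptor<T>). The following is a minimal sketch, not part of this Javadoc, that assumes the Transaction class from earlier and converts it to Rows before applying writeRows:
// Sketch only: derive the Schema and a Transaction -> Row function from the DefaultSchemaProvider.
DefaultSchemaProvider provider = new DefaultSchemaProvider();
TypeDescriptor<Transaction> descriptor = TypeDescriptor.of(Transaction.class);
Schema schema = provider.schemaFor(descriptor);
SerializableFunction<Transaction, Row> toRow = provider.toRowFunction(descriptor);

// Convert the custom type to Rows, attach the Schema, and write with writeRows.
PCollection<Transaction> transactions = ...;
transactions
    .apply(MapElements.into(TypeDescriptors.rows()).via(toRow))
    .setRowSchema(schema)
    .apply(CsvIO.writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT));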
CsvIO.Write does not support the following CSVFormat properties and will throw an
IllegalArgumentException.
Nested Class Summary
Nested Classes
- CsvIO.Write<T>
Field Summary
Fields
- static final Set<Schema.FieldType> VALID_FIELD_TYPE_SET - The valid Schema.FieldTypes from which CsvIO converts CSV records to the fields.
Constructor Summary
Constructors
- CsvIO()
Method Summary
- static <T> CsvIOParse<T> parse(Class<T>, CSVFormat) - Instantiates a CsvIOParse for parsing CSV string records into custom Schema-mapped Class<T>es from the records' assumed CSVFormat.
- static CsvIOParse<Row> parseRows(Schema, CSVFormat) - Instantiates a CsvIOParse for parsing CSV string records into Rows from the records' assumed CSVFormat and expected Schema.
- static <T> CsvIO.Write<T> write(String, CSVFormat) - Instantiates a CsvIO.Write for writing user types in CSVFormat format.
- static CsvIO.Write<Row> writeRows(String, CSVFormat) - Instantiates a CsvIO.Write for writing Rows in CSVFormat format.
Field Details
VALID_FIELD_TYPE_SET
public static final Set<Schema.FieldType> VALID_FIELD_TYPE_SET
The valid Schema.FieldTypes from which CsvIO converts CSV records to the fields.
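For illustration only (not from this Javadoc), a minimal sketch of checking a Schema against VALID_FIELD_TYPE_SET before applying CsvIO.Write; the schema variable is a placeholder, and nullable variants of a field type may need separate handling:
// Sketch only: reject fields whose types CsvIO cannot convert to CSV columns.
Schema schema = ...; // e.g. inferred from a custom Java type
for (Schema.Field field : schema.getFields()) {
  if (!CsvIO.VALID_FIELD_TYPE_SET.contains(field.getType())) {
    throw new IllegalArgumentException(
        "Field " + field.getName() + " has a type CsvIO.Write cannot write: " + field.getType());
  }
}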
Constructor Details
CsvIO
public CsvIO()
Method Details
write
Instantiates a CsvIO.Write for writing user types in CSVFormat format.
writeRows
Instantiates a CsvIO.Write for writing Rows in CSVFormat format.
parse
Instantiates a CsvIOParse for parsing CSV string records into custom Schema-mapped Class<T>es from the records' assumed CSVFormat. See the Beam Programming Guide on how to configure your custom Class<T> for Beam to infer its Schema using a SchemaProvider annotation such as AutoValueSchema or JavaBeanSchema.
Example usage
The example below illustrates parsing CSVFormat.DEFAULT formatted CSV string records, read from TextIO.Read, into an AutoValueSchema-annotated AutoValue data class PCollection.

// SomeDataClass is a data class configured for Beam to automatically infer its Schema.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class SomeDataClass {

  abstract String getSomeString();
  abstract Integer getSomeInteger();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setSomeString(String value);
    abstract Builder setSomeInteger(Integer value);

    abstract SomeDataClass build();
  }
}

// Pipeline example reads CSV string records from Google Cloud storage and writes to BigQuery.
Pipeline pipeline = Pipeline.create();

// Read CSV records from Google Cloud storage using TextIO.
PCollection<String> csvRecords = pipeline
    .apply(TextIO.read().from("gs://bucket/folder/*.csv"));

// Apply the CSV records PCollection<String> to the CsvIOParse transform instantiated using CsvIO.parse.
CsvIOParseResult<SomeDataClass> result = csvRecords.apply(CsvIO.parse(
    SomeDataClass.class,
    CSVFormat.DEFAULT.withHeader("someString", "someInteger")
));

// Acquire any processing errors to either write to logs or apply to a downstream dead letter queue such as BigQuery.
result.getErrors().apply(BigQueryIO.<CsvIOParseError>write()
    .to("project:dataset.table_of_errors")
    .useBeamSchema()
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Acquire the successful PCollection<SomeDataClass> output.
PCollection<SomeDataClass> output = result.getOutput();

// Do something with the output such as write to BigQuery.
output.apply(BigQueryIO.<SomeDataClass>write()
    .to("project:dataset.table_of_output")
    .useBeamSchema()
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
parseRows
Instantiates a CsvIOParse for parsing CSV string records into Rows from the records' assumed CSVFormat and expected Schema.
Example usage
The example below illustrates parsing CSVFormat.DEFAULT formatted CSV string records, read from TextIO.Read, into a Row PCollection.

// Define the expected Schema.
Schema schema = Schema.of(
    Schema.Field.of("someString", FieldType.STRING),
    Schema.Field.of("someInteger", FieldType.INT32)
);

// Pipeline example reads CSV string records from Google Cloud storage and writes to BigQuery.
Pipeline pipeline = Pipeline.create();

// Read CSV records from Google Cloud storage using TextIO.
PCollection<String> csvRecords = pipeline
    .apply(TextIO.read().from("gs://bucket/folder/*.csv"));

// Apply the CSV records PCollection<String> to the CsvIOParse transform instantiated using CsvIO.parseRows.
CsvIOParseResult<Row> result = csvRecords.apply(CsvIO.parseRows(
    schema,
    CSVFormat.DEFAULT.withHeader("someString", "someInteger")
));

// Acquire any processing errors to either write to logs or apply to a downstream dead letter queue such as BigQuery.
result.getErrors().apply(BigQueryIO.<CsvIOParseError>write()
    .to("project:dataset.table_of_errors")
    .useBeamSchema()
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Acquire the successful PCollection<Row> output.
PCollection<Row> output = result.getOutput();

// Do something with the output such as write to BigQuery.
output.apply(BigQueryIO.<Row>write()
    .to("project:dataset.table_of_output")
    .useBeamSchema()
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));