public class CsvIO
extends java.lang.Object
PTransforms for reading and writing CSV files.
Reading from CSV files is not yet implemented. Please see https://github.com/apache/beam/issues/24552.
A CSVFormat must meet the following conditions to be considered valid when reading CSV (a sketch of a conforming CSVFormat follows this list):

- String[] header - must contain at least one column name, and all column names must be non-empty.
- boolean allowDuplicateHeaderNames - must be false.
- boolean allowMissingColumnNames - must be false.
- boolean ignoreHeaderCase - must be false.
- boolean skipHeaderRecord - must be false. The header is already accounted for during parsing.
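The following is a minimal sketch, not taken from the original docs, of a CSVFormat configured to satisfy these conditions. The column names are illustrative assumptions, and it assumes a Commons CSV version that provides these boolean with* setters.

CSVFormat format = CSVFormat.DEFAULT
    // At least one non-empty, unique column name is required.
    .withHeader("bank", "purchaseAmount", "transactionId")
    .withAllowDuplicateHeaderNames(false)
    .withAllowMissingColumnNames(false)
    .withIgnoreHeaderCase(false)
    // The header is accounted for during parsing, so do not skip it here.
    .withSkipHeaderRecord(false);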
The following CSVFormat parameters are either not relevant for parsing CSV or are validated satisfactorily by the Apache Commons CSV library:

- boolean autoFlush
- char commentMarker
- char delimiter
- char escape
- char quote
- org.apache.commons.csv.QuoteMode quoteMode
- String nullString
- char recordSeparator
- java.lang.Object... headerComments
- boolean ignoreEmptyLines
- boolean ignoreSurroundingSpaces
- boolean trim
- boolean skipHeaderRecord
- boolean trailingDelimiter
To write a PCollection to one or more CSV files, use CsvIO.Write via writeRows(java.lang.String, org.apache.commons.csv.CSVFormat) or write(java.lang.String, org.apache.commons.csv.CSVFormat). CsvIO.Write supports writing Rows or custom Java types using an inferred Schema. Examples below show both scenarios. See the Beam Programming Guide on inferring schemas for more information on how to enable Beam to infer a Schema from a custom Java type.
CsvIO.Write only supports writing the parts of Schema-aware types that do not contain any nested Schema.FieldTypes such as Schema.TypeName.ROW or repeated Schema.TypeName.ARRAY types. See VALID_FIELD_TYPE_SET for valid Schema.FieldTypes.
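For illustration (a sketch with hypothetical field names, not from the original docs): a flat Schema such as the first one below contains only simple, non-nested field types, while the second contains nested ROW and ARRAY fields that CsvIO.Write cannot write.

// A flat Schema: every field is a simple, non-nested type, so CsvIO.Write can handle it.
Schema flat = Schema.of(
    Schema.Field.of("bank", Schema.FieldType.STRING),
    Schema.Field.of("purchaseAmount", Schema.FieldType.DOUBLE));

// A Schema with nested ROW and repeated ARRAY fields: CsvIO.Write cannot write these parts.
Schema nested = Schema.of(
    Schema.Field.of("transaction", Schema.FieldType.row(flat)),
    Schema.Field.of("tags", Schema.FieldType.array(Schema.FieldType.STRING)));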
Suppose we have the following Transaction class annotated with @DefaultSchema(JavaBeanSchema.class) so that Beam can infer its Schema:
@DefaultSchema(JavaBeanSchema.class)
public class Transaction {
    public Transaction() { … }
    public Long getTransactionId() { … }
    public void setTransactionId(Long transactionId) { … }
    public String getBank() { … }
    public void setBank(String bank) { … }
    public double getPurchaseAmount() { … }
    public void setPurchaseAmount(double purchaseAmount) { … }
}
From a PCollection<Transaction>, CsvIO.Write can write one or many CSV files, automatically creating the header based on its inferred Schema.
PCollection<Transaction> transactions = ...
transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
The resulting CSV files will look like the following, where the header is repeated for every file. By default, CsvIO.Write writes all fields in sorted order of the field names.
bank,purchaseAmount,transactionId
A,10.23,12345
B,54.65,54321
C,11.76,98765
To control the order and subset of fields that CsvIO.Write writes, use CSVFormat.withHeader(java.lang.Class<? extends java.lang.Enum<?>>). Note, however, the following constraints:

- Header column names must match fields in the Schema; matching is case sensitive.
- Matching header columns must reference Schema fields that are valid Schema.FieldTypes; see VALID_FIELD_TYPE_SET.
- CSVFormat only allows repeated header columns when CSVFormat.withAllowDuplicateHeaderNames() is set.
The following example shows the use of CSVFormat.withHeader(java.lang.Class<? extends java.lang.Enum<?>>) to control the order and subset of Transaction fields.
PCollection<Transaction> transactions = ...
transactions.apply(
    CsvIO.<Transaction>write("path/to/folder/prefix",
        CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
);
The resulting CSV files will look like the following, where the header is repeated for every file but includes only the listed subset of fields, in their listed order.
transactionId,purchaseAmount
12345,10.23
54321,54.65
98765,11.76
In addition to header customization, CsvIO.Write supports CSVFormat.withHeaderComments(java.lang.Object...) as shown below. Note that CSVFormat.withCommentMarker(char) is required when specifying header comments.
PCollection<Transaction> transactions = ...
transactions.apply(
    CsvIO.<Transaction>write("path/to/folder/prefix",
        CSVFormat.DEFAULT
            .withCommentMarker('#')
            .withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
);
The resulting CSV files will look like the following where the header and header comments are repeated for every shard file.
# Bank Report
# 1970-01-01
# Operator: John Doe
bank,purchaseAmount,transactionId
A,10.23,12345
B,54.65,54321
C,11.76,98765
A PCollection of Rows works just like the custom Java types illustrated above, except we use writeRows(java.lang.String, org.apache.commons.csv.CSVFormat) as shown below for the same Transaction class. We derive Transaction's Schema using a DefaultSchema.DefaultSchemaProvider. Note that hard-coding the Rows below is for illustration purposes. Developers are instead encouraged to take advantage of DefaultSchema.DefaultSchemaProvider.toRowFunction(org.apache.beam.sdk.values.TypeDescriptor<T>).
DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
PCollection<Row> transactions = pipeline.apply(Create.of(
    Row
        .withSchema(schema)
        .withFieldValue("bank", "A")
        .withFieldValue("purchaseAmount", 10.23)
        .withFieldValue("transactionId", 12345L)
        .build(),
    Row
        .withSchema(schema)
        .withFieldValue("bank", "B")
        .withFieldValue("purchaseAmount", 54.65)
        .withFieldValue("transactionId", 54321L)
        .build(),
    Row
        .withSchema(schema)
        .withFieldValue("bank", "C")
        .withFieldValue("purchaseAmount", 11.76)
        .withFieldValue("transactionId", 98765L)
        .build()
));

transactions.apply(
    CsvIO.writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
);
Writing the transactions PCollection of Rows would yield the following CSV file content.
bank,purchaseAmount,transactionId
A,10.23,12345
B,54.65,54321
C,11.76,98765
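As noted above, rather than hard-coding Rows, the Rows could be derived from Transaction objects using DefaultSchema.DefaultSchemaProvider.toRowFunction. The following is a minimal sketch under that assumption; the transactionObjects PCollection<Transaction> is hypothetical.

// Derive the Schema and a Transaction-to-Row conversion function from the provider.
DefaultSchemaProvider provider = new DefaultSchemaProvider();
Schema schema = provider.schemaFor(TypeDescriptor.of(Transaction.class));
SerializableFunction<Transaction, Row> toRow =
    provider.toRowFunction(TypeDescriptor.of(Transaction.class));

// Convert a hypothetical PCollection<Transaction> into Rows and write them as CSV.
PCollection<Row> rows = transactionObjects
    .apply(MapElements.into(TypeDescriptors.rows()).via(toRow))
    .setRowSchema(schema);

rows.apply(CsvIO.writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT));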
CsvIO.Write does not support the following CSVFormat properties and will throw an IllegalArgumentException.
Modifier and Type | Class and Description
---|---
static class | CsvIO.Write<T> - PTransform for writing CSV files.
Modifier and Type | Field and Description
---|---
static java.util.Set<Schema.FieldType> | VALID_FIELD_TYPE_SET - The valid Schema.FieldTypes from which CsvIO converts CSV records to the fields.
Constructor and Description
---
CsvIO()
Modifier and Type | Method and Description
---|---
static <T> CsvIOParse<T> | parse(java.lang.Class<T> klass, CSVFormat csvFormat) - Instantiates a CsvIOParse for parsing CSV string records into custom Schema-mapped Class<T>es from the records' assumed CSVFormat.
static CsvIOParse<Row> | parseRows(Schema schema, CSVFormat csvFormat) - Instantiates a CsvIOParse for parsing CSV string records into Rows from the records' assumed CSVFormat and expected Schema.
static <T> CsvIO.Write<T> | write(java.lang.String to, CSVFormat csvFormat) - Instantiates a CsvIO.Write for writing user types in CSVFormat format.
static CsvIO.Write<Row> | writeRows(java.lang.String to, CSVFormat csvFormat) - Instantiates a CsvIO.Write for writing Rows in CSVFormat format.
public static final java.util.Set<Schema.FieldType> VALID_FIELD_TYPE_SET
The valid Schema.FieldTypes from which CsvIO converts CSV records to the fields.
public static <T> CsvIO.Write<T> write(java.lang.String to, CSVFormat csvFormat)
Instantiates a CsvIO.Write for writing user types in CSVFormat format.

public static CsvIO.Write<Row> writeRows(java.lang.String to, CSVFormat csvFormat)
Instantiates a CsvIO.Write for writing Rows in CSVFormat format.
public static <T> CsvIOParse<T> parse(java.lang.Class<T> klass, CSVFormat csvFormat)
Instantiates a CsvIOParse for parsing CSV string records into custom Schema-mapped Class<T>es from the records' assumed CSVFormat.
See the Beam Programming Guide on how to configure your custom Class<T> for Beam to infer its Schema using a SchemaProvider annotation such as AutoValueSchema or JavaBeanSchema.
The example below illustrates parsing CSV string records, read using TextIO.Read, into a PCollection of an AutoValueSchema-annotated AutoValue data class.
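The original example is truncated here; the sketch below shows what such a pipeline could look like, assuming a hypothetical SomeDataClass with someString and someInteger fields, mirroring the parseRows example further below.

// SomeDataClass is a data class configured for Beam to automatically infer its Schema.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract static class SomeDataClass {
    public abstract String getSomeString();
    public abstract Integer getSomeInteger();
}

// Read CSV string records from Google Cloud Storage using TextIO.
PCollection<String> csvRecords = pipeline
    .apply(TextIO.read().from("gs://bucket/folder/*.csv"));

// Parse the records into SomeDataClass instances.
CsvIOParseResult<SomeDataClass> result = csvRecords.apply(CsvIO.parse(
    SomeDataClass.class,
    CSVFormat.DEFAULT.withHeader("someString", "someInteger")
));

// Acquire any processing errors and the successful output, as in the parseRows example below.
PCollection<CsvIOParseError> errors = result.getErrors();
PCollection<SomeDataClass> output = result.getOutput();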
public static CsvIOParse<Row> parseRows(Schema schema, CSVFormat csvFormat)
Instantiates a CsvIOParse for parsing CSV string records into Rows from the records' assumed CSVFormat and expected Schema.
The example below illustrates parsing CSV string records, read using TextIO.Read, into a Row PCollection.
// Define the expected Schema.
Schema schema = Schema.of(
    Schema.Field.of("someString", FieldType.STRING),
    Schema.Field.of("someInteger", FieldType.INT32)
);

// Pipeline example reads CSV string records from Google Cloud Storage and writes to BigQuery.
Pipeline pipeline = Pipeline.create();

// Read CSV records from Google Cloud Storage using TextIO.
PCollection<String> csvRecords = pipeline
    .apply(TextIO.read().from("gs://bucket/folder/*.csv"));

// Apply the CSV records PCollection<String> to the CsvIOParse transform instantiated using CsvIO.parseRows.
CsvIOParseResult<Row> result = csvRecords.apply(CsvIO.parseRows(
    schema,
    CSVFormat.DEFAULT.withHeader("someString", "someInteger")
));

// Acquire any processing errors to either write to logs or apply to a downstream dead letter queue such as BigQuery.
result.getErrors().apply(BigQueryIO.<CsvIOParseError>write()
    .to("project:dataset.table_of_errors")
    .useBeamSchema()
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Acquire the successful PCollection<Row> output.
PCollection<Row> output = result.getOutput();

// Do something with the output such as write to BigQuery.
output.apply(BigQueryIO.<Row>write()
    .to("project:dataset.table_of_output")
    .useBeamSchema()
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));