Class JsonIO
PTransforms for reading and writing JSON files.
Reading JSON files
Reading from JSON files is not yet implemented in Java. Please see https://github.com/apache/beam/issues/24552.
Writing JSON files
To write a PCollection to one or more line-delimited JSON files, use JsonIO.Write, using writeRows(java.lang.String) or write(java.lang.String). JsonIO.Write supports writing Row or custom Java types using an inferred Schema. Examples below show both scenarios. See the Beam Programming Guide on inferring schemas for more information on how to enable Beam to infer a Schema from a custom Java type.
Example usage:
Suppose we have the following Transaction class annotated with @DefaultSchema(JavaBeanSchema.class) so that Beam can infer its Schema:
@DefaultSchema(JavaBeanSchema.class)
public class Transaction {
  public Transaction() { … }
  public Long getTransactionId() { … }
  public void setTransactionId(Long transactionId) { … }
  public String getBank() { … }
  public void setBank(String bank) { … }
  public double getPurchaseAmount() { … }
  public void setPurchaseAmount(double purchaseAmount) { … }
}
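To make the getter/setter convention concrete, the following plain-Java sketch lists the property names that a bean shaped like Transaction exposes. It uses the JDK's java.beans.Introspector purely for illustration and is not how Beam's JavaBeanSchema is implemented; the class BeanPropertySketch and the method propertyNames are hypothetical names introduced here.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BeanPropertySketch {
    // A stand-in for the Transaction bean above, with the elided bodies filled in.
    public static class Transaction {
        private Long transactionId;
        private String bank;
        private double purchaseAmount;

        public Transaction() {}
        public Long getTransactionId() { return transactionId; }
        public void setTransactionId(Long transactionId) { this.transactionId = transactionId; }
        public String getBank() { return bank; }
        public void setBank(String bank) { this.bank = bank; }
        public double getPurchaseAmount() { return purchaseAmount; }
        public void setPurchaseAmount(double purchaseAmount) { this.purchaseAmount = purchaseAmount; }
    }

    // Returns the names of properties that have both a getter and a setter, sorted by name.
    static List<String> propertyNames(Class<?> beanClass) {
        try {
            // Stop at Object.class so the synthetic "class" property is excluded.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
            Collections.sort(names);
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints [bank, purchaseAmount, transactionId]
        System.out.println(propertyNames(Transaction.class));
    }
}
```

The three names match the JSON keys in the example output on this page, which is why each getter needs a matching setter and the class needs a public no-argument constructor.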
From a PCollection<Transaction>, JsonIO.Write can write one or many JSON files.
PCollection<Transaction> transactions = ...
transactions.apply(JsonIO.<Transaction>write("path/to/folder/prefix"));
The resulting JSON files will look like the following. By default, JsonIO.Write writes all fields in sorted order of the field names.
{"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
{"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
{"bank": "C", "purchaseAmount": 11.76, "transactionId": 98765}
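As an illustration of the line-delimited layout with fields in sorted name order, here is a minimal plain-Java sketch that renders one record per line. It is not JsonIO.Write's implementation and does not use Beam; the class JsonLinesSketch and its methods are hypothetical, and it handles only strings and numbers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class JsonLinesSketch {
    // Escapes backslashes and double quotes; other control characters are ignored in this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders one record as a single-line JSON object with keys in sorted order.
    static String toJsonLine(Map<String, Object> record) {
        StringJoiner joiner = new StringJoiner(", ", "{", "}");
        // Copying into a TreeMap iterates the keys in lexicographic order.
        for (Map.Entry<String, Object> entry : new TreeMap<>(record).entrySet()) {
            Object value = entry.getValue();
            // Strings are quoted; everything else is written via String.valueOf.
            String rendered = (value instanceof String)
                ? "\"" + escape((String) value) + "\""
                : String.valueOf(value);
            joiner.add("\"" + escape(entry.getKey()) + "\": " + rendered);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        // Inserted out of order on purpose; the output is still sorted by key.
        record.put("transactionId", 12345L);
        record.put("bank", "A");
        record.put("purchaseAmount", 10.23);
        // prints {"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
        System.out.println(toJsonLine(record));
    }
}
```

Each call yields one complete JSON object, so a file holding one such line per record can be read back line by line.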
A PCollection of Rows works just like the custom Java types illustrated above, except that we use writeRows(java.lang.String), as shown below for the same Transaction class. We derive Transaction's Schema using a DefaultSchema.DefaultSchemaProvider. Note that the Rows below are hard-coded for illustration purposes only. Developers are instead encouraged to take advantage of DefaultSchema.DefaultSchemaProvider.toRowFunction(org.apache.beam.sdk.values.TypeDescriptor<T>).
DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
// transactionId is a Long field, so its values are long literals rather than strings.
PCollection<Row> transactions = pipeline.apply(Create.of(
    Row
        .withSchema(schema)
        .withFieldValue("bank", "A")
        .withFieldValue("purchaseAmount", 10.23)
        .withFieldValue("transactionId", 12345L)
        .build(),
    Row
        .withSchema(schema)
        .withFieldValue("bank", "B")
        .withFieldValue("purchaseAmount", 54.65)
        .withFieldValue("transactionId", 54321L)
        .build(),
    Row
        .withSchema(schema)
        .withFieldValue("bank", "C")
        .withFieldValue("purchaseAmount", 11.76)
        .withFieldValue("transactionId", 98765L)
        .build()
));
transactions.apply(
    JsonIO
        .writeRows("gs://bucket/path/to/folder/prefix")
);
Writing the transactions PCollection of Rows would yield the following JSON file content.
{"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
{"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
{"bank": "C", "purchaseAmount": 11.76, "transactionId": 98765}
Nested Class Summary
Nested Classes: JsonIO.Write<T>
Constructor Summary
Constructors: JsonIO()
Method Summary
Modifier and Type | Method | Description
static <T> JsonIO.Write<T> | write(java.lang.String) | Instantiates a JsonIO.Write for writing user types in JSON format.
static JsonIO.Write<Row> | writeRows(java.lang.String) |
Constructor Details
JsonIO
public JsonIO()
Method Details
write
Instantiates a JsonIO.Write for writing user types in JSON format.
writeRows