Class JsonIO

java.lang.Object
org.apache.beam.sdk.io.json.JsonIO

public class JsonIO extends Object
PTransforms for reading and writing JSON files.

Reading JSON files

Reading from JSON files is not yet implemented in Java. Please see https://github.com/apache/beam/issues/24552.

Writing JSON files

To write a PCollection to one or more line-delimited JSON files, use JsonIO.Write, using writeRows(java.lang.String) or write(java.lang.String). JsonIO.Write supports writing Row or custom Java types using an inferred Schema. The examples below show both scenarios. See the Beam Programming Guide on inferring schemas for more information on how to enable Beam to infer a Schema from a custom Java type.

Example usage:

Suppose we have the following Transaction class annotated with @DefaultSchema(JavaBeanSchema.class) so that Beam can infer its Schema:

@DefaultSchema(JavaBeanSchema.class)
 public class Transaction {
   public Transaction() { … }
   public Long getTransactionId() { … }
   public void setTransactionId(Long transactionId) { … }
   public String getBank() { … }
   public void setBank(String bank) { … }
   public double getPurchaseAmount() { … }
   public void setPurchaseAmount(double purchaseAmount) { … }
 }
 

From a PCollection<Transaction>, JsonIO.Write can write one or many JSON files.


 PCollection<Transaction> transactions = ...
 transactions.apply(JsonIO.<Transaction>write("path/to/folder/prefix"));
 

The resulting JSON files will look like the following, with one JSON object per line. By default, JsonIO.Write writes all fields in sorted order of the field names.


 {"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
 {"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
 {"bank": "C", "purchaseAmount": 11.76, "transactionId": 98765}
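
To make the output format concrete, the following plain-Java sketch renders a record as one JSON object per line with its keys in sorted order. It does not use Beam and is not how JsonIO.Write is implemented; the class JsonLinesSketch and the method toJsonLine are hypothetical names introduced for illustration, and the spacing simply mirrors the listing above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class JsonLinesSketch {

  // Renders one record as a single-line JSON object with keys in sorted order.
  // Assumption: values are only strings or numbers; escaping covers just
  // backslashes and double quotes, which is enough for this illustration.
  public static String toJsonLine(Map<String, Object> fields) {
    StringJoiner joiner = new StringJoiner(", ", "{", "}");
    // Copying into a TreeMap makes iteration follow lexicographic key order.
    for (Map.Entry<String, Object> entry : new TreeMap<>(fields).entrySet()) {
      Object value = entry.getValue();
      String rendered = (value instanceof Number)
          ? value.toString()
          : "\"" + String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
      joiner.add("\"" + entry.getKey() + "\": " + rendered);
    }
    return joiner.toString();
  }

  public static void main(String[] args) {
    // Fields are inserted out of order on purpose; the output is still sorted.
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("transactionId", 12345L);
    record.put("bank", "A");
    record.put("purchaseAmount", 10.23);
    System.out.println(toJsonLine(record));
  }
}
```

Each call produces exactly one line, so a file of such lines can be read back record by record without parsing the whole file at once.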
 

A PCollection of Rows works just like custom Java types illustrated above, except we use writeRows(java.lang.String) as shown below for the same Transaction class. We derive Transaction's Schema using a DefaultSchema.DefaultSchemaProvider. Note that hard-coding the Rows below is for illustration purposes. Developers are instead encouraged to take advantage of DefaultSchema.DefaultSchemaProvider.toRowFunction(org.apache.beam.sdk.values.TypeDescriptor<T>).


 DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
 Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
 PCollection<Row> transactions = pipeline.apply(Create.of(
  Row
    .withSchema(schema)
    .withFieldValue("bank", "A")
    .withFieldValue("purchaseAmount", 10.23)
    .withFieldValue("transactionId", 12345L)
    .build(),
  Row
    .withSchema(schema)
    .withFieldValue("bank", "B")
    .withFieldValue("purchaseAmount", 54.65)
    .withFieldValue("transactionId", 54321L)
    .build(),
  Row
    .withSchema(schema)
    .withFieldValue("bank", "C")
    .withFieldValue("purchaseAmount", 11.76)
    .withFieldValue("transactionId", 98765L)
    .build()
 ));

 transactions.apply(
  JsonIO
    .writeRows("gs://bucket/path/to/folder/prefix")
 );
 

Writing the transactions PCollection of Rows would yield the following JSON file content.


 {"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
 {"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
 {"bank": "C", "purchaseAmount": 11.76, "transactionId": 98765}
 
  • Constructor Details

    • JsonIO

      public JsonIO()
  • Method Details