public class AvroIO
extends java.lang.Object
PTransforms for reading and writing Avro files.
To read a PCollection from one or more Avro files with the same schema known at
 pipeline construction time, use read(Class), using AvroIO.Read.from(ValueProvider<String>) to specify the
 filename or filepattern to read from. If the filepatterns to be read are themselves in a PCollection, apply readAll(Class). If the schema is unknown at pipeline construction time, use
 parseGenericRecords(SerializableFunction) or parseAllGenericRecords(SerializableFunction).
 
Many configuration options below apply to several or all of these transforms.
See FileSystems for information on supported file systems and filepatterns.
 
By default, read(Class) prohibits filepatterns that match no files, and readAll(Class)
 allows them in case the filepattern contains a glob wildcard character. Use AvroIO.Read.withEmptyMatchTreatment(EmptyMatchTreatment) to configure this behavior.
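For instance, a minimal sketch (hedged; the filepattern and element type are placeholders reused from the examples below) of tolerating an empty match instead of failing at expansion time:

```java
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class)
        .from("gs://my_bucket/path/to/records-*.avro")
        // Assumption: allow the pattern to match zero files without erroring.
        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));
```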
 
By default, the filepatterns are expanded only once. AvroIO.Read.watchForNewFiles(Duration, Watch.Growth.TerminationCondition) allows
 streaming of new files matching the filepattern(s).
 
To read specific records, such as Avro-generated classes, use read(Class). To read
 GenericRecords, use readGenericRecords(Schema) which takes a
 Schema object, or readGenericRecords(String) which takes an Avro schema in a
 JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
schema. Likewise, to read a PCollection of filepatterns, apply readAllGenericRecords(Schema).
 
For example:

```java
Pipeline p = ...;

// Read Avro-generated classes from files on GCS
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));

// Read GenericRecords of the given schema from files on GCS
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> genericRecords =
    p.apply(AvroIO.readGenericRecords(schema)
               .from("gs://my_bucket/path/to/records-*.avro"));
```
To read records from files whose schema is unknown at pipeline construction time or differs
 between files, use parseGenericRecords(SerializableFunction); in this case, you will need to specify a
 parsing function for converting each GenericRecord into a value of your custom type.
 Likewise, to read a PCollection of filepatterns with unknown schema, use parseAllGenericRecords(SerializableFunction).
 
For example:

```java
Pipeline p = ...;

PCollection<Foo> records =
    p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
      public Foo apply(GenericRecord record) {
        // If needed, access the schema of the record using record.getSchema()
        return ...;
      }
    }));
```
Reading from a PCollection of filepatterns:

```java
Pipeline p = ...;

PCollection<String> filepatterns = p.apply(...);

PCollection<AvroAutoGenClass> records =
    filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
PCollection<GenericRecord> genericRecords =
    filepatterns.apply(AvroIO.readAllGenericRecords(schema));
PCollection<Foo> parsedRecords =
    filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
```
 
Streaming new files matching a filepattern:

```java
Pipeline p = ...;

PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
    .read(AvroAutoGenClass.class)
    .from("gs://my_bucket/path/to/records-*.avro")
    .watchForNewFiles(
      // Check for new files every minute
      Duration.standardMinutes(1),
      // Stop watching the filepattern if no new files appear within an hour
      afterTimeSinceNewOutput(Duration.standardHours(1))));
```
 If it is known that the filepattern will match a very large number of files (e.g. tens of
 thousands or more), use AvroIO.Read.withHintMatchesManyFiles() for better performance and
 scalability. Note that it may decrease performance if the filepattern matches only a small number
 of files.
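A sketch of the hint in context (hedged; the filepattern is illustrative):

```java
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class)
        // Assumption: this pattern expands to tens of thousands of files.
        .from("gs://my_bucket/path/to/records-*.avro")
        .withHintMatchesManyFiles());
```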
 
To write a PCollection to one or more Avro files, use AvroIO.Write, using
 AvroIO.write().to(String) to specify the output filename prefix. The default DefaultFilenamePolicy will use this prefix, in conjunction with a ShardNameTemplate (set
 via AvroIO.Write.withShardNameTemplate(String)) and an optional filename suffix (set via AvroIO.Write.withSuffix(String)), to generate output filenames in a sharded way. You can override this
 default write filename policy using AvroIO.Write.to(FileBasedSink.FilenamePolicy) to specify a
 custom file naming policy.
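For instance, a sketch of setting the template and suffix explicitly (hedged; the prefix is a placeholder and the template shown matches the default index-of-max style):

```java
PCollection<AvroAutoGenClass> records = ...;
records.apply(AvroIO.write(AvroAutoGenClass.class)
    .to("gs://my_bucket/path/to/events")
    .withShardNameTemplate("-SSSSS-of-NNNNN")
    .withSuffix(".avro"));
// Yields names like gs://my_bucket/path/to/events-00000-of-00005.avro
```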
 
By default, AvroIO.Write produces output files that are compressed using CodecFactory.deflateCodec(6). This default can be changed or
 overridden using AvroIO.Write.withCodec(CodecFactory).
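For example, a sketch of switching to Snappy compression (assuming the Snappy codec is available on the classpath at runtime):

```java
PCollection<AvroAutoGenClass> records = ...;
records.apply(AvroIO.write(AvroAutoGenClass.class)
    .to("gs://my_bucket/path/to/events")
    .withSuffix(".avro")
    // Assumption: snappy is preferred here over the default deflate level 6.
    .withCodec(CodecFactory.snappyCodec()));
```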
 
To write specific records, such as Avro-generated classes, use write(Class). To write
 GenericRecords, use either writeGenericRecords(Schema) which takes
 a Schema object, or writeGenericRecords(String) which takes a schema in a
 JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
 schema.
 
For example:

```java
// A simple Write to a local file (only runs locally):
PCollection<AvroAutoGenClass> records = ...;
records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));

// A Write to a sharded GCS file (runs locally and using remote execution):
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> genericRecords = ...;
genericRecords.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
    .to("gs://my_bucket/path/to/numbers")
    .withSuffix(".avro"));
```
By default, all input is put into the global window before writing. If per-window writes are
 desired, for example when using a streaming runner, AvroIO.Write.withWindowedWrites()
 will cause windowing and triggering to be preserved. When producing windowed writes with a
 streaming runner that supports triggers, the number of output shards must be set explicitly using
 AvroIO.Write.withNumShards(int); some runners may set this for you to a runner-chosen
 value, so you may not need to set it yourself. A FileBasedSink.FilenamePolicy must be set,
 and unique windows and triggers must produce unique filenames.
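A hedged sketch of a windowed write (the shard count and prefix are illustrative, and the default filename policy set by to(String) is assumed to suffice):

```java
PCollection<AvroAutoGenClass> windowedRecords = ...;
windowedRecords.apply(AvroIO.write(AvroAutoGenClass.class)
    .to("gs://my_bucket/path/to/events")
    .withWindowedWrites()
    // Assumption: the runner does not choose a shard count for us.
    .withNumShards(3)
    .withSuffix(".avro"));
```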
 
The following shows a more complex example of AvroIO.Write usage, generating dynamic file destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user events (e.g. actions on a website) is written out to Avro files. Each event contains the user id as an integer field. We want events for each user to go into a specific directory for that user, and each user's data should be written with a specific schema for that user; a side input is used, so the schema can be calculated in a different stage.
```java
// This is the user class that controls dynamic destinations for this avro write. The input to
// AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
// to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
// of Integer.
class UserDynamicAvroDestinations
    extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
  private final PCollectionView<Map<Integer, String>> userToSchemaMap;

  public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
    this.userToSchemaMap = userToSchemaMap;
  }

  public GenericRecord formatRecord(UserEvent record) {
    return formatUserRecord(record, getSchema(record.getUserId()));
  }

  public Schema getSchema(Integer userId) {
    return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
  }

  public Integer getDestination(UserEvent record) {
    return record.getUserId();
  }

  public Integer getDefaultDestination() {
    return 0;
  }

  public FilenamePolicy getFilenamePolicy(Integer userId) {
    return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
        + userId + "/events"));
  }

  public List<PCollectionView<?>> getSideInputs() {
    return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
  }
}

PCollection<UserEvent> events = ...;
PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
    "ComputePerUserSchemas", new ComputePerUserSchemas());
events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
    .to(new UserDynamicAvroDestinations(userToSchemaMap)));
```
| Modifier and Type | Class and Description |
|---|---|
| static class | AvroIO.Parse<T> |
| static class | AvroIO.ParseAll<T> |
| static class | AvroIO.Read<T>: Implementation of read(Class) and readGenericRecords(Schema). |
| static class | AvroIO.ReadAll<T>: Implementation of readAll(Class). |
| static interface | AvroIO.RecordFormatter<ElementT>: Formats an element of a user type into a record with the given schema. |
| static class | AvroIO.Sink<ElementT> |
| static class | AvroIO.TypedWrite<UserT,DestinationT,OutputT>: Implementation of write(Class). |
| static class | AvroIO.Write<T>: This class is used as the default return value of write(Class). |
| Modifier and Type | Method and Description |
|---|---|
| static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> | constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction): Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec. |
| static <T> AvroIO.ParseAll<T> | parseAllGenericRecords(SerializableFunction<GenericRecord,T> parseFn): Like parseGenericRecords(SerializableFunction), but reads each filepattern in the input PCollection. |
| static <T> AvroIO.Parse<T> | parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn): Reads Avro file(s) containing records of an unspecified schema, converting each record to a custom type. |
| static <T> AvroIO.Read<T> | read(java.lang.Class<T> recordClass): Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). |
| static <T> AvroIO.ReadAll<T> | readAll(java.lang.Class<T> recordClass): Like read(Class), but reads each filepattern in the input PCollection. |
| static AvroIO.ReadAll<GenericRecord> | readAllGenericRecords(Schema schema): Like readGenericRecords(Schema), but reads each filepattern in the input PCollection. |
| static AvroIO.ReadAll<GenericRecord> | readAllGenericRecords(java.lang.String schema): Like readGenericRecords(String), but reads each filepattern in the input PCollection. |
| static AvroIO.Read<GenericRecord> | readGenericRecords(Schema schema): Reads Avro file(s) containing records of the specified schema. |
| static AvroIO.Read<GenericRecord> | readGenericRecords(java.lang.String schema): Reads Avro file(s) containing records of the specified schema. |
| static <ElementT> AvroIO.Sink<ElementT> | sink(java.lang.Class<ElementT> clazz): An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class). |
| static <ElementT> AvroIO.Sink<ElementT> | sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter): An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements by converting each one to a GenericRecord with a given (common) schema, like writeCustomTypeToGenericRecords(). |
| static <T> AvroIO.Write<T> | write(java.lang.Class<T> recordClass): Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern). |
| static <UserT,OutputT> AvroIO.TypedWrite<UserT,java.lang.Void,OutputT> | writeCustomType(): A PTransform that writes a PCollection to an avro file (or multiple avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT. |
| static <UserT> AvroIO.TypedWrite<UserT,java.lang.Void,GenericRecord> | writeCustomTypeToGenericRecords(): Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord. |
| static AvroIO.Write<GenericRecord> | writeGenericRecords(Schema schema): Writes Avro records of the specified schema. |
| static AvroIO.Write<GenericRecord> | writeGenericRecords(java.lang.String schema): Writes Avro records of the specified schema. |
public static <T> AvroIO.Read<T> read(java.lang.Class<T> recordClass)

Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). The schema must be specified using one of the withSchema functions.

public static <T> AvroIO.ReadAll<T> readAll(java.lang.Class<T> recordClass)

Like read(Class), but reads each filepattern in the input PCollection.

public static AvroIO.Read<GenericRecord> readGenericRecords(Schema schema)

Reads Avro file(s) containing records of the specified schema.

public static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(Schema schema)

Like readGenericRecords(Schema), but reads each filepattern in the input PCollection.

public static AvroIO.Read<GenericRecord> readGenericRecords(java.lang.String schema)

Reads Avro file(s) containing records of the specified schema, given as a JSON-encoded string.

public static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(java.lang.String schema)

Like readGenericRecords(String), but reads each filepattern in the input PCollection.

public static <T> AvroIO.Parse<T> parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn)

Reads Avro file(s) containing records of an unspecified schema, converting each record to a custom type.

public static <T> AvroIO.ParseAll<T> parseAllGenericRecords(SerializableFunction<GenericRecord,T> parseFn)

Like parseGenericRecords(SerializableFunction), but reads each filepattern in the input PCollection.

public static <T> AvroIO.Write<T> write(java.lang.Class<T> recordClass)

Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern).

public static AvroIO.Write<GenericRecord> writeGenericRecords(Schema schema)

Writes Avro records of the specified schema.
public static <UserT,OutputT> AvroIO.TypedWrite<UserT,java.lang.Void,OutputT> writeCustomType()

A PTransform that writes a PCollection to an avro file (or multiple avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT.

This version allows you to apply AvroIO writes to a PCollection of a custom type UserT. A format mechanism that converts the input type UserT to the output type that will be written to the file must be specified. If using a custom DynamicAvroDestinations object, this is done using FileBasedSink.DynamicDestinations.formatRecord(UserT); otherwise, AvroIO.TypedWrite.withFormatFunction(SerializableFunction<UserT, OutputT>) can be used to specify a format function.

The advantage of using a custom type is that it allows a user-provided DynamicAvroDestinations object, set via AvroIO.Write.to(DynamicAvroDestinations), to examine the custom type when choosing a destination.

If the output type is GenericRecord, use writeCustomTypeToGenericRecords() instead.
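A minimal sketch of the non-dynamic path, assuming a hypothetical MyEvent type and a hypothetical toGenericRecord conversion helper:

```java
PCollection<MyEvent> events = ...;
events.apply(AvroIO.<MyEvent, GenericRecord>writeCustomType()
    .to("gs://my_bucket/path/to/events")
    .withSchema(schema)  // the common schema of the output records
    // Assumption: toGenericRecord(event, schema) builds a matching GenericRecord.
    .withFormatFunction(event -> toGenericRecord(event, schema))
    .withSuffix(".avro"));
```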
public static <UserT> AvroIO.TypedWrite<UserT,java.lang.Void,GenericRecord> writeCustomTypeToGenericRecords()

Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord. A schema must be specified either in DynamicAvroDestinations.getSchema(DestinationT) or, if not using dynamic destinations, by using AvroIO.TypedWrite.withSchema(Schema).

public static AvroIO.Write<GenericRecord> writeGenericRecords(java.lang.String schema)

Writes Avro records of the specified schema, given as a JSON-encoded string.
public static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction)

Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.

public static <ElementT> AvroIO.Sink<ElementT> sink(java.lang.Class<ElementT> clazz)

An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class).

public static <ElementT> AvroIO.Sink<ElementT> sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter)

An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements by converting each one to a GenericRecord with a given (common) schema, like writeCustomTypeToGenericRecords().
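A sketch of pairing the sink with FileIO (hedged; the output directory is a placeholder):

```java
PCollection<AvroAutoGenClass> records = ...;
records.apply(FileIO.<AvroAutoGenClass>write()
    .via(AvroIO.sink(AvroAutoGenClass.class))
    .to("gs://my_bucket/path/to")
    .withSuffix(".avro"));
```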