Class AvroIO
PTransforms for reading and writing Avro files.
Reading Avro files
To read a PCollection from one or more Avro files with the same schema known at pipeline construction time, use read(java.lang.Class<T>), using AvroIO.Read.from(org.apache.beam.sdk.options.ValueProvider<java.lang.String>) to specify the filename or filepattern to read from. If the filepatterns to be read are themselves in a PCollection, you can use FileIO to match them and readFiles(java.lang.Class<T>) to read them. If the schema is unknown at pipeline construction time, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>) or parseFilesGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>).
Many configuration options below apply to several or all of these transforms.
See FileSystems for information on supported file systems and filepatterns.
Filepattern expansion and watching
By default, the filepatterns are expanded only once. AvroIO.Read.watchForNewFiles(org.joda.time.Duration, org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition<java.lang.String, ?>, boolean) or the combination of FileIO.Match.continuously(Duration, TerminationCondition) and readFiles(Class) allow streaming of new files matching the filepattern(s).
By default, read(java.lang.Class<T>) prohibits filepatterns that match no files, and readFiles(Class) allows them in case the filepattern contains a glob wildcard character. Use AvroIO.Read.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment) or FileIO.Match.withEmptyMatchTreatment(EmptyMatchTreatment) plus readFiles(Class) to configure this behavior.
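For example, a minimal sketch of tolerating an empty match (the path is hypothetical):
Pipeline p = ...;
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class)
        .from("gs://my_bucket/path/to/records-*.avro")
        // Do not fail at expansion time if the pattern matches zero files.
        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));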
Reading records of a known schema
To read specific records, such as Avro-generated classes, use read(Class). To read GenericRecords, use readGenericRecords(Schema), which takes a Schema object, or readGenericRecords(String), which takes an Avro schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema. Likewise, to read a PCollection of filepatterns, apply FileIO matching plus readFilesGenericRecords(org.apache.avro.Schema).
For example:
Pipeline p = ...;

// Read Avro-generated classes from files on GCS
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));

// Read GenericRecord's of the given schema from files on GCS
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> records =
    p.apply(AvroIO.readGenericRecords(schema)
        .from("gs://my_bucket/path/to/records-*.avro"));
Reading records of an unknown schema
To read records from files whose schema is unknown at pipeline construction time or differs between files, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>) - in this case, you will need to specify a parsing function for converting each GenericRecord into a value of your custom type. Likewise, to read a PCollection of filepatterns with unknown schema, use FileIO matching plus parseFilesGenericRecords(SerializableFunction).
For example:
Pipeline p = ...;

PCollection<Foo> records =
    p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
      public Foo apply(GenericRecord record) {
        // If needed, access the schema of the record using record.getSchema()
        return ...;
      }
    }));
Reading from a PCollection of filepatterns
Pipeline p = ...;

PCollection<String> filepatterns = p.apply(...);
PCollection<AvroAutoGenClass> records =
    filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
PCollection<AvroAutoGenClass> records =
    filepatterns
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(AvroIO.readFiles(AvroAutoGenClass.class));
PCollection<GenericRecord> genericRecords =
    filepatterns.apply(AvroIO.readAllGenericRecords(schema));
PCollection<Foo> records =
    filepatterns
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
          public Foo apply(GenericRecord record) {
            // If needed, access the schema of the record using record.getSchema()
            return ...;
          }
        }));
Streaming new files matching a filepattern
Pipeline p = ...;

PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
    .read(AvroAutoGenClass.class)
    .from("gs://my_bucket/path/to/records-*.avro")
    .watchForNewFiles(
        // Check for new files every minute
        Duration.standardMinutes(1),
        // Stop watching the filepattern if no new files appear within an hour
        afterTimeSinceNewOutput(Duration.standardHours(1))));
Reading a very large number of files
If it is known that the filepattern will match a very large number of files (e.g. tens of thousands or more), use AvroIO.Read.withHintMatchesManyFiles() for better performance and scalability. Note that it may decrease performance if the filepattern matches only a small number of files.
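For example, a minimal sketch (the path is hypothetical):
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class)
        .from("gs://my_bucket/path/to/records-*.avro")
        // Hint that the pattern expands to a very large number of files.
        .withHintMatchesManyFiles());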
Inferring Beam schemas from Avro files
If you want to use SQL or schema-based operations on an Avro-based PCollection, you must configure the read transform to infer the Beam schema and automatically set up the Beam-related coders:
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
Inferring Beam schemas from Avro PCollections
If you created an Avro-based PCollection by other means, e.g. reading records from Kafka or as the output of another PTransform, you may be interested in making your PCollection schema-aware so you can use the Schema-based APIs or Beam's SqlTransform.
If you are using Avro specific records (generated classes from an Avro schema), you can register a schema provider for the specific Avro class to make any PCollection of these objects schema-aware.
pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
You can also manually set an Avro-backed Schema coder for a PCollection using AvroUtils.schemaCoder(Class, Schema) to make it schema-aware.
PCollection<AvroAutoGenClass> records = ...
AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) records.getCoder();
records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
If you are using GenericRecords, you may need to set a specific Beam schema coder for each PCollection to match their internal Avro schema.
org.apache.avro.Schema avroSchema = ...
PCollection<GenericRecord> records = ...
records.setCoder(AvroUtils.schemaCoder(avroSchema));
Writing Avro files
To write a PCollection to one or more Avro files, use AvroIO.Write, using AvroIO.write().to(String) to specify the output filename prefix. The default DefaultFilenamePolicy will use this prefix, in conjunction with a ShardNameTemplate (set via AvroIO.Write.withShardNameTemplate(String)) and an optional filename suffix (set via AvroIO.Write.withSuffix(String)), to generate output filenames in a sharded way. You can override this default write filename policy using AvroIO.Write.to(FilenamePolicy) to specify a custom file naming policy.
By default, AvroIO.Write produces output files that are compressed using CodecFactory.snappyCodec(). This default can be changed or overridden using AvroIO.Write.withCodec(org.apache.avro.file.CodecFactory).
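For example, a minimal sketch of overriding the codec (the output path is hypothetical):
PCollection<AvroAutoGenClass> records = ...;
records.apply(AvroIO.write(AvroAutoGenClass.class)
    .to("/path/to/records")
    // Write with deflate compression instead of the default Snappy codec.
    .withCodec(CodecFactory.deflateCodec(9)));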
Writing specific or generic records
To write specific records, such as Avro-generated classes, use write(Class). To write GenericRecords, use either writeGenericRecords(Schema), which takes a Schema object, or writeGenericRecords(String), which takes a schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema.
For example:
// A simple Write to a local file (only runs locally):
PCollection<AvroAutoGenClass> records = ...;
records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));

// A Write to a sharded GCS file (runs locally and using remote execution):
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> records = ...;
records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
    .to("gs://my_bucket/path/to/numbers")
    .withSuffix(".avro"));
Writing windowed or unbounded data
By default, all input is put into the global window before writing. If per-window writes are desired - for example, when using a streaming runner - AvroIO.Write.withWindowedWrites() will cause windowing and triggering to be preserved. When producing windowed writes with a streaming runner that supports triggers, the number of output shards must be set explicitly using AvroIO.Write.withNumShards(int); some runners may set this for you to a runner-chosen value, so you may not need to set it yourself. A FileBasedSink.FilenamePolicy must be set, and unique windows and triggers must produce unique filenames.
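For example, a minimal sketch of a windowed write (the windowing strategy, shard count, and path are assumptions):
PCollection<AvroAutoGenClass> windowedRecords =
    records.apply(Window.<AvroAutoGenClass>into(FixedWindows.of(Duration.standardMinutes(5))));
windowedRecords.apply(AvroIO.write(AvroAutoGenClass.class)
    .to("gs://my_bucket/path/to/events")
    .withWindowedWrites()
    // With a streaming runner that supports triggers, shards must be set explicitly.
    .withNumShards(10)
    .withSuffix(".avro"));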
Writing data to multiple destinations
The following shows a more complex example of AvroIO.Write usage, generating dynamic file destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user events (e.g. actions on a website) is written out to Avro files. Each event contains the user id as an integer field. We want events for each user to go into a specific directory for that user, and each user's data should be written with a specific schema for that user; a side input is used, so the schema can be calculated in a different stage.
// This is the user class that controls dynamic destinations for this avro write. The input to
// AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
// to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
// of Integer.
class UserDynamicAvroDestinations
    extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
  private final PCollectionView<Map<Integer, String>> userToSchemaMap;
  public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
    this.userToSchemaMap = userToSchemaMap;
  }
  public GenericRecord formatRecord(UserEvent record) {
    return formatUserRecord(record, getSchema(record.getUserId()));
  }
  public Schema getSchema(Integer userId) {
    return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
  }
  public Integer getDestination(UserEvent record) {
    return record.getUserId();
  }
  public Integer getDefaultDestination() {
    return 0;
  }
  public FilenamePolicy getFilenamePolicy(Integer userId) {
    // baseDir holds the output directory root and is defined elsewhere.
    return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
        + userId + "/events"));
  }
  public List<PCollectionView<?>> getSideInputs() {
    return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
  }
}
PCollection<UserEvent> events = ...;
PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
    "ComputePerUserSchemas", new ComputePerUserSchemas());
events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
    .to(new UserDynamicAvroDestinations(userToSchemaMap)));
Malformed records can be handled by using AvroIO.TypedWrite.withBadRecordErrorHandler(ErrorHandler). See the documentation in FileIO for details on usage.
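A rough sketch of wiring up an error handler, assuming a Beam version that provides Pipeline.registerBadRecordErrorHandler, that AvroIO.Write forwards withBadRecordErrorHandler to the underlying TypedWrite, and a user-supplied ErrorSinkTransform; the sink transform and path are illustrative:
// ErrorSinkTransform is hypothetical: any PTransform<PCollection<BadRecord>, ?> works.
try (BadRecordErrorHandler<PCollection<Long>> handler =
    p.registerBadRecordErrorHandler(new ErrorSinkTransform())) {
  records.apply(AvroIO.write(AvroAutoGenClass.class)
      .to("gs://my_bucket/path/to/records")
      // Route malformed records to the handler instead of failing the bundle.
      .withBadRecordErrorHandler(handler));
}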
-
Nested Class Summary
static class AvroIO.Parse<T>
static class AvroIO.ParseAll<T>
    Deprecated.
static class AvroIO.ParseFiles<T>
static class AvroIO.Read<T>
    Implementation of read(java.lang.Class<T>) and readGenericRecords(org.apache.avro.Schema).
static class AvroIO.ReadAll<T>
    Deprecated. See readAll(Class) for details.
static class AvroIO.ReadFiles<T>
    Implementation of readFiles(java.lang.Class<T>).
static interface AvroIO.RecordFormatter<ElementT>
    Deprecated. Users can achieve the same by providing this transform in a ParDo before using write in AvroIO write(Class).
static class AvroIO.Sink<ElementT>
static class AvroIO.TypedWrite<UserT, DestinationT, OutputT>
    Implementation of write(java.lang.Class<T>).
static class AvroIO.Write<T>
    This class is used as the default return value of write(java.lang.Class<T>).
-
Method Summary
static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, Map<String, Object> metadata, CodecFactory codec, SerializableFunction<UserT, OutputT> formatFunction)
    Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.
static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, Map<String, Object> metadata, CodecFactory codec, SerializableFunction<UserT, OutputT> formatFunction, @Nullable AvroSink.DatumWriterFactory<OutputT> datumWriterFactory)
    Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.
static <T> AvroIO.ParseAll<T> parseAllGenericRecords(SerializableFunction<GenericRecord, T> parseFn)
    Deprecated. You can achieve the functionality of parseAllGenericRecords(SerializableFunction) using FileIO matching plus parseFilesGenericRecords(SerializableFunction).
static <T> AvroIO.ParseFiles<T> parseFilesGenericRecords(SerializableFunction<GenericRecord, T> parseFn)
    Like parseGenericRecords(SerializableFunction), but reads each FileIO.ReadableFile in the input PCollection.
static <T> AvroIO.Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn)
    Reads Avro file(s) containing records of an unspecified schema, converting each record to a custom type.
static <T> AvroIO.Read<T> read(Class<T> recordClass)
    Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
static <T> AvroIO.ReadAll<T> readAll(Class<T> recordClass)
    Deprecated. You can achieve the functionality of readAll(Class) using FileIO matching plus readFiles(Class).
static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(String schema)
    Deprecated. You can achieve the functionality of readAllGenericRecords(String) using FileIO matching plus readFilesGenericRecords(String).
static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(Schema schema)
    Deprecated. You can achieve the functionality of readAllGenericRecords(Schema) using FileIO matching plus readFilesGenericRecords(Schema).
static <T> AvroIO.ReadFiles<T> readFiles(Class<T> recordClass)
    Like read(java.lang.Class<T>), but reads each file in a PCollection of FileIO.ReadableFile, returned by FileIO.readMatches().
static AvroIO.ReadFiles<GenericRecord> readFilesGenericRecords(String schema)
    Like readGenericRecords(String), but for FileIO.ReadableFile collections.
static AvroIO.ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema)
    Like readGenericRecords(Schema), but for a PCollection of FileIO.ReadableFile, for example, returned by FileIO.readMatches().
static AvroIO.Read<GenericRecord> readGenericRecords(String schema)
    Reads Avro file(s) containing records of the specified schema.
static AvroIO.Read<GenericRecord> readGenericRecords(Schema schema)
    Reads Avro file(s) containing records of the specified schema.
static <ElementT> AvroIO.Sink<ElementT> sink(Class<ElementT> clazz)
    An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class).
static <ElementT extends IndexedRecord> AvroIO.Sink<ElementT> sink(String jsonSchema)
    An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(String).
static <ElementT extends IndexedRecord> AvroIO.Sink<ElementT> sink(Schema schema)
    An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(Schema).
static <ElementT> AvroIO.Sink<ElementT> sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter)
    Deprecated. RecordFormatter will be removed in future versions.
static <T> AvroIO.Write<T> write(Class<T> recordClass)
    Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern).
static <UserT> AvroIO.TypedWrite<UserT, Void, Object> writeCustomType()
    Deprecated. Use writeCustomType(Class) instead and provide the custom record class.
static <UserT, OutputT> AvroIO.TypedWrite<UserT, Void, OutputT> writeCustomType(Class<OutputT> recordClass)
    A PTransform that writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT.
static <UserT> AvroIO.TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords()
    Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord.
static AvroIO.Write<GenericRecord> writeGenericRecords(String schema)
    Writes Avro records of the specified schema.
static AvroIO.Write<GenericRecord> writeGenericRecords(Schema schema)
    Writes Avro records of the specified schema.
-
Method Details
-
read
Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). The schema must be specified using one of the withSchema functions.
-
readFiles
Like read(java.lang.Class<T>), but reads each file in a PCollection of FileIO.ReadableFile, returned by FileIO.readMatches().
-
readAll
Deprecated. You can achieve the functionality of readAll(java.lang.Class<T>) using FileIO matching plus readFiles(Class); this is the preferred method, as it makes the composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam.
Like read(java.lang.Class<T>), but reads each filepattern in the input PCollection.
-
readGenericRecords
Reads Avro file(s) containing records of the specified schema. -
readFilesGenericRecords
Like readGenericRecords(Schema), but for a PCollection of FileIO.ReadableFile, for example, returned by FileIO.readMatches().
-
readAllGenericRecords
Deprecated. You can achieve the functionality of readAllGenericRecords(Schema) using FileIO matching plus readFilesGenericRecords(Schema); this is the preferred method, as it makes the composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam.
Like readGenericRecords(Schema), but for a PCollection of FileIO.ReadableFile, for example, returned by FileIO.readMatches().
-
readGenericRecords
Reads Avro file(s) containing records of the specified schema. The schema is specified as a JSON-encoded string. -
readFilesGenericRecords
Like readGenericRecords(String), but for FileIO.ReadableFile collections.
-
readAllGenericRecords
Deprecated. You can achieve the functionality of readAllGenericRecords(String) using FileIO matching plus readFilesGenericRecords(String); this is the preferred method, as it makes the composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam.
Like readGenericRecords(String), but reads each filepattern in the input PCollection.
-
parseGenericRecords
public static <T> AvroIO.Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn)
Reads Avro file(s) containing records of an unspecified schema, converting each record to a custom type.
-
parseFilesGenericRecords
public static <T> AvroIO.ParseFiles<T> parseFilesGenericRecords(SerializableFunction<GenericRecord, T> parseFn)
Like parseGenericRecords(SerializableFunction), but reads each FileIO.ReadableFile in the input PCollection.
-
parseAllGenericRecords
@Deprecated
public static <T> AvroIO.ParseAll<T> parseAllGenericRecords(SerializableFunction<GenericRecord, T> parseFn)
Deprecated. You can achieve the functionality of parseAllGenericRecords(SerializableFunction) using FileIO matching plus parseFilesGenericRecords(SerializableFunction); this is the preferred method, as it makes the composition explicit. AvroIO.ParseAll will not receive upgrades and will be removed in a future version of Beam.
Like parseGenericRecords(SerializableFunction), but reads each filepattern in the input PCollection.
-
write
Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern).
-
writeGenericRecords
Writes Avro records of the specified schema. -
writeCustomType
Deprecated. Use writeCustomType(Class) instead and provide the custom record class.
A PTransform that writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT.
This version allows you to apply AvroIO writes to a PCollection of a custom type UserT. A format mechanism that converts the input type UserT to the output type that will be written to the file must be specified. If using a custom DynamicAvroDestinations object, this is done using FileBasedSink.DynamicDestinations.formatRecord(UserT); otherwise, AvroIO.TypedWrite.withFormatFunction(org.apache.beam.sdk.transforms.SerializableFunction<UserT, OutputT>) can be used to specify a format function.
The advantage of using a custom type is that it allows a user-provided DynamicAvroDestinations object, set via AvroIO.Write.to(DynamicAvroDestinations), to examine the custom type when choosing a destination.
If the output type is GenericRecord, use writeCustomTypeToGenericRecords() instead.
-
writeCustomType
public static <UserT, OutputT> AvroIO.TypedWrite<UserT, Void, OutputT> writeCustomType(Class<OutputT> recordClass)
A PTransform that writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT.
This version allows you to apply AvroIO writes to a PCollection of a custom type UserT. A format mechanism that converts the input type UserT to the output type that will be written to the file must be specified. If using a custom DynamicAvroDestinations object, this is done using FileBasedSink.DynamicDestinations.formatRecord(UserT); otherwise, AvroIO.TypedWrite.withFormatFunction(org.apache.beam.sdk.transforms.SerializableFunction<UserT, OutputT>) can be used to specify a format function.
The advantage of using a custom type is that it allows a user-provided DynamicAvroDestinations object, set via AvroIO.Write.to(DynamicAvroDestinations), to examine the custom type when choosing a destination.
If the output type is GenericRecord, use writeCustomTypeToGenericRecords() instead.
-
writeCustomTypeToGenericRecords
Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord. A schema must be specified either in DynamicAvroDestinations.getSchema(DestinationT) or, if not using dynamic destinations, by using AvroIO.TypedWrite.withSchema(Schema).
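For example, a minimal sketch, assuming a hypothetical MyEvent class and a toGenericRecord helper that converts it to a GenericRecord of the given schema:
PCollection<MyEvent> events = ...;
Schema schema = ...;
events.apply(
    AvroIO.<MyEvent>writeCustomTypeToGenericRecords()
        // MyEvent and toGenericRecord are hypothetical; supply your own type and converter.
        .withFormatFunction(event -> toGenericRecord(event, schema))
        .withSchema(schema)
        .to("gs://my_bucket/path/to/events")
        .withSuffix(".avro"));
-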
writeGenericRecords
Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string. -
constantDestinations
public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, Map<String, Object> metadata, CodecFactory codec, SerializableFunction<UserT, OutputT> formatFunction)
Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.
-
constantDestinations
public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, Map<String, Object> metadata, CodecFactory codec, SerializableFunction<UserT, OutputT> formatFunction, @Nullable AvroSink.DatumWriterFactory<OutputT> datumWriterFactory)
Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.
-
sink
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class).
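For example, a minimal sketch of using a sink with FileIO.write() (the output path is hypothetical):
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> records = ...;
records.apply(FileIO.<GenericRecord>write()
    // Write each element as an Avro record of the given schema.
    .via(AvroIO.sink(schema))
    .to("gs://my_bucket/path/to/output")
    .withSuffix(".avro"));
-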
sink
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(Schema).
-
sink
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(String).
-
sinkViaGenericRecords
@Deprecated
public static <ElementT> AvroIO.Sink<ElementT> sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter)
Deprecated. RecordFormatter will be removed in future versions.
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements by converting each one to a GenericRecord with a given (common) schema, like writeCustomTypeToGenericRecords().