public class AvroIO
extends java.lang.Object

PTransforms for reading and writing Avro files.
To read a PCollection from one or more Avro files with the same schema known at pipeline construction time, use read(java.lang.Class<T>), using AvroIO.Read.from(org.apache.beam.sdk.options.ValueProvider<java.lang.String>) to specify the filename or filepattern to read from. If the filepatterns to be read are themselves in a PCollection, you can use FileIO to match them and readFiles(java.lang.Class<T>) to read them.
If the schema is unknown at pipeline construction time, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>) or parseFilesGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>).
Many configuration options below apply to several or all of these transforms.
See FileSystems for information on supported file systems and filepatterns.
By default, the filepatterns are expanded only once. AvroIO.Read.watchForNewFiles(org.joda.time.Duration, org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition<java.lang.String, ?>, boolean) or the combination of FileIO.Match.continuously(Duration, TerminationCondition) and readFiles(Class) allow streaming of new files matching the filepattern(s).
By default, read(java.lang.Class<T>) prohibits filepatterns that match no files, and readFiles(Class) allows them in case the filepattern contains a glob wildcard character. Use AvroIO.Read.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment) or FileIO.Match.withEmptyMatchTreatment(EmptyMatchTreatment) plus readFiles(Class) to configure this behavior.
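For example, a minimal sketch of configuring this on a read (reusing the pipeline `p`, the `AvroAutoGenClass` type, and the bucket path from the examples below; the choice of EmptyMatchTreatment.ALLOW is illustrative):

```java
// Allow the glob to match no files at all, instead of failing the pipeline.
PCollection<AvroAutoGenClass> records =
    p.apply(
        AvroIO.read(AvroAutoGenClass.class)
            .from("gs://my_bucket/path/to/records-*.avro")
            .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));
```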
To read specific records, such as Avro-generated classes, use read(Class). To read GenericRecords, use readGenericRecords(Schema), which takes a Schema object, or readGenericRecords(String), which takes an Avro schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema. Likewise, to read a PCollection of filepatterns, apply FileIO matching plus readFilesGenericRecords(org.apache.avro.Schema).
For example:

```java
Pipeline p = ...;

// Read Avro-generated classes from files on GCS
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));

// Read GenericRecords of the given schema from files on GCS
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> records =
    p.apply(AvroIO.readGenericRecords(schema)
        .from("gs://my_bucket/path/to/records-*.avro"));
```
To read records from files whose schema is unknown at pipeline construction time or differs between files, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>); in this case, you will need to specify a parsing function for converting each GenericRecord into a value of your custom type. Likewise, to read a PCollection of filepatterns with unknown schema, use FileIO matching plus parseFilesGenericRecords(SerializableFunction).
For example:

```java
Pipeline p = ...;

PCollection<Foo> records =
    p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
      public Foo apply(GenericRecord record) {
        // If needed, access the schema of the record using record.getSchema()
        return ...;
      }
    }));
```
Reading from a PCollection of filepatterns:

```java
Pipeline p = ...;

PCollection<String> filepatterns = p.apply(...);
PCollection<AvroAutoGenClass> records =
    filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
PCollection<AvroAutoGenClass> records =
    filepatterns
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(AvroIO.readFiles(AvroAutoGenClass.class));
PCollection<GenericRecord> genericRecords =
    filepatterns.apply(AvroIO.readAllGenericRecords(schema));
PCollection<Foo> records =
    filepatterns
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
```
Streaming new files matching a filepattern:

```java
Pipeline p = ...;

PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
    .read(AvroAutoGenClass.class)
    .from("gs://my_bucket/path/to/records-*.avro")
    .watchForNewFiles(
        // Check for new files every minute
        Duration.standardMinutes(1),
        // Stop watching the filepattern if no new files appear within an hour
        afterTimeSinceNewOutput(Duration.standardHours(1))));
```
If it is known that the filepattern will match a very large number of files (e.g. tens of thousands or more), use AvroIO.Read.withHintMatchesManyFiles() for better performance and scalability. Note that it may decrease performance if the filepattern matches only a small number of files.
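For example, a minimal sketch of supplying the hint (reusing the pipeline and filepattern from the earlier examples):

```java
PCollection<AvroAutoGenClass> records =
    p.apply(
        AvroIO.read(AvroAutoGenClass.class)
            .from("gs://my_bucket/path/to/records-*.avro")
            .withHintMatchesManyFiles());
```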
If you want to use SQL or schema-based operations on an Avro-based PCollection, you must configure the read transform to infer the Beam schema and automatically set up the Beam-related coders:

```java
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
```
If you created an Avro-based PCollection by other means, e.g. reading records from Kafka or as the output of another PTransform, you may be interested in making your PCollection schema-aware so you can use the schema-based APIs or Beam's SqlTransform.
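As a minimal sketch (the query and the `name` field are illustrative assumptions, and SqlTransform requires the Beam SQL extension module), a schema-aware PCollection can then be queried directly:

```java
// Run a SQL query over the schema-aware collection from the previous example.
PCollection<Row> results =
    records.apply(SqlTransform.query("SELECT name FROM PCOLLECTION"));
```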
If you are using Avro specific records (generated classes from an Avro schema), you can register a schema provider for the specific Avro class to make any PCollection of these objects schema-aware.

```java
pipeline.getSchemaRegistry().registerSchemaProvider(
    AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
```
You can also manually set an Avro-backed Schema coder for a PCollection using AvroUtils.schemaCoder(Class, Schema) to make it schema-aware.

```java
PCollection<AvroAutoGenClass> records = ...
AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) records.getCoder();
records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
```
If you are using GenericRecords, you may need to set a specific Beam schema coder for each PCollection to match its internal Avro schema.

```java
org.apache.avro.Schema avroSchema = ...
PCollection<GenericRecord> records = ...
records.setCoder(AvroUtils.schemaCoder(avroSchema));
```
To write a PCollection to one or more Avro files, use AvroIO.Write, using AvroIO.write().to(String) to specify the output filename prefix. The default DefaultFilenamePolicy will use this prefix, in conjunction with a ShardNameTemplate (set via AvroIO.Write.withShardNameTemplate(String)) and an optional filename suffix (set via AvroIO.Write.withSuffix(String)), to generate output filenames in a sharded way. You can override this default write filename policy using AvroIO.Write.to(FileBasedSink.FilenamePolicy) to specify a custom file naming policy.
By default, AvroIO.Write produces output files that are compressed using CodecFactory.snappyCodec(). This default can be changed or overridden using AvroIO.Write.withCodec(org.apache.avro.file.CodecFactory).
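For example, a minimal sketch of selecting a different codec (assuming a PCollection<AvroAutoGenClass> records as in the examples below; the deflate level of 9 is an arbitrary illustrative choice):

```java
// Write with deflate compression instead of the default snappy codec.
records.apply(
    AvroIO.write(AvroAutoGenClass.class)
        .to("gs://my_bucket/path/to/records")
        .withSuffix(".avro")
        .withCodec(CodecFactory.deflateCodec(9)));
```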
To write specific records, such as Avro-generated classes, use write(Class). To write GenericRecords, use either writeGenericRecords(Schema), which takes a Schema object, or writeGenericRecords(String), which takes a schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema.
For example:

```java
// A simple Write to a local file (only runs locally):
PCollection<AvroAutoGenClass> records = ...;
records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));

// A Write to a sharded GCS file (runs locally and using remote execution):
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> records = ...;
records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
    .to("gs://my_bucket/path/to/numbers")
    .withSuffix(".avro"));
```
By default, all input is put into the global window before writing. If per-window writes are desired, for example when using a streaming runner, AvroIO.Write.withWindowedWrites() will cause windowing and triggering to be preserved. When producing windowed writes with a streaming runner that supports triggers, the number of output shards must be set explicitly using AvroIO.Write.withNumShards(int); some runners may set this for you to a runner-chosen value, so you may not need to set it yourself. A FileBasedSink.FilenamePolicy must be set, and unique windows and triggers must produce unique filenames.
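A minimal sketch of a windowed write, assuming a streaming pipeline, a windowed input collection `windowedRecords`, and a hypothetical `MyWindowedPolicy` that produces unique filenames per window and pane (the shard count and temp directory are illustrative):

```java
windowedRecords.apply(
    AvroIO.write(AvroAutoGenClass.class)
        .to(new MyWindowedPolicy())  // hypothetical FileBasedSink.FilenamePolicy
        .withTempDirectory(
            FileSystems.matchNewResource("gs://my_bucket/temp", /* isDirectory */ true))
        .withWindowedWrites()
        .withNumShards(3));
```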
The following shows a more-complex example of AvroIO.Write usage, generating dynamic file destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user events (e.g. actions on a website) is written out to Avro files. Each event contains the user id as an integer field. We want events for each user to go into a specific directory for that user, and each user's data should be written with a specific schema for that user; a side input is used, so the schema can be calculated in a different stage.
```java
// This is the user class that controls dynamic destinations for this Avro write. The input to
// AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
// to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
// of Integer.
class UserDynamicAvroDestinations
    extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
  private final PCollectionView<Map<Integer, String>> userToSchemaMap;

  public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
    this.userToSchemaMap = userToSchemaMap;
  }

  public GenericRecord formatRecord(UserEvent record) {
    return formatUserRecord(record, getSchema(record.getUserId()));
  }

  public Schema getSchema(Integer userId) {
    return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
  }

  public Integer getDestination(UserEvent record) {
    return record.getUserId();
  }

  public Integer getDefaultDestination() {
    return 0;
  }

  public FilenamePolicy getFilenamePolicy(Integer userId) {
    return DefaultFilenamePolicy.fromParams(
        new Params().withBaseFilename(baseDir + "/user-" + userId + "/events"));
  }

  public List<PCollectionView<?>> getSideInputs() {
    return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
  }
}

PCollection<UserEvent> events = ...;
PCollectionView<Map<Integer, String>> userToSchemaMap =
    events.apply("ComputePerUserSchemas", new ComputePerUserSchemas());
events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
    .to(new UserDynamicAvroDestinations(userToSchemaMap)));
```
| Modifier and Type | Class and Description |
|---|---|
| static class | AvroIO.Parse<T> |
| static class | AvroIO.ParseAll<T>: Deprecated. See parseAllGenericRecords(SerializableFunction) for details. |
| static class | AvroIO.ParseFiles<T> |
| static class | AvroIO.Read<T>: Implementation of read(java.lang.Class<T>) and readGenericRecords(org.apache.avro.Schema). |
| static class | AvroIO.ReadAll<T>: Deprecated. See readAll(Class) for details. |
| static class | AvroIO.ReadFiles<T>: Implementation of readFiles(java.lang.Class<T>). |
| static interface | AvroIO.RecordFormatter<ElementT>: Deprecated. Users can achieve the same by providing this transform in a ParDo before using write in AvroIO write(Class). |
| static class | AvroIO.Sink<ElementT> |
| static class | AvroIO.TypedWrite<UserT,DestinationT,OutputT>: Implementation of write(java.lang.Class<T>). |
| static class | AvroIO.Write<T>: This class is used as the default return value of write(java.lang.Class<T>). |
| Modifier and Type | Method and Description |
|---|---|
| static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> | constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction): Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec. |
| static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> | constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction, AvroSink.DatumWriterFactory<OutputT> datumWriterFactory): Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec. |
| static <T> AvroIO.ParseAll<T> | parseAllGenericRecords(SerializableFunction<GenericRecord,T> parseFn): Deprecated. You can achieve the functionality of parseAllGenericRecords(SerializableFunction) using FileIO matching plus parseFilesGenericRecords(SerializableFunction). This is the preferred method to make composition explicit. AvroIO.ParseAll will not receive upgrades and will be removed in a future version of Beam. |
| static <T> AvroIO.ParseFiles<T> | parseFilesGenericRecords(SerializableFunction<GenericRecord,T> parseFn): Like parseGenericRecords(SerializableFunction), but reads each FileIO.ReadableFile in the input PCollection. |
| static <T> AvroIO.Parse<T> | parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn): Reads Avro file(s) containing records of an unspecified schema and converts each record to a custom type. |
| static <T> AvroIO.Read<T> | read(java.lang.Class<T> recordClass): Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). |
| static <T> AvroIO.ReadAll<T> | readAll(java.lang.Class<T> recordClass): Deprecated. You can achieve the functionality of readAll(java.lang.Class<T>) using FileIO matching plus readFiles(Class). This is the preferred method to make composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam. |
| static AvroIO.ReadAll<GenericRecord> | readAllGenericRecords(Schema schema): Deprecated. You can achieve the functionality of readAllGenericRecords(Schema) using FileIO matching plus readFilesGenericRecords(Schema). This is the preferred method to make composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam. |
| static AvroIO.ReadAll<GenericRecord> | readAllGenericRecords(java.lang.String schema): Deprecated. You can achieve the functionality of readAllGenericRecords(String) using FileIO matching plus readFilesGenericRecords(String). This is the preferred method to make composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam. |
| static <T> AvroIO.ReadFiles<T> | readFiles(java.lang.Class<T> recordClass): Like read(java.lang.Class<T>), but reads each file in a PCollection of FileIO.ReadableFile, returned by FileIO.readMatches(). |
| static AvroIO.ReadFiles<GenericRecord> | readFilesGenericRecords(Schema schema): Like readGenericRecords(Schema), but for a PCollection of FileIO.ReadableFile, for example, returned by FileIO.readMatches(). |
| static AvroIO.ReadFiles<GenericRecord> | readFilesGenericRecords(java.lang.String schema): Like readGenericRecords(String), but for FileIO.ReadableFile collections. |
| static AvroIO.Read<GenericRecord> | readGenericRecords(Schema schema): Reads Avro file(s) containing records of the specified schema. |
| static AvroIO.Read<GenericRecord> | readGenericRecords(java.lang.String schema): Reads Avro file(s) containing records of the specified schema. |
| static <ElementT> AvroIO.Sink<ElementT> | sink(java.lang.Class<ElementT> clazz): An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class). |
| static <ElementT extends IndexedRecord> AvroIO.Sink<ElementT> | sink(Schema schema): An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(Schema). |
| static <ElementT extends IndexedRecord> AvroIO.Sink<ElementT> | sink(java.lang.String jsonSchema): An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(String). |
| static <ElementT> AvroIO.Sink<ElementT> | sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter): Deprecated. RecordFormatter will be removed in future versions. |
| static <T> AvroIO.Write<T> | write(java.lang.Class<T> recordClass): Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern). |
| static <UserT,OutputT> AvroIO.TypedWrite<UserT,java.lang.Void,OutputT> | writeCustomType(): A PTransform that writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT. |
| static <UserT> AvroIO.TypedWrite<UserT,java.lang.Void,GenericRecord> | writeCustomTypeToGenericRecords(): Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord. |
| static AvroIO.Write<GenericRecord> | writeGenericRecords(Schema schema): Writes Avro records of the specified schema. |
| static AvroIO.Write<GenericRecord> | writeGenericRecords(java.lang.String schema): Writes Avro records of the specified schema. |
public static <T> AvroIO.Read<T> read(java.lang.Class<T> recordClass)

Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). The schema must be specified using one of the withSchema functions.
public static <T> AvroIO.ReadFiles<T> readFiles(java.lang.Class<T> recordClass)

Like read(java.lang.Class<T>), but reads each file in a PCollection of FileIO.ReadableFile, returned by FileIO.readMatches().

You can read GenericRecord by using readFiles(GenericRecord.class), or readFiles(new Schema.Parser().parse(schema)) if the schema is a String.
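As a minimal sketch (the filepattern is illustrative, reusing the pipeline and record type from the earlier examples), matching files and reading them with readFiles:

```java
PCollection<AvroAutoGenClass> records =
    p.apply(FileIO.match().filepattern("gs://my_bucket/path/to/records-*.avro"))
        .apply(FileIO.readMatches())
        .apply(AvroIO.readFiles(AvroAutoGenClass.class));
```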
@Deprecated
public static <T> AvroIO.ReadAll<T> readAll(java.lang.Class<T> recordClass)

Deprecated. You can achieve the functionality of readAll(java.lang.Class<T>) using FileIO matching plus readFiles(Class). This is the preferred method to make composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam.

Like read(java.lang.Class<T>), but reads each filepattern in the input PCollection.

public static AvroIO.Read<GenericRecord> readGenericRecords(Schema schema)

Reads Avro file(s) containing records of the specified schema.
public static AvroIO.ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema)

Like readGenericRecords(Schema), but for a PCollection of FileIO.ReadableFile, for example, returned by FileIO.readMatches().

@Deprecated
public static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(Schema schema)
Deprecated. You can achieve the functionality of readAllGenericRecords(Schema) using FileIO matching plus readFilesGenericRecords(Schema). This is the preferred method to make composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam.

Like readGenericRecords(Schema), but for a PCollection of FileIO.ReadableFile, for example, returned by FileIO.readMatches().

public static AvroIO.Read<GenericRecord> readGenericRecords(java.lang.String schema)

Reads Avro file(s) containing records of the specified schema.
public static AvroIO.ReadFiles<GenericRecord> readFilesGenericRecords(java.lang.String schema)

Like readGenericRecords(String), but for FileIO.ReadableFile collections.

@Deprecated
public static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(java.lang.String schema)
Deprecated. You can achieve the functionality of readAllGenericRecords(String) using FileIO matching plus readFilesGenericRecords(String). This is the preferred method to make composition explicit. AvroIO.ReadAll will not receive upgrades and will be removed in a future version of Beam.

Like readGenericRecords(String), but reads each filepattern in the input PCollection.

public static <T> AvroIO.Parse<T> parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn)

Reads Avro file(s) containing records of an unspecified schema and converts each record to a custom type.
public static <T> AvroIO.ParseFiles<T> parseFilesGenericRecords(SerializableFunction<GenericRecord,T> parseFn)

Like parseGenericRecords(SerializableFunction), but reads each FileIO.ReadableFile in the input PCollection.

@Deprecated
public static <T> AvroIO.ParseAll<T> parseAllGenericRecords(SerializableFunction<GenericRecord,T> parseFn)
Deprecated. You can achieve the functionality of parseAllGenericRecords(SerializableFunction) using FileIO matching plus parseFilesGenericRecords(SerializableFunction). This is the preferred method to make composition explicit. AvroIO.ParseAll will not receive upgrades and will be removed in a future version of Beam.

Like parseGenericRecords(SerializableFunction), but reads each filepattern in the input PCollection.

public static <T> AvroIO.Write<T> write(java.lang.Class<T> recordClass)
Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern).

public static AvroIO.Write<GenericRecord> writeGenericRecords(Schema schema)

Writes Avro records of the specified schema.
public static <UserT,OutputT> AvroIO.TypedWrite<UserT,java.lang.Void,OutputT> writeCustomType()

A PTransform that writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT.

This version allows you to apply AvroIO writes to a PCollection of a custom type UserT. A format mechanism that converts the input type UserT to the output type that will be written to the file must be specified. If using a custom DynamicAvroDestinations object, this is done using FileBasedSink.DynamicDestinations.formatRecord(UserT); otherwise, AvroIO.TypedWrite.withFormatFunction(org.apache.beam.sdk.transforms.SerializableFunction<UserT, OutputT>) can be used to specify a format function.

The advantage of using a custom type is that it allows a user-provided DynamicAvroDestinations object, set via AvroIO.Write.to(DynamicAvroDestinations), to examine the custom type when choosing a destination.

If the output type is GenericRecord, use writeCustomTypeToGenericRecords() instead.
public static <UserT> AvroIO.TypedWrite<UserT,java.lang.Void,GenericRecord> writeCustomTypeToGenericRecords()

Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord. A schema must be specified either in DynamicAvroDestinations.getSchema(DestinationT) or, if not using dynamic destinations, by using AvroIO.TypedWrite.withSchema(Schema).
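A minimal sketch of the non-dynamic case, where `events`, `MyEvent`, and `formatEvent` are hypothetical placeholders for an input collection, a user type, and its conversion to GenericRecord:

```java
events.apply(
    AvroIO.<MyEvent>writeCustomTypeToGenericRecords()
        .to("gs://my_bucket/path/to/events")
        .withSchema(schema)  // the common Avro schema for all output records
        .withFormatFunction(event -> formatEvent(event, schema))
        .withSuffix(".avro"));
```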
public static AvroIO.Write<GenericRecord> writeGenericRecords(java.lang.String schema)

Writes Avro records of the specified schema.
public static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction)

Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.

public static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction, AvroSink.DatumWriterFactory<OutputT> datumWriterFactory)
Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.

public static <ElementT> AvroIO.Sink<ElementT> sink(java.lang.Class<ElementT> clazz)
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class).

@Experimental(value=SOURCE_SINK)
public static <ElementT extends IndexedRecord> AvroIO.Sink<ElementT> sink(Schema schema)
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(Schema).

@Experimental(value=SOURCE_SINK)
public static <ElementT extends IndexedRecord> AvroIO.Sink<ElementT> sink(java.lang.String jsonSchema)
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements with a given (common) schema, like writeGenericRecords(String).

@Deprecated
public static <ElementT> AvroIO.Sink<ElementT> sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter)
Deprecated. RecordFormatter will be removed in future versions.

An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements by converting each one to a GenericRecord with a given (common) schema, like writeCustomTypeToGenericRecords().
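For example, a minimal sketch of combining AvroIO.sink with FileIO.write (the output directory is illustrative, and `records` is a PCollection<AvroAutoGenClass> as in the examples above):

```java
records.apply(
    FileIO.<AvroAutoGenClass>write()
        .via(AvroIO.sink(AvroAutoGenClass.class))
        .to("gs://my_bucket/path/to/output")
        .withSuffix(".avro"));
```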