public class AvroIO
extends java.lang.Object
PTransforms for reading and writing Avro files.
To read a PCollection from one or more Avro files with the same schema known at pipeline construction time, use read(java.lang.Class<T>), using AvroIO.Read.from(org.apache.beam.sdk.options.ValueProvider<java.lang.String>) to specify the filename or filepattern to read from. If the filepatterns to be read are themselves in a PCollection, apply readAll(java.lang.Class<T>). If the schema is unknown at pipeline construction time, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>) or parseAllGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>).
Many configuration options below apply to several or all of these transforms.
See FileSystems for information on supported file systems and filepatterns.
By default, read(java.lang.Class<T>) prohibits filepatterns that match no files, and readAll(java.lang.Class<T>) allows them if the filepattern contains a glob wildcard character. Use AvroIO.Read.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment) to configure this behavior.
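The defaults described above can be pictured as a small decision rule. The sketch below is plain Java, not Beam code: the enum mirrors the constant names of Beam's EmptyMatchTreatment, while the class EmptyMatchSketch, the method isEmptyMatchAllowed, and the wildcard check are hypothetical helpers invented for illustration.

```java
/** Plain-Java illustration of the empty-match policies; not Beam's implementation. */
final class EmptyMatchSketch {
  /** Mirrors the constant names of Beam's EmptyMatchTreatment. */
  enum Treatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

  /** Hypothetical wildcard check: treats *, ? and [ as glob characters. */
  static boolean hasWildcard(String filepattern) {
    return filepattern.matches(".*[*?\\[].*");
  }

  /** Returns true if a filepattern that matched zero files should be accepted. */
  static boolean isEmptyMatchAllowed(String filepattern, Treatment treatment) {
    switch (treatment) {
      case ALLOW:
        return true;
      case DISALLOW:
        return false;
      case ALLOW_IF_WILDCARD:
        return hasWildcard(filepattern);
      default:
        throw new AssertionError("unknown treatment: " + treatment);
    }
  }

  public static void main(String[] args) {
    // Default described for read(...): an empty match is an error.
    System.out.println(isEmptyMatchAllowed("gs://b/records-*.avro", Treatment.DISALLOW));
    // Default described for readAll(...): tolerated only when a wildcard is present.
    System.out.println(isEmptyMatchAllowed("gs://b/records-*.avro", Treatment.ALLOW_IF_WILDCARD));
    System.out.println(isEmptyMatchAllowed("gs://b/records.avro", Treatment.ALLOW_IF_WILDCARD));
  }
}
```

In a real pipeline the policy is chosen by passing one of the EmptyMatchTreatment constants to withEmptyMatchTreatment; the sketch only shows why a literal filename with no match is treated more strictly than a glob.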
By default, the filepatterns are expanded only once. AvroIO.Read.watchForNewFiles(org.joda.time.Duration, org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition<java.lang.String, ?>) allows streaming of new files matching the filepattern(s).
To read specific records, such as Avro-generated classes, use read(Class). To read GenericRecords, use readGenericRecords(Schema), which takes a Schema object, or readGenericRecords(String), which takes an Avro schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema. Likewise, to read a PCollection of filepatterns, apply readAllGenericRecords(org.apache.avro.Schema).
For example:
Pipeline p = ...;
// Read Avro-generated classes from files on GCS
PCollection<AvroAutoGenClass> records =
    p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
// Read GenericRecords of the given schema from files on GCS
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> genericRecords =
    p.apply(AvroIO.readGenericRecords(schema)
        .from("gs://my_bucket/path/to/records-*.avro"));
To read records from files whose schema is unknown at pipeline construction time or differs between files, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>). In this case, you will need to specify a parsing function for converting each GenericRecord into a value of your custom type. Likewise, to read a PCollection of filepatterns with unknown schema, use parseAllGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>).
For example:
Pipeline p = ...;
PCollection<Foo> records =
    p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
      public Foo apply(GenericRecord record) {
        // If needed, access the schema of the record using record.getSchema()
        return ...;
      }
    }));
Reading from a PCollection of filepatterns:
Pipeline p = ...;
PCollection<String> filepatterns = p.apply(...);
// The filepatterns arrive as a PCollection, so use the readAll variants
PCollection<AvroAutoGenClass> records =
    filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
PCollection<GenericRecord> genericRecords =
    filepatterns.apply(AvroIO.readAllGenericRecords(schema));
PCollection<Foo> parsedRecords =
    filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
Streaming new files matching a filepattern:
Pipeline p = ...;
PCollection<AvroAutoGenClass> records = p.apply(AvroIO
    .read(AvroAutoGenClass.class)
    .from("gs://my_bucket/path/to/records-*.avro")
    .watchForNewFiles(
        // Check for new files every minute
        Duration.standardMinutes(1),
        // Stop watching the filepattern if no new files appear within an hour
        afterTimeSinceNewOutput(Duration.standardHours(1))));
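The two arguments to watchForNewFiles (a poll interval and a termination condition) can be modeled without Beam. The following is a minimal simulation, assuming each poll returns the full list of files currently matching the pattern; WatchSketch and watch are hypothetical names, and the quiet-period rule is only a rough stand-in for the "stop if no new files appear" condition described in the comments above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java simulation of repeated filepattern expansion; not Beam's implementation. */
final class WatchSketch {
  /**
   * snapshots.get(i) is the match result of poll number i; polls are pollInterval apart.
   * Polling stops once no new file has been seen for at least quietLimit.
   * Returns every file that was discovered, in first-seen order.
   */
  static List<String> watch(
      List<List<String>> snapshots, Duration pollInterval, Duration quietLimit) {
    Set<String> seen = new LinkedHashSet<>();
    Duration sinceNewOutput = Duration.ZERO;
    for (List<String> snapshot : snapshots) {
      boolean foundNew = false;
      for (String file : snapshot) {
        // add() returns true only for files not seen in an earlier poll.
        foundNew |= seen.add(file);
      }
      sinceNewOutput = foundNew ? Duration.ZERO : sinceNewOutput.plus(pollInterval);
      if (sinceNewOutput.compareTo(quietLimit) >= 0) {
        break; // termination condition reached; later files are never observed
      }
    }
    return new ArrayList<>(seen);
  }
}
```

With a one-minute interval and a two-minute quiet limit, a file that first appears after two consecutive polls with no new output is not picked up, which is the trade-off to keep in mind when choosing the termination duration.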
If it is known that the filepattern will match a very large number of files (e.g. tens of thousands or more), use AvroIO.Read.withHintMatchesManyFiles() for better performance and scalability. Note that it may decrease performance if the filepattern matches only a small number of files.
To write a PCollection to one or more Avro files, use AvroIO.Write, using AvroIO.write().to(String) to specify the output filename prefix. The default DefaultFilenamePolicy will use this prefix, in conjunction with a ShardNameTemplate (set via AvroIO.Write.withShardNameTemplate(String)) and optional filename suffix (set via AvroIO.Write.withSuffix(String)), to generate output filenames in a sharded way. You can override this default write filename policy using AvroIO.Write.to(FileBasedSink.FilenamePolicy) to specify a custom file naming policy.
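To make the prefix, shard template, and suffix combination concrete, here is a plain-Java sketch of how such a name could be assembled. It is not Beam's DefaultFilenamePolicy: ShardNameSketch and expand are hypothetical, and the template "-SSSSS-of-NNNNN" is assumed here as the default, with runs of S standing for the shard index and runs of N for the shard count.

```java
/** Plain-Java illustration of sharded file naming; not Beam's DefaultFilenamePolicy. */
final class ShardNameSketch {
  /**
   * Replaces each run of 'S' with the shard index and each run of 'N' with the shard
   * count, zero-padded to the length of the run, then adds the prefix and suffix.
   */
  static String expand(String prefix, String template, String suffix, int shard, int numShards) {
    StringBuilder out = new StringBuilder(prefix);
    int i = 0;
    while (i < template.length()) {
      char c = template.charAt(i);
      if (c == 'S' || c == 'N') {
        int start = i;
        while (i < template.length() && template.charAt(i) == c) {
          i++;
        }
        int value = (c == 'S') ? shard : numShards;
        out.append(String.format("%0" + (i - start) + "d", value));
      } else {
        out.append(c);
        i++;
      }
    }
    return out.append(suffix).toString();
  }

  public static void main(String[] args) {
    // prints "numbers-00000-of-00003.avro"
    System.out.println(expand("numbers", "-SSSSS-of-NNNNN", ".avro", 0, 3));
  }
}
```

Under these assumptions, a prefix of gs://my_bucket/path/to/numbers with suffix .avro and three shards would yield names ending in numbers-00000-of-00003.avro through numbers-00002-of-00003.avro.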
By default, AvroIO.Write produces output files that are compressed using CodecFactory.snappyCodec(). This default can be changed using AvroIO.Write.withCodec(org.apache.avro.file.CodecFactory).
To write specific records, such as Avro-generated classes, use write(Class). To write GenericRecords, use either writeGenericRecords(Schema), which takes a Schema object, or writeGenericRecords(String), which takes a schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema.
For example:
// A simple Write to a local file (only runs locally):
PCollection<AvroAutoGenClass> records = ...;
records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
// A Write to a sharded GCS file (runs locally and using remote execution):
Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
PCollection<GenericRecord> records = ...;
records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
    .to("gs://my_bucket/path/to/numbers")
    .withSuffix(".avro"));
By default, all input is put into the global window before writing. If per-window writes are desired (for example, when using a streaming runner), AvroIO.Write.withWindowedWrites() will cause windowing and triggering to be preserved. When producing windowed writes with a streaming runner that supports triggers, the number of output shards must be set explicitly using AvroIO.Write.withNumShards(int); some runners may set this for you to a runner-chosen value, so you may not need to set it yourself. A FileBasedSink.FilenamePolicy must be set, and unique windows and triggers must produce unique filenames.
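The uniqueness requirement for windowed writes can be illustrated with a naming function that folds the window bounds, pane index, and shard number into the filename. This is a plain-Java sketch rather than a Beam FilenamePolicy; WindowedNameSketch, windowedName, and the name layout are all hypothetical choices made for illustration.

```java
import java.time.Instant;

/** Plain-Java illustration of window-aware file naming; not a Beam FilenamePolicy. */
final class WindowedNameSketch {
  /**
   * Builds a filename that differs whenever the window, the pane (trigger firing),
   * or the shard differs, so no two outputs overwrite each other.
   */
  static String windowedName(String prefix, Instant windowStart, Instant windowEnd,
      long paneIndex, int shard, int numShards, String suffix) {
    return String.format("%s-%s-%s-pane-%d-%05d-of-%05d%s",
        prefix, windowStart, windowEnd, paneIndex, shard, numShards, suffix);
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2024-01-01T00:00:00Z");
    Instant end = Instant.parse("2024-01-01T00:01:00Z");
    // Two firings of the same window and shard still get distinct names.
    System.out.println(windowedName("out", start, end, 0, 0, 2, ".avro"));
    System.out.println(windowedName("out", start, end, 1, 0, 2, ".avro"));
  }
}
```

A custom policy that omitted the pane index would produce the same name for every trigger firing of a window, which is exactly the collision the requirement above is meant to rule out.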
The following shows a more complex example of AvroIO.Write usage, generating dynamic file destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user events (e.g. actions on a website) is written out to Avro files. Each event contains the user id as an integer field. We want events for each user to go into a specific directory for that user, and each user's data should be written with a specific schema for that user. A side input is used so that the schema can be calculated in a different stage.
// This is the user class that controls dynamic destinations for this avro write. The input to
// AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
// to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
// of Integer.
// Note: baseDir and formatUserRecord are not shown here and are assumed to be defined elsewhere.
class UserDynamicAvroDestinations
    extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
  private final PCollectionView<Map<Integer, String>> userToSchemaMap;
  public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
    this.userToSchemaMap = userToSchemaMap;
  }
  public GenericRecord formatRecord(UserEvent record) {
    return formatUserRecord(record, getSchema(record.getUserId()));
  }
  public Schema getSchema(Integer userId) {
    return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
  }
  public Integer getDestination(UserEvent record) {
    return record.getUserId();
  }
  public Integer getDefaultDestination() {
    return 0;
  }
  public FilenamePolicy getFilenamePolicy(Integer userId) {
    return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
        + userId + "/events"));
  }
  public List<PCollectionView<?>> getSideInputs() {
    return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
  }
}
PCollection<UserEvent> events = ...;
PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
    "ComputePerUserSchemas", new ComputePerUserSchemas());
// The type argument is the user type (UserEvent), not the destination type.
events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
    .to(new UserDynamicAvroDestinations(userToSchemaMap)));
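The routing idea in the example above (each element maps to a destination, and each destination maps to a base filename) can be tried out in isolation. The sketch below is plain Java with no Beam dependency; DestinationSketch, Event, baseFilename, and route are hypothetical names, and Event is only a stand-in for the UserEvent type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of per-user routing; not Beam's implementation. */
final class DestinationSketch {
  /** Hypothetical stand-in for the UserEvent type used in the example. */
  static final class Event {
    final int userId;
    final String action;

    Event(int userId, String action) {
      this.userId = userId;
      this.action = action;
    }
  }

  /** Same naming rule as the getFilenamePolicy method above: one directory per user id. */
  static String baseFilename(String baseDir, int userId) {
    return baseDir + "/user-" + userId + "/events";
  }

  /** Groups event actions by the base filename that their user id maps to. */
  static Map<String, List<String>> route(String baseDir, List<Event> events) {
    Map<String, List<String>> byFile = new TreeMap<>();
    for (Event e : events) {
      byFile.computeIfAbsent(baseFilename(baseDir, e.userId), k -> new ArrayList<>())
          .add(e.action);
    }
    return byFile;
  }

  public static void main(String[] args) {
    List<Event> events =
        Arrays.asList(new Event(1, "click"), new Event(2, "view"), new Event(1, "buy"));
    System.out.println(route("/data", events));
  }
}
```

In the Beam version the grouping and file writing are handled by the runner; the user class only supplies the element-to-destination and destination-to-filename mappings, plus the per-destination schema.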
Modifier and Type | Class and Description |
---|---|
static class | AvroIO.Parse<T> |
static class | AvroIO.ParseAll<T> |
static class | AvroIO.Read<T>: Implementation of read(java.lang.Class<T>) and readGenericRecords(org.apache.avro.Schema). |
static class | AvroIO.ReadAll<T>: Implementation of readAll(java.lang.Class<T>). |
static interface | AvroIO.RecordFormatter<ElementT>: Formats an element of a user type into a record with the given schema. |
static class | AvroIO.Sink<ElementT> |
static class | AvroIO.TypedWrite<UserT,DestinationT,OutputT>: Implementation of write(java.lang.Class<T>). |
static class | AvroIO.Write<T>: This class is used as the default return value of write(java.lang.Class<T>). |

Modifier and Type | Method and Description |
---|---|
static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> | constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction): Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec. |
static <T> AvroIO.ParseAll<T> | parseAllGenericRecords(SerializableFunction<GenericRecord,T> parseFn): Like parseGenericRecords(SerializableFunction), but reads each filepattern in the input PCollection. |
static <T> AvroIO.Parse<T> | parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn): Reads Avro file(s) containing records of an unspecified schema and converts each record to a custom type. |
static <T> AvroIO.Read<T> | read(java.lang.Class<T> recordClass): Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). |
static <T> AvroIO.ReadAll<T> | readAll(java.lang.Class<T> recordClass): Like read(java.lang.Class<T>), but reads each filepattern in the input PCollection. |
static AvroIO.ReadAll<GenericRecord> | readAllGenericRecords(Schema schema): Like readGenericRecords(Schema), but reads each filepattern in the input PCollection. |
static AvroIO.ReadAll<GenericRecord> | readAllGenericRecords(java.lang.String schema): Like readGenericRecords(String), but reads each filepattern in the input PCollection. |
static AvroIO.Read<GenericRecord> | readGenericRecords(Schema schema): Reads Avro file(s) containing records of the specified schema. |
static AvroIO.Read<GenericRecord> | readGenericRecords(java.lang.String schema): Reads Avro file(s) containing records of the specified schema. |
static <ElementT> AvroIO.Sink<ElementT> | sink(java.lang.Class<ElementT> clazz): An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class). |
static <ElementT> AvroIO.Sink<ElementT> | sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter): An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements by converting each one to a GenericRecord with a given (common) schema, like writeCustomTypeToGenericRecords(). |
static <T> AvroIO.Write<T> | write(java.lang.Class<T> recordClass): Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern). |
static <UserT,OutputT> AvroIO.TypedWrite<UserT,java.lang.Void,OutputT> | writeCustomType(): A PTransform that writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT. |
static <UserT> AvroIO.TypedWrite<UserT,java.lang.Void,GenericRecord> | writeCustomTypeToGenericRecords(): Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord. |
static AvroIO.Write<GenericRecord> | writeGenericRecords(Schema schema): Writes Avro records of the specified schema. |
static AvroIO.Write<GenericRecord> | writeGenericRecords(java.lang.String schema): Writes Avro records of the specified schema. |

public static <T> AvroIO.Read<T> read(java.lang.Class<T> recordClass)
Reads records of the given type from an Avro file (or multiple Avro files matching a pattern). The schema must be specified using one of the withSchema functions.

public static <T> AvroIO.ReadAll<T> readAll(java.lang.Class<T> recordClass)
Like read(java.lang.Class<T>), but reads each filepattern in the input PCollection.

public static AvroIO.Read<GenericRecord> readGenericRecords(Schema schema)
Reads Avro file(s) containing records of the specified schema.

public static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(Schema schema)
Like readGenericRecords(Schema), but reads each filepattern in the input PCollection.

public static AvroIO.Read<GenericRecord> readGenericRecords(java.lang.String schema)
Reads Avro file(s) containing records of the specified schema.

public static AvroIO.ReadAll<GenericRecord> readAllGenericRecords(java.lang.String schema)
Like readGenericRecords(String), but reads each filepattern in the input PCollection.

public static <T> AvroIO.Parse<T> parseGenericRecords(SerializableFunction<GenericRecord,T> parseFn)
Reads Avro file(s) containing records of an unspecified schema and converts each record to a custom type.

public static <T> AvroIO.ParseAll<T> parseAllGenericRecords(SerializableFunction<GenericRecord,T> parseFn)
Like parseGenericRecords(SerializableFunction), but reads each filepattern in the input PCollection.

public static <T> AvroIO.Write<T> write(java.lang.Class<T> recordClass)
Writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern).

public static AvroIO.Write<GenericRecord> writeGenericRecords(Schema schema)
Writes Avro records of the specified schema.

public static <UserT,OutputT> AvroIO.TypedWrite<UserT,java.lang.Void,OutputT> writeCustomType()
A PTransform that writes a PCollection to an Avro file (or multiple Avro files matching a sharding pattern), with each element of the input collection encoded into its own record of type OutputT.
This version allows you to apply AvroIO writes to a PCollection of a custom type UserT. A format mechanism that converts the input type UserT to the output type that will be written to the file must be specified. If using a custom DynamicAvroDestinations object, this is done using FileBasedSink.DynamicDestinations.formatRecord(UserT); otherwise, AvroIO.TypedWrite.withFormatFunction(org.apache.beam.sdk.transforms.SerializableFunction<UserT, OutputT>) can be used to specify a format function.
The advantage of using a custom type is that it allows a user-provided DynamicAvroDestinations object, set via AvroIO.Write.to(DynamicAvroDestinations), to examine the custom type when choosing a destination.
If the output type is GenericRecord, use writeCustomTypeToGenericRecords() instead.

public static <UserT> AvroIO.TypedWrite<UserT,java.lang.Void,GenericRecord> writeCustomTypeToGenericRecords()
Similar to writeCustomType(), but specialized for the case where the output type is GenericRecord. A schema must be specified either in DynamicAvroDestinations.getSchema(DestinationT) or, if not using dynamic destinations, by using AvroIO.TypedWrite.withSchema(Schema).

public static AvroIO.Write<GenericRecord> writeGenericRecords(java.lang.String schema)
Writes Avro records of the specified schema.

public static <UserT,OutputT> DynamicAvroDestinations<UserT,java.lang.Void,OutputT> constantDestinations(FileBasedSink.FilenamePolicy filenamePolicy, Schema schema, java.util.Map<java.lang.String,java.lang.Object> metadata, CodecFactory codec, SerializableFunction<UserT,OutputT> formatFunction)
Returns a DynamicAvroDestinations that always returns the same FileBasedSink.FilenamePolicy, schema, metadata, and codec.

public static <ElementT> AvroIO.Sink<ElementT> sink(java.lang.Class<ElementT> clazz)
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements of the given generated class, like write(Class).

public static <ElementT> AvroIO.Sink<ElementT> sinkViaGenericRecords(Schema schema, AvroIO.RecordFormatter<ElementT> formatter)
An AvroIO.Sink for use with FileIO.write() and FileIO.writeDynamic(), writing elements by converting each one to a GenericRecord with a given (common) schema, like writeCustomTypeToGenericRecords().