@Experimental(value=SOURCE_SINK)
public class WriteFiles<T> extends PTransform<PCollection<T>,PDone>
A PTransform that writes to a FileBasedSink. A write begins with a sequential global initialization of a sink, followed by a parallel write, and ends with a sequential finalization of the write. The output of a write is PDone.
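The three phases described above (sequential initialization, parallel write, sequential finalization) can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: `WriteLifecycleSketch`, the `temp-`/`final-` prefixes, and the in-memory "files" are hypothetical and are not part of Beam or of `FileBasedSink`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of the three write phases; not Beam's implementation.
class WriteLifecycleSketch {

  /** Writes each bundle to its own temporary "file" in parallel, then finalizes sequentially. */
  static List<String> run(List<List<String>> bundles) {
    // Phase 1: sequential, global initialization (here: choosing a temp prefix).
    final String tempPrefix = "temp-";

    // Phase 2: parallel write, one task per bundle.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<String>> pending = new ArrayList<>();
    try {
      for (int i = 0; i < bundles.size(); i++) {
        final int bundleIndex = i;
        final List<String> bundle = bundles.get(i);
        pending.add(
            pool.submit(() -> tempPrefix + bundleIndex + ":" + String.join(",", bundle)));
      }

      // Phase 3: sequential finalization (here: renaming temp outputs to final ones).
      List<String> finalized = new ArrayList<>();
      for (Future<String> result : pending) {
        finalized.add(result.get().replaceFirst("^temp-", "final-"));
      }
      return finalized;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("write failed", e);
    } finally {
      pool.shutdown();
    }
  }
}
```

In this model the number of outputs equals the number of bundles, which mirrors the runner-determined default described next.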
By default, every bundle in the input PCollection will be processed by a FileBasedSink.WriteOperation, so the number of outputs will vary based on runner behavior, though at least 1 output will always be produced. The exact parallelism of the write stage can be controlled using withNumShards(int), typically used to control how many files are produced or to globally limit the number of workers connecting to an external service. However, this option can often hurt performance: it adds an additional GroupByKey to the pipeline.
Example usage with runner-determined sharding:
p.apply(WriteFiles.to(new MySink(...)));
Example usage with a fixed number of shards:
p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));
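To see why a fixed shard count implies a grouping step, the following plain-Java sketch assigns each element a shard key and then groups by that key, so that exactly `numShards` outputs result. It is a simplified model, not Beam's implementation: `FixedShardingSketch`, the round-robin key assignment, and the pre-created empty shards are assumptions made for illustration, and it covers only the positive-`numShards` case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of fixed sharding; not Beam's implementation.
class FixedShardingSketch {

  /** Assigns elements round-robin to numShards keys and groups them by key. */
  static Map<Integer, List<String>> groupIntoShards(List<String> elements, int numShards) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("this sketch only models a positive shard count");
    }
    Map<Integer, List<String>> shards = new TreeMap<>();
    // Pre-create every shard so the result always has exactly numShards entries.
    for (int shard = 0; shard < numShards; shard++) {
      shards.put(shard, new ArrayList<>());
    }
    // The grouping step: every element must be routed to the shard that owns its key.
    for (int i = 0; i < elements.size(); i++) {
      shards.get(i % numShards).add(elements.get(i));
    }
    return shards;
  }
}
```

In a distributed runner this routing means moving elements between workers, which is the cost the description attributes to the additional GroupByKey.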
Fields inherited from class PTransform: name
Modifier and Type | Method and Description |
---|---|
PDone | expand(PCollection<T> input): Override this method to specify how this PTransform should be expanded on the given InputT. |
ValueProvider<java.lang.Integer> | getNumShards() |
PTransform<PCollection<T>,PCollectionView<java.lang.Integer>> | getSharding(): Gets the PTransform that will be used to determine sharding. |
FileBasedSink<T> | getSink(): Returns the FileBasedSink associated with this PTransform. |
boolean | isWindowedWrites(): Returns whether or not to perform windowed writes. |
void | populateDisplayData(DisplayData.Builder builder): Register display data for the given transform or component. |
static <T> WriteFiles<T> | to(FileBasedSink<T> sink): Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced. |
void | validate(PipelineOptions options): Called before running the Pipeline to verify this transform is fully and correctly specified. |
WriteFiles<T> | withMaxNumWritersPerBundle(int maxNumWritersPerBundle): Set the maximum number of writers created in a bundle before spilling to shuffle. |
WriteFiles<T> | withNumShards(int numShards): Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards. |
WriteFiles<T> | withNumShards(ValueProvider<java.lang.Integer> numShardsProvider): Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider. |
WriteFiles<T> | withRunnerDeterminedSharding(): Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding. |
WriteFiles<T> | withSharding(PTransform<PCollection<T>,PCollectionView<java.lang.Integer>> sharding): Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards. |
WriteFiles<T> | withWindowedWrites(): Returns a new WriteFiles that preserves windowing on its input. |
Methods inherited from class PTransform: getAdditionalInputs, getDefaultOutputCoder, getKindString, getName, toString
public static <T> WriteFiles<T> to(FileBasedSink<T> sink)
Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
public PDone expand(PCollection<T> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by: expand in class PTransform<PCollection<T>,PDone>
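public void validate(PipelineOptions options)
Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified.
By default, does nothing.
Overrides: validate in class PTransform<PCollection<T>,PDone>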
public void populateDisplayData(DisplayData.Builder builder)
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
Specified by: populateDisplayData in interface HasDisplayData
Overrides: populateDisplayData in class PTransform<PCollection<T>,PDone>
Parameters: builder - The builder to populate with display data.
See Also: HasDisplayData
public FileBasedSink<T> getSink()
Returns the FileBasedSink associated with this PTransform.
public boolean isWindowedWrites()
Returns whether or not to perform windowed writes.
@Nullable
public PTransform<PCollection<T>,PCollectionView<java.lang.Integer>> getSharding()
Gets the PTransform that will be used to determine sharding. This can be a static number of shards (after a call to withNumShards(int)), dynamic (by withSharding(PTransform)), or runner-determined (by withRunnerDeterminedSharding()).
public ValueProvider<java.lang.Integer> getNumShards()
public WriteFiles<T> withNumShards(int numShards)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
A value less than or equal to 0 will be equivalent to the default behavior of runner-determined sharding.
public WriteFiles<T> withNumShards(ValueProvider<java.lang.Integer> numShardsProvider)
Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
public WriteFiles<T> withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
Set the maximum number of writers created in a bundle before spilling to shuffle.
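The method summary describes withMaxNumWritersPerBundle as setting the maximum number of writers created in a bundle before spilling to shuffle. One way to picture that cap is the plain-Java sketch below. It is a rough model under assumptions, not Beam's implementation: `WriterCapSketch`, the string destinations (standing in for something like a window), and the `spilled` list are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of capping open writers in one bundle; not Beam's implementation.
class WriterCapSketch {
  // Destination -> elements written directly by an open writer.
  final Map<String, List<String>> writers = new LinkedHashMap<>();
  // Elements that could not get a writer and are deferred to a later grouped write.
  final List<String> spilled = new ArrayList<>();
  private final int maxWriters;

  WriterCapSketch(int maxWriters) {
    this.maxWriters = maxWriters;
  }

  void process(String destination, String element) {
    List<String> writer = writers.get(destination);
    // Open a new writer only while the bundle is still under its cap.
    if (writer == null && writers.size() < maxWriters) {
      writer = new ArrayList<>();
      writers.put(destination, writer);
    }
    if (writer != null) {
      writer.add(element);
    } else {
      // Over the cap: spill instead of opening yet another writer.
      spilled.add(destination + "/" + element);
    }
  }
}
```

The trade-off this models is bounded per-bundle resources (open writers) in exchange for extra data movement for the spilled elements.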
public WriteFiles<T> withSharding(PTransform<PCollection<T>,PCollectionView<java.lang.Integer>> sharding)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
public WriteFiles<T> withRunnerDeterminedSharding()
Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
public WriteFiles<T> withWindowedWrites()
Returns a new WriteFiles that preserves windowing on its input.
If this option is not specified, windowing and triggering are replaced by GlobalWindows and DefaultTrigger.
If there is no data for a window, no output shards will be generated for that window. If a window triggers multiple times, output shards might be generated more than once for that window; it's up to the sink implementation to keep these output shards unique.
This option can only be used if withNumShards(int) is also set to a positive value.
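Because a window that triggers several times can produce several sets of shards, a sink needs some way to keep the resulting names distinct. The sketch below shows one possible naming scheme that includes the window, the pane index, and the shard number. The class `WindowedShardNameSketch`, its parameters, and the name layout are hypothetical and are not the naming used by any particular FileBasedSink.

```java
import java.util.Locale;

// Hypothetical shard-naming scheme for windowed writes; not taken from Beam.
class WindowedShardNameSketch {

  /** Builds a name that differs whenever the window, pane, or shard differs. */
  static String shardName(String prefix, String window, int paneIndex, int shard, int numShards) {
    // Locale.ROOT keeps the digits ASCII regardless of the default locale.
    return String.format(
        Locale.ROOT, "%s-%s-pane-%d-%05d-of-%05d", prefix, window, paneIndex, shard, numShards);
  }
}
```

With such a scheme, two firings of the same window differ in their pane index, so shard 2 of the first firing and shard 2 of the second firing do not collide.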