@Experimental(value=SOURCE_SINK) public abstract class WriteFiles<UserT,DestinationT,OutputT> extends PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
A PTransform that writes to a FileBasedSink. A write begins with a sequential global initialization of a sink, followed by a parallel write, and ends with a sequential finalization of the write. The output of a write is a WriteFilesResult.
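The three phases described above can be pictured with a small, standard-library-only sketch. It is not Beam code: the class `ThreePhaseWriteSketch`, its `write` method, the thread-pool size, and the `part-NNNNN-of-NNNNN.txt` naming are all hypothetical, chosen only to show a sequential setup, a parallel write to temporary files, and a sequential rename into place.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a three-phase file write; not part of Beam. */
class ThreePhaseWriteSketch {

  /** Writes each bundle to a temp file in parallel, then finalizes sequentially. */
  static List<Path> write(Path outputDir, List<List<String>> bundles) throws Exception {
    // Phase 1: sequential, global initialization (one temp directory for the write).
    Files.createDirectories(outputDir);
    Path tempDir = Files.createTempDirectory(outputDir, "tmp-");

    // Phase 2: parallel write, one temporary file per bundle.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<Path>> pending = new ArrayList<>();
      for (int i = 0; i < bundles.size(); i++) {
        final int bundleIndex = i;
        pending.add(pool.submit(() -> {
          Path temp = tempDir.resolve("bundle-" + bundleIndex + ".tmp");
          Files.write(temp, bundles.get(bundleIndex));
          return temp;
        }));
      }

      // Phase 3: sequential finalization (move temp files to their final names).
      List<Path> finalFiles = new ArrayList<>();
      int total = pending.size();
      for (int i = 0; i < total; i++) {
        Path temp = pending.get(i).get();
        Path dest = outputDir.resolve(String.format("part-%05d-of-%05d.txt", i, total));
        Files.move(temp, dest, StandardCopyOption.REPLACE_EXISTING);
        finalFiles.add(dest);
      }
      Files.delete(tempDir); // empty once every temp file has been moved
      return finalFiles;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    Path out = Files.createTempDirectory("write-sketch");
    List<Path> files = write(out, List.of(List.of("a", "b"), List.of("c")));
    files.forEach(System.out::println);
  }
}
```

Writing to temporary files first means a failed or retried parallel step leaves no partial output under the final names; only the sequential last phase makes results visible.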
By default, every bundle in the input PCollection will be processed by a FileBasedSink.WriteOperation, so the number of outputs will vary based on runner behavior, though at least 1 output will always be produced. The exact parallelism of the write stage can be controlled using withNumShards(int), typically used to control how many files are produced or to globally limit the number of workers connecting to an external service. However, this option can often hurt performance: it adds an additional GroupByKey to the pipeline.
Example usage with runner-determined sharding:
p.apply(WriteFiles.to(new MySink(...)));
Example usage with a fixed number of shards:
p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));
Modifier and Type | Field and Description |
---|---|
static java.lang.Class<? extends WriteFiles> |
CONCRETE_CLASS
For internal use by runners.
|
Fields inherited from class PTransform: name, resourceHints
Constructor and Description |
---|
WriteFiles() |
Modifier and Type | Method and Description |
---|---|
WriteFilesResult<DestinationT> |
expand(PCollection<UserT> input)
Override this method to specify how this
PTransform should be expanded on the given
InputT . |
java.util.Map<TupleTag<?>,PValue> |
getAdditionalInputs()
Returns all
PValues that are consumed as inputs to this PTransform that
are independent of the expansion of the InputT within PTransform.expand(PInput) . |
abstract @Nullable PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> |
getComputeNumShards() |
abstract @Nullable ValueProvider<java.lang.Integer> |
getNumShardsProvider() |
abstract @Nullable ShardingFunction<UserT,DestinationT> |
getShardingFunction() |
abstract FileBasedSink<UserT,DestinationT,OutputT> |
getSink() |
abstract boolean |
getWindowedWrites() |
void |
populateDisplayData(DisplayData.Builder builder)
Register display data for the given transform or component.
|
static <UserT,DestinationT,OutputT> |
to(FileBasedSink<UserT,DestinationT,OutputT> sink)
Creates a
WriteFiles transform that writes to the given FileBasedSink , letting
the runner control how many different shards are produced. |
void |
validate(PipelineOptions options)
Called before running the Pipeline to verify this transform is fully and correctly specified.
|
WriteFiles<UserT,DestinationT,OutputT> |
withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
Set the maximum number of writers created in a bundle before spilling to shuffle.
|
WriteFiles<UserT,DestinationT,OutputT> |
withNoSpilling()
Returns a new
WriteFiles that writes all data without spilling, simplifying the
pipeline. |
WriteFiles<UserT,DestinationT,OutputT> |
withNumShards(int numShards)
Returns a new
WriteFiles that will write to the current FileBasedSink using the
specified number of shards. |
WriteFiles<UserT,DestinationT,OutputT> |
withNumShards(ValueProvider<java.lang.Integer> numShardsProvider)
Returns a new
WriteFiles that will write to the current FileBasedSink using the
ValueProvider specified number of shards. |
WriteFiles<UserT,DestinationT,OutputT> |
withRunnerDeterminedSharding()
Returns a new
WriteFiles that will write to the current FileBasedSink with
runner-determined sharding. |
WriteFiles<UserT,DestinationT,OutputT> |
withSharding(PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> sharding)
Returns a new
WriteFiles that will write to the current FileBasedSink using the
specified PTransform to compute the number of shards. |
WriteFiles<UserT,DestinationT,OutputT> |
withShardingFunction(ShardingFunction<UserT,DestinationT> shardingFunction)
Returns a new
WriteFiles that will write to the current FileBasedSink using the
specified sharding function to assign a shard to each input. |
WriteFiles<UserT,DestinationT,OutputT> |
withSideInputs(java.util.List<PCollectionView<?>> sideInputs) |
WriteFiles<UserT,DestinationT,OutputT> |
withSkipIfEmpty() |
WriteFiles<UserT,DestinationT,OutputT> |
withSkipIfEmpty(boolean skipIfEmpty)
Set this sink to skip writing any files if the PCollection is empty.
|
WriteFiles<UserT,DestinationT,OutputT> |
withWindowedWrites()
Returns a new
WriteFiles that preserves windowing on its input. |
Methods inherited from class PTransform: compose, compose, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setResourceHints, toString, validate
@Internal public static final java.lang.Class<? extends WriteFiles> CONCRETE_CLASS
public static <UserT,DestinationT,OutputT> WriteFiles<UserT,DestinationT,OutputT> to(FileBasedSink<UserT,DestinationT,OutputT> sink)
Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
public abstract FileBasedSink<UserT,DestinationT,OutputT> getSink()
public abstract @Nullable PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> getComputeNumShards()
public abstract @Nullable ValueProvider<java.lang.Integer> getNumShardsProvider()
public abstract boolean getWindowedWrites()
public abstract @Nullable ShardingFunction<UserT,DestinationT> getShardingFunction()
public java.util.Map<TupleTag<?>,PValue> getAdditionalInputs()
Description copied from class: PTransform
Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the InputT within PTransform.expand(PInput).
For example, this can contain any side input consumed by this PTransform.
Overrides: getAdditionalInputs in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
public WriteFiles<UserT,DestinationT,OutputT> withNumShards(int numShards)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
A value less than or equal to 0 is equivalent to the default behavior of runner-determined sharding.
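The rule for non-positive values can be stated as a tiny, standard-library-only sketch. `ShardCountSketch` and `resolve` are hypothetical names used for illustration and do not reflect how WriteFiles stores the setting internally; an empty result stands for runner-determined sharding.

```java
import java.util.OptionalInt;

/** Hypothetical illustration of the documented numShards rule; not Beam code. */
class ShardCountSketch {

  /** Returns the fixed shard count, or empty when the runner should decide. */
  static OptionalInt resolve(int numShards) {
    // Values <= 0 fall back to runner-determined sharding.
    return numShards > 0 ? OptionalInt.of(numShards) : OptionalInt.empty();
  }

  public static void main(String[] args) {
    System.out.println(resolve(3));
    System.out.println(resolve(0));
  }
}
```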
public WriteFiles<UserT,DestinationT,OutputT> withNumShards(ValueProvider<java.lang.Integer> numShardsProvider)
Returns a new WriteFiles that will write to the current FileBasedSink using the ValueProvider specified number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
public WriteFiles<UserT,DestinationT,OutputT> withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
public WriteFiles<UserT,DestinationT,OutputT> withSkipIfEmpty(boolean skipIfEmpty)
public WriteFiles<UserT,DestinationT,OutputT> withSideInputs(java.util.List<PCollectionView<?>> sideInputs)
public WriteFiles<UserT,DestinationT,OutputT> withSharding(PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> sharding)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
public WriteFiles<UserT,DestinationT,OutputT> withRunnerDeterminedSharding()
Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
public WriteFiles<UserT,DestinationT,OutputT> withShardingFunction(ShardingFunction<UserT,DestinationT> shardingFunction)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign a shard to each input.
public WriteFiles<UserT,DestinationT,OutputT> withWindowedWrites()
Returns a new WriteFiles that preserves windowing on its input.
If this option is not specified, windowing and triggering are replaced by GlobalWindows and DefaultTrigger.
If there is no data for a window, no output shards will be generated for that window. If a window triggers multiple times, output shards might be generated more than once for that window; it is up to the sink implementation to keep these output shards unique.
This option can only be used if withNumShards(int) is also set to a positive value.
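One way a sink could keep shards unique across repeated firings is to encode the window and the firing index in each file name. The sketch below uses only the standard library; `WindowedShardNameSketch`, `shardName`, and the name layout are hypothetical and are not the naming scheme of any particular Beam sink.

```java
/** Hypothetical naming helper for windowed output shards; not Beam code. */
class WindowedShardNameSketch {

  /**
   * Builds a file name from the window bounds (epoch millis), the index of the
   * firing within that window, and the shard position.
   */
  static String shardName(
      String prefix, long windowStartMillis, long windowEndMillis,
      long paneIndex, int shard, int numShards) {
    return String.format(
        "%s-%d-%d-pane-%d-%05d-of-%05d",
        prefix, windowStartMillis, windowEndMillis, paneIndex, shard, numShards);
  }

  public static void main(String[] args) {
    // Two firings of the same window and shard yield two distinct names.
    System.out.println(shardName("out", 0L, 60000L, 0, 2, 3));
    System.out.println(shardName("out", 0L, 60000L, 1, 2, 3));
  }
}
```

Because the firing index is part of the name, a later firing of the same window does not overwrite the shard written by an earlier one.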
public WriteFiles<UserT,DestinationT,OutputT> withNoSpilling()
Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline. This option should not be used with withMaxNumWritersPerBundle(int): it eliminates that limit, possibly causing many writers to be opened. Use with caution.
This option only applies to writes that use withRunnerDeterminedSharding().
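The trade-off between a writer limit and spilling can be modelled with a short, standard-library-only sketch. This is a conceptual model under stated assumptions, not Beam's implementation: `SpillSketch`, `BundleResult`, and `processBundle` are hypothetical, each record is a `{destination, value}` pair, and "spilled" records are simply collected in a list rather than sent through a shuffle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a per-bundle writer limit with spilling; not Beam code. */
class SpillSketch {

  /** Outcome of one bundle: values written per destination, and spilled records. */
  static final class BundleResult {
    final Map<String, List<String>> written = new LinkedHashMap<>();
    final List<String[]> spilled = new ArrayList<>();
  }

  /** Each record is a {destination, value} pair. */
  static BundleResult processBundle(List<String[]> records, int maxWriters) {
    BundleResult result = new BundleResult();
    for (String[] record : records) {
      String destination = record[0];
      List<String> writer = result.written.get(destination);
      // Open a new writer only while the bundle is under its writer limit.
      if (writer == null && result.written.size() < maxWriters) {
        writer = new ArrayList<>();
        result.written.put(destination, writer);
      }
      if (writer != null) {
        writer.add(record[1]);
      } else {
        result.spilled.add(record); // over the limit: hand off instead of opening a writer
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String[]> records = List.of(
        new String[] {"a", "1"}, new String[] {"b", "2"},
        new String[] {"c", "3"}, new String[] {"a", "4"});
    BundleResult limited = processBundle(records, 2);
    System.out.println(limited.written.keySet() + " spilled=" + limited.spilled.size());
    // No limit: every destination gets its own writer and nothing is spilled.
    BundleResult unlimited = processBundle(records, Integer.MAX_VALUE);
    System.out.println(unlimited.written.keySet() + " spilled=" + unlimited.spilled.size());
  }
}
```

With no limit, the number of open writers grows with the number of distinct destinations in the bundle, which is the risk the caution above refers to.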
public WriteFiles<UserT,DestinationT,OutputT> withSkipIfEmpty()
public void validate(PipelineOptions options)
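Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified.
By default, does nothing.
Overrides: validate in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>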
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by: expand in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
public void populateDisplayData(DisplayData.Builder builder)
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
Specified by: populateDisplayData in interface HasDisplayData
Overrides: populateDisplayData in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
Parameters: builder - The builder to populate with display data.
See Also: HasDisplayData