@Experimental(value=SOURCE_SINK) public abstract class WriteFiles<UserT,DestinationT,OutputT> extends PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
PTransform that writes to a FileBasedSink. A write begins with a sequential
global initialization of a sink, followed by a parallel write, and ends with a sequential
finalization of the write. The output of a write is a WriteFilesResult.
By default, every bundle in the input PCollection will be processed by a FileBasedSink.WriteOperation, so the number of outputs will vary based on runner behavior, though at least 1
output will always be produced. The exact parallelism of the write stage can be controlled using
withNumShards(int), typically used to control how many files are produced or to
globally limit the number of workers connecting to an external service. However, this option can
often hurt performance: it adds an additional GroupByKey to the pipeline.
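The three phases described above (sequential initialization, parallel write, sequential finalization) can be pictured with a small plain-Java model. This is only a sketch for illustration and not Beam code: the class `WriteLifecycleSketch`, the `out` prefix, and the `out-i-of-n` naming are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of the write lifecycle; it does not use or mirror Beam's classes.
final class WriteLifecycleSketch {

  /** Returns the finalized outputs, keyed by an illustrative file name. */
  static Map<String, List<String>> run(List<List<String>> bundles) {
    // Phase 1: sequential global initialization (here: choose an output prefix).
    String prefix = "out";

    // Phase 2: parallel write, producing one temporary result per bundle.
    List<List<String>> written =
        bundles.parallelStream().map(b -> List.copyOf(b)).collect(Collectors.toList());

    // At least one output is always produced, even when there is no input.
    if (written.isEmpty()) {
      written = List.of(List.of());
    }

    // Phase 3: sequential finalization (here: assign the final names).
    Map<String, List<String>> finalized = new LinkedHashMap<>();
    for (int i = 0; i < written.size(); i++) {
      finalized.put(prefix + "-" + i + "-of-" + written.size(), written.get(i));
    }
    return finalized;
  }

  public static void main(String[] args) {
    System.out.println(run(List.of(List.of("a", "b"), List.of("c"))));
  }
}
```

In this model the number of outputs equals the number of bundles, which is why the output count depends on how the runner chooses to bundle the input.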
Example usage with runner-determined sharding:
p.apply(WriteFiles.to(new MySink(...)));
Example usage with a fixed number of shards:
p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));

| Modifier and Type | Field and Description |
|---|---|
| static java.lang.Class<? extends WriteFiles> | CONCRETE_CLASS<br>For internal use by runners. |

Fields inherited from class PTransform: name

| Constructor and Description |
|---|
| WriteFiles() |

| Modifier and Type | Method and Description |
|---|---|
| WriteFilesResult<DestinationT> | expand(PCollection<UserT> input)<br>Override this method to specify how this PTransform should be expanded on the given InputT. |
| java.util.Map<TupleTag<?>,PValue> | getAdditionalInputs()<br>Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the InputT within PTransform.expand(PInput). |
| abstract @Nullable PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> | getComputeNumShards() |
| abstract @Nullable ValueProvider<java.lang.Integer> | getNumShardsProvider() |
| abstract @Nullable ShardingFunction<UserT,DestinationT> | getShardingFunction() |
| abstract FileBasedSink<UserT,DestinationT,OutputT> | getSink() |
| abstract boolean | getWindowedWrites() |
| void | populateDisplayData(DisplayData.Builder builder)<br>Register display data for the given transform or component. |
| static <UserT,DestinationT,OutputT> WriteFiles<UserT,DestinationT,OutputT> | to(FileBasedSink<UserT,DestinationT,OutputT> sink)<br>Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced. |
| void | validate(PipelineOptions options)<br>Called before running the Pipeline to verify this transform is fully and correctly specified. |
| WriteFiles<UserT,DestinationT,OutputT> | withMaxNumWritersPerBundle(int maxNumWritersPerBundle)<br>Set the maximum number of writers created in a bundle before spilling to shuffle. |
| WriteFiles<UserT,DestinationT,OutputT> | withNoSpilling()<br>Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline. |
| WriteFiles<UserT,DestinationT,OutputT> | withNumShards(int numShards)<br>Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards. |
| WriteFiles<UserT,DestinationT,OutputT> | withNumShards(ValueProvider<java.lang.Integer> numShardsProvider)<br>Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider. |
| WriteFiles<UserT,DestinationT,OutputT> | withRunnerDeterminedSharding()<br>Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding. |
| WriteFiles<UserT,DestinationT,OutputT> | withSharding(PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> sharding)<br>Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards. |
| WriteFiles<UserT,DestinationT,OutputT> | withShardingFunction(ShardingFunction<UserT,DestinationT> shardingFunction)<br>Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign a shard to each input. |
| WriteFiles<UserT,DestinationT,OutputT> | withSideInputs(java.util.List<PCollectionView<?>> sideInputs) |
| WriteFiles<UserT,DestinationT,OutputT> | withWindowedWrites()<br>Returns a new WriteFiles that preserves windowing on its input. |

Methods inherited from class PTransform: compose, compose, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, toString

@Internal public static final java.lang.Class<? extends WriteFiles> CONCRETE_CLASS
For internal use by runners.
public static <UserT,DestinationT,OutputT> WriteFiles<UserT,DestinationT,OutputT> to(FileBasedSink<UserT,DestinationT,OutputT> sink)
Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
public abstract FileBasedSink<UserT,DestinationT,OutputT> getSink()
public abstract @Nullable PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> getComputeNumShards()
public abstract @Nullable ValueProvider<java.lang.Integer> getNumShardsProvider()
public abstract boolean getWindowedWrites()
public abstract @Nullable ShardingFunction<UserT,DestinationT> getShardingFunction()
public java.util.Map<TupleTag<?>,PValue> getAdditionalInputs()
Description copied from class: PTransform
Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the InputT within PTransform.expand(PInput).
For example, this can contain any side input consumed by this PTransform.
Overrides: getAdditionalInputs in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
public WriteFiles<UserT,DestinationT,OutputT> withNumShards(int numShards)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for
more information.
A value less than or equal to 0 will be equivalent to the default behavior of runner-determined sharding.
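To see why a fixed shard count implies an extra grouping step, the following plain-Java sketch assigns each element a shard key and then groups by that key, producing one output per shard. It is a simplified model and not Beam's implementation: the class `FixedShardingSketch` and the round-robin assignment are assumptions chosen for the example, and the in-memory map stands in for the additional GroupByKey mentioned in the class description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of fixed sharding; names and assignment policy are illustrative only.
final class FixedShardingSketch {

  /** Assigns each element a shard key in [0, numShards) and groups elements by that key. */
  static Map<Integer, List<String>> shard(List<String> elements, int numShards) {
    if (numShards <= 0) {
      // The real option treats such values as runner-determined sharding; this sketch
      // models only the fixed case.
      throw new IllegalArgumentException("this sketch models only a positive shard count");
    }
    Map<Integer, List<String>> shards = new TreeMap<>();
    // Pre-create every shard so the sketch always yields exactly numShards groups.
    for (int s = 0; s < numShards; s++) {
      shards.put(s, new ArrayList<>());
    }
    // Round-robin key assignment followed by grouping on the key.
    for (int i = 0; i < elements.size(); i++) {
      shards.get(i % numShards).add(elements.get(i));
    }
    return shards;
  }

  public static void main(String[] args) {
    System.out.println(shard(List.of("a", "b", "c", "d", "e"), 3));
  }
}
```

Because every element must be routed to the worker that owns its shard, the data is shuffled once more than with runner-determined sharding, which is the performance cost noted above.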
public WriteFiles<UserT,DestinationT,OutputT> withNumShards(ValueProvider<java.lang.Integer> numShardsProvider)
Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider.
This option should be used sparingly as it can hurt performance. See WriteFiles for
more information.
public WriteFiles<UserT,DestinationT,OutputT> withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
Sets the maximum number of writers created in a bundle before spilling to shuffle.
public WriteFiles<UserT,DestinationT,OutputT> withSideInputs(java.util.List<PCollectionView<?>> sideInputs)
public WriteFiles<UserT,DestinationT,OutputT> withSharding(PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> sharding)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for
more information.
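A data-dependent shard count is the kind of value such a PTransform would compute. The plain-Java sketch below shows one possible policy, sizing the shard count from the number of elements. The class `ShardCountSketch`, the method `computeNumShards`, and the `targetPerShard` parameter are hypothetical and not part of the WriteFiles API; in a pipeline the result would be delivered as a PCollectionView<Integer>.

```java
// Hypothetical shard-count policy; an illustration of a data-dependent count, not Beam code.
final class ShardCountSketch {

  /** Returns ceil(elementCount / targetPerShard), but never fewer than one shard. */
  static int computeNumShards(long elementCount, long targetPerShard) {
    if (elementCount < 0 || targetPerShard <= 0) {
      throw new IllegalArgumentException("counts must be non-negative and target positive");
    }
    long shards = (elementCount + targetPerShard - 1) / targetPerShard;
    return (int) Math.max(1L, Math.min(shards, Integer.MAX_VALUE));
  }

  public static void main(String[] args) {
    System.out.println(computeNumShards(250, 100));
  }
}
```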
public WriteFiles<UserT,DestinationT,OutputT> withRunnerDeterminedSharding()
Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
public WriteFiles<UserT,DestinationT,OutputT> withShardingFunction(ShardingFunction<UserT,DestinationT> shardingFunction)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign a shard to each input.
public WriteFiles<UserT,DestinationT,OutputT> withWindowedWrites()
Returns a new WriteFiles that preserves windowing on its input.
If this option is not specified, windowing and triggering are replaced by GlobalWindows and DefaultTrigger.
If there is no data for a window, no output shards will be generated for that window. If a window triggers multiple times, output shards might be generated more than once for that window; it's up to the sink implementation to keep these output shards unique.
This option can only be used if withNumShards(int) is also set to a positive value.
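The per-window behavior described above can be modeled in plain Java by keying each element on both its window and a shard number. This is a sketch under stated assumptions rather than Beam code: the class `WindowedWriteSketch`, the `Event` type, fixed windows of `windowMillis`, non-negative timestamps, and the `window-w/shard-s` naming are all invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of windowed, fixed-shard writes; not Beam's implementation.
final class WindowedWriteSketch {

  /** An element with an illustrative, non-negative event timestamp. */
  static final class Event {
    final String value;
    final long timestampMillis;

    Event(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Groups events by (fixed window, shard); windows without data produce no entries. */
  static Map<String, List<String>> shardByWindow(
      List<Event> events, long windowMillis, int numShards) {
    if (windowMillis <= 0 || numShards <= 0) {
      throw new IllegalArgumentException("window size and shard count must be positive");
    }
    Map<String, List<String>> shards = new TreeMap<>();
    int next = 0;
    for (Event e : events) {
      long window = e.timestampMillis / windowMillis; // index of the fixed window
      int shard = next++ % numShards;                 // round-robin shard assignment
      String key = "window-" + window + "/shard-" + shard;
      shards.computeIfAbsent(key, k -> new ArrayList<>()).add(e.value);
    }
    return shards;
  }

  public static void main(String[] args) {
    List<Event> events =
        List.of(new Event("a", 0), new Event("b", 500), new Event("c", 2500));
    System.out.println(shardByWindow(events, 1000, 2).keySet());
  }
}
```

With the sample input, the window covering 1000 to 1999 ms receives no data and therefore has no output shards, matching the first rule above.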
public WriteFiles<UserT,DestinationT,OutputT> withNoSpilling()
Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline. This option should not be used together with withMaxNumWritersPerBundle(int): it eliminates that limit, possibly causing many writers to be opened. Use with caution.
This option only applies to writes that use withRunnerDeterminedSharding().
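The interaction between the writer limit and spilling can be sketched in plain Java. In the model below, a bundle opens one writer per destination until the limit is reached, and records for any further destination are set aside as spilled. This is an assumption-laden illustration, not Beam's implementation: the class `SpillingSketch`, the `{destination, value}` record layout, and the convention that a negative limit means "no spilling" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-bundle writers with a spill threshold; not Beam code.
final class SpillingSketch {

  /** Outcome of processing one bundle. */
  static final class Result {
    final Map<String, List<String>> writers = new LinkedHashMap<>();
    final List<String> spilled = new ArrayList<>();
  }

  /**
   * Processes {destination, value} records. A negative maxWriters disables the limit,
   * which in this sketch stands in for the no-spilling mode.
   */
  static Result writeBundle(List<String[]> records, int maxWriters) {
    Result result = new Result();
    for (String[] record : records) {
      String destination = record[0];
      List<String> writer = result.writers.get(destination);
      boolean mayOpen = maxWriters < 0 || result.writers.size() < maxWriters;
      if (writer == null && mayOpen) {
        writer = new ArrayList<>(); // open a new writer for this destination
        result.writers.put(destination, writer);
      }
      if (writer != null) {
        writer.add(record[1]);
      } else {
        result.spilled.add(record[1]); // limit reached: hand the record to the spill path
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String[]> records =
        List.of(new String[] {"x", "1"}, new String[] {"y", "2"},
                new String[] {"z", "3"}, new String[] {"x", "4"});
    Result limited = writeBundle(records, 2);
    System.out.println(limited.writers + " spilled=" + limited.spilled);
  }
}
```

Without a limit, the number of open writers grows with the number of distinct destinations in the bundle, which is the risk the caution above refers to.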
public void validate(PipelineOptions options)
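Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified.
By default, does nothing.
Overrides: validate in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.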
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Overrides: expand in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
public void populateDisplayData(DisplayData.Builder builder)
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect
display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace,
but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace
of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
Specified by: populateDisplayData in interface HasDisplayData
Overrides: populateDisplayData in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
Parameters: builder - The builder to populate with display data.
See Also: HasDisplayData