Class WriteFiles<UserT,DestinationT,OutputT>
All Implemented Interfaces:
Serializable, HasDisplayData
A PTransform that writes to a FileBasedSink. A write begins with a sequential global initialization of the sink, followed by a parallel write, and ends with a sequential finalization of the write. The output of a write is a WriteFilesResult<DestinationT>.
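The three phases described above can be illustrated with a small plain-Java model. This is a conceptual sketch, not Beam's implementation: it avoids the Beam SDK entirely, treats each "file" as an in-memory string, and the class name WriteLifecycleModel and the output naming pattern are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Conceptual model only, NOT Beam's implementation: it mimics the three phases
// of a write (sequential init, parallel write, sequential finalize) in plain Java.
class WriteLifecycleModel {

  // Phase 1 (sequential): one-time global setup, e.g. choosing a temp location.
  static String initialize() {
    return "tmp";
  }

  // Phase 2 (parallel): every bundle is "written" independently to a temp result.
  static Map<String, String> writeBundles(String tempDir, List<List<String>> bundles) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (List<String> bundle : bundles) {
        futures.add(pool.submit(() -> String.join(",", bundle)));
      }
      Map<String, String> tempFiles = new LinkedHashMap<>();
      for (int i = 0; i < futures.size(); i++) {
        // Future.get() waits for each bundle; results keep the bundle order.
        tempFiles.put(tempDir + "/bundle-" + i, futures.get(i).get());
      }
      return tempFiles;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("a bundle write failed", e);
    } finally {
      pool.shutdown();
    }
  }

  // Phase 3 (sequential): give every temp result its final, consistent name.
  static Map<String, String> finalizeWrite(Map<String, String> tempFiles) {
    Map<String, String> outputs = new LinkedHashMap<>();
    int total = tempFiles.size();
    int shard = 0;
    for (String contents : tempFiles.values()) {
      outputs.put(String.format("output-%05d-of-%05d", shard++, total), contents);
    }
    return outputs;
  }

  static Map<String, String> write(List<List<String>> bundles) {
    return finalizeWrite(writeBundles(initialize(), bundles));
  }
}
```

Only the middle phase runs concurrently; initialization and finalization each happen once, which is what lets finalization assign consistent names after all parallel work is done.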
By default, every bundle in the input PCollection is processed by a FileBasedSink.WriteOperation, so the number of outputs varies with runner behavior, though at least one output is always produced. The exact parallelism of the write stage can be controlled with withNumShards(int), typically to control how many files are produced or to globally limit the number of workers connecting to an external service. However, this option often hurts performance because it adds an additional GroupByKey to the pipeline.
Example usage with runner-determined sharding:
p.apply(WriteFiles.to(new MySink(...)));
Example usage with a fixed number of shards:
p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));
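To contrast the two modes shown above, here is a plain-Java model (not Beam code; ShardingModesModel is a hypothetical name). It simplifies runner-determined sharding to "one output per bundle, never fewer than one", and models fixed sharding as tagging each record with a shard key and grouping by that key, which is the extra grouping step the GroupByKey represents. Round-robin shard assignment is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model, NOT Beam's implementation, of the two sharding modes.
class ShardingModesModel {

  // Runner-determined (simplified): one output per input bundle, but at least one.
  static int runnerDeterminedOutputs(List<List<String>> bundles) {
    return Math.max(1, bundles.size());
  }

  // Fixed sharding: assign each record a shard key, then group by that key.
  // The grouping step stands in for the additional GroupByKey.
  static Map<Integer, List<String>> fixedShards(List<List<String>> bundles, int numShards) {
    Map<Integer, List<String>> grouped = new TreeMap<>();
    int next = 0;
    for (List<String> bundle : bundles) {
      for (String record : bundle) {
        int shard = next++ % numShards; // round-robin: a simplifying assumption
        grouped.computeIfAbsent(shard, k -> new ArrayList<>()).add(record);
      }
    }
    return grouped;
  }
}
```

With fixed sharding the number of output groups no longer depends on how the runner bundled the input, at the cost of moving every record through the grouping step.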
Field Summary
static final Class<? extends WriteFiles> CONCRETE_CLASS: For internal use by runners.
static final int FILE_TRIGGERING_BYTE_COUNT
static final Duration FILE_TRIGGERING_RECORD_BUFFERING_DURATION
static final int FILE_TRIGGERING_RECORD_COUNT
Fields inherited from class org.apache.beam.sdk.transforms.PTransform: annotations, displayData, name, resourceHints
Constructor Summary
WriteFiles()
Method Summary
WriteFilesResult<DestinationT> expand(PCollection<UserT> input): Override this method to specify how this PTransform should be expanded on the given InputT.
getAdditionalInputs(): Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the PTransform within PTransform.expand(PInput).
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler()
abstract BadRecordRouter getBadRecordRouter()
abstract @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> getComputeNumShards()
abstract @Nullable ValueProvider<Integer> getNumShardsProvider()
abstract @Nullable ShardingFunction<UserT, DestinationT> getShardingFunction()
abstract FileBasedSink<UserT, DestinationT, OutputT> getSink()
abstract boolean getWindowedWrites()
abstract boolean getWithAutoSharding()
void populateDisplayData(DisplayData.Builder builder): Register display data for the given transform or component.
static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(FileBasedSink<UserT, DestinationT, OutputT> sink): Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
void validate(PipelineOptions options): Called before running the Pipeline to verify this transform is fully and correctly specified.
WriteFiles<UserT, DestinationT, OutputT> withAutoSharding()
WriteFiles<UserT, DestinationT, OutputT> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler): See FileIO.Write.withBadRecordErrorHandler(ErrorHandler) for details on usage.
WriteFiles<UserT, DestinationT, OutputT> withBatchMaxBufferingDuration(@Nullable Duration batchMaxBufferingDuration): Returns a new WriteFiles that will batch the input records using the specified max buffering duration.
WriteFiles<UserT, DestinationT, OutputT> withBatchSize(@Nullable Integer batchSize): Returns a new WriteFiles that will batch the input records using the specified batch size.
WriteFiles<UserT, DestinationT, OutputT> withBatchSizeBytes(@Nullable Integer batchSizeBytes): Returns a new WriteFiles that will batch the input records using the specified batch size in bytes.
WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(int maxNumWritersPerBundle): Set the maximum number of writers created in a bundle before spilling to shuffle.
WriteFiles<UserT, DestinationT, OutputT> withNoSpilling(): Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline.
WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards): Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
WriteFiles<UserT, DestinationT, OutputT> withNumShards(ValueProvider<Integer> numShardsProvider): Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider.
WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding(): Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
WriteFiles<UserT, DestinationT, OutputT> withSharding(PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding): Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
WriteFiles<UserT, DestinationT, OutputT> withShardingFunction(ShardingFunction<UserT, DestinationT> shardingFunction): Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign a shard to each input.
WriteFiles<UserT, DestinationT, OutputT> withSideInputs(List<PCollectionView<?>> sideInputs)
WriteFiles<UserT, DestinationT, OutputT> withSkipIfEmpty(boolean skipIfEmpty): Set this sink to skip writing any files if the PCollection is empty.
WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites(): Returns a new WriteFiles that preserves windowing on its input.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform: addAnnotation, compose, compose, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setDisplayData, setResourceHints, toString, validate
Field Details
CONCRETE_CLASS
public static final Class<? extends WriteFiles> CONCRETE_CLASS
For internal use by runners.
FILE_TRIGGERING_RECORD_COUNT
public static final int FILE_TRIGGERING_RECORD_COUNT
FILE_TRIGGERING_BYTE_COUNT
public static final int FILE_TRIGGERING_BYTE_COUNT
FILE_TRIGGERING_RECORD_BUFFERING_DURATION
public static final Duration FILE_TRIGGERING_RECORD_BUFFERING_DURATION
Constructor Details
WriteFiles
public WriteFiles()
Method Details
to
public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(FileBasedSink<UserT, DestinationT, OutputT> sink)
Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
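getSink
public abstract FileBasedSink<UserT, DestinationT, OutputT> getSink()
getComputeNumShards
public abstract @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> getComputeNumShards()
getNumShardsProvider
public abstract @Nullable ValueProvider<Integer> getNumShardsProvider()
getWindowedWrites
public abstract boolean getWindowedWrites()
getWithAutoSharding
public abstract boolean getWithAutoSharding()
getShardingFunction
public abstract @Nullable ShardingFunction<UserT, DestinationT> getShardingFunction()
getBadRecordErrorHandler
public abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler()
getBadRecordRouter
public abstract BadRecordRouter getBadRecordRouter()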
getAdditionalInputs
Description copied from class: PTransform
Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the PTransform within PTransform.expand(PInput).
For example, this can contain any side input consumed by this PTransform.
Overrides: getAdditionalInputs in class PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
withNumShards
public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
This option should be used sparingly as it can hurt performance; see WriteFiles for more information.
A value less than or equal to 0 is equivalent to the default behavior of runner-determined sharding.
withNumShards
public WriteFiles<UserT, DestinationT, OutputT> withNumShards(ValueProvider<Integer> numShardsProvider)
Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider.
This option should be used sparingly as it can hurt performance; see WriteFiles for more information.
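The difference between the two withNumShards overloads is when the shard count becomes known. The plain-Java sketch below models this; it is not Beam's API. A java.util.function.Supplier stands in for ValueProvider<Integer>, and ShardCountModel and its RUNNER_DETERMINED sentinel are hypothetical names. The "value <= 0 means runner-determined" rule is applied only to the int overload, because that is the only overload the text documents it for.

```java
import java.util.function.Supplier;

// Conceptual model, NOT Beam's API: a Supplier stands in for ValueProvider<Integer>.
class ShardCountModel {
  // Sentinel used only in this sketch to mean "runner-determined sharding".
  static final int RUNNER_DETERMINED = 0;

  private final Supplier<Integer> provider;

  private ShardCountModel(Supplier<Integer> provider) {
    this.provider = provider;
  }

  // Like withNumShards(int): known up front; a value <= 0 means runner-determined.
  static ShardCountModel ofFixed(int numShards) {
    int effective = numShards > 0 ? numShards : RUNNER_DETERMINED;
    return new ShardCountModel(() -> effective);
  }

  // Like withNumShards(ValueProvider<Integer>): the value is read only when resolved.
  static ShardCountModel ofProvider(Supplier<Integer> provider) {
    return new ShardCountModel(provider);
  }

  // Called at "execution time" in this model, not at pipeline construction.
  int resolve() {
    return provider.get();
  }
}
```

Deferring the read is what allows a templated pipeline to be constructed once and given its shard count later.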
withMaxNumWritersPerBundle
public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
Set the maximum number of writers created in a bundle before spilling to shuffle.
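The writer cap can be pictured with a small plain-Java model (not Beam's implementation; WriterCapModel is a hypothetical name, and "one writer per destination" is a simplifying assumption). Within one bundle, at most maxWriters writers are open; records for any further destination are set aside, standing in for data spilled to shuffle. The noSpilling() factory models the uncapped behavior that withNoSpilling() describes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model, NOT Beam's implementation: within one bundle, at most
// maxWriters writers (one per destination) are open at once; records for any
// additional destination are "spilled" to be written later after a shuffle.
class WriterCapModel {
  final Map<String, List<String>> openWriters = new LinkedHashMap<>();
  final List<String> spilled = new ArrayList<>();
  private final int maxWriters;

  WriterCapModel(int maxWriters) {
    this.maxWriters = maxWriters;
  }

  // Models the no-spilling case: no cap, so nothing is ever spilled.
  static WriterCapModel noSpilling() {
    return new WriterCapModel(Integer.MAX_VALUE);
  }

  void process(String destination, String record) {
    List<String> writer = openWriters.get(destination);
    if (writer == null) {
      if (openWriters.size() >= maxWriters) {
        // Over the cap: spill the record instead of opening another writer.
        spilled.add(destination + ":" + record);
        return;
      }
      writer = new ArrayList<>();
      openWriters.put(destination, writer);
    }
    writer.add(record);
  }
}
```

The cap bounds memory and open file handles per bundle; removing it avoids the extra shuffle but lets the number of open writers grow with the number of destinations.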
withSkipIfEmpty
public WriteFiles<UserT, DestinationT, OutputT> withSkipIfEmpty(boolean skipIfEmpty)
Set this sink to skip writing any files if the PCollection is empty.
withBatchSize
public WriteFiles<UserT, DestinationT, OutputT> withBatchSize(@Nullable Integer batchSize)
Returns a new WriteFiles that will batch the input records using the specified batch size. The default value is FILE_TRIGGERING_RECORD_COUNT.
This option is used only for writing unbounded data with auto-sharding.
withBatchSizeBytes
public WriteFiles<UserT, DestinationT, OutputT> withBatchSizeBytes(@Nullable Integer batchSizeBytes)
Returns a new WriteFiles that will batch the input records using the specified batch size in bytes. The default value is FILE_TRIGGERING_BYTE_COUNT.
This option is used only for writing unbounded data with auto-sharding.
withBatchMaxBufferingDuration
public WriteFiles<UserT, DestinationT, OutputT> withBatchMaxBufferingDuration(@Nullable Duration batchMaxBufferingDuration)
Returns a new WriteFiles that will batch the input records using the specified max buffering duration. The default value is FILE_TRIGGERING_RECORD_BUFFERING_DURATION.
This option is used only for writing unbounded data with auto-sharding.
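The three batching limits (record count, bytes, buffering duration) can be read as "flush when any limit is reached". The plain-Java model below illustrates that reading under stated assumptions; it is not Beam's implementation. BatchingModel is a hypothetical name, the clock is passed in explicitly for determinism, and the time limit is checked only when a record arrives, whereas a real streaming implementation would typically also fire on a timer.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Conceptual model, NOT Beam's implementation: records are buffered and the
// buffer is flushed as one batch when ANY of the three limits is reached.
class BatchingModel {
  private final int batchSize;          // analogous to withBatchSize
  private final long batchSizeBytes;    // analogous to withBatchSizeBytes
  private final Duration maxBuffering;  // analogous to withBatchMaxBufferingDuration
  private final List<String> buffer = new ArrayList<>();
  private long bufferedBytes = 0;
  private long firstArrivalMillis = 0;
  final List<List<String>> flushed = new ArrayList<>();

  BatchingModel(int batchSize, long batchSizeBytes, Duration maxBuffering) {
    this.batchSize = batchSize;
    this.batchSizeBytes = batchSizeBytes;
    this.maxBuffering = maxBuffering;
  }

  // nowMillis is supplied by the caller; the time limit is checked on arrival only.
  void add(String record, long nowMillis) {
    if (buffer.isEmpty()) {
      firstArrivalMillis = nowMillis;
    }
    buffer.add(record);
    bufferedBytes += record.getBytes(StandardCharsets.UTF_8).length;
    boolean countReached = buffer.size() >= batchSize;
    boolean bytesReached = bufferedBytes >= batchSizeBytes;
    boolean timeReached = nowMillis - firstArrivalMillis >= maxBuffering.toMillis();
    if (countReached || bytesReached || timeReached) {
      flush();
    }
  }

  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    flushed.add(new ArrayList<>(buffer));
    buffer.clear();
    bufferedBytes = 0;
  }
}
```

Combining the limits this way bounds batch size in both records and bytes while the duration limit keeps slow streams from buffering indefinitely.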
withSideInputs
public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(List<PCollectionView<?>> sideInputs)
withSharding
public WriteFiles<UserT, DestinationT, OutputT> withSharding(PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
This option should be used sparingly as it can hurt performance; see WriteFiles for more information.
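With withSharding, the shard count is derived from the input itself rather than fixed in advance. The plain-Java model below shows that idea; it is not Beam's API. A java.util.function.Function over the whole input stands in for the PTransform that produces a PCollectionView<Integer>, and ComputedShardsModel, the "one shard per four records" policy, and round-robin placement are all hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Conceptual model, NOT Beam's API: the shard count is computed from the input
// itself, then the records are spread over that many shards.
class ComputedShardsModel {

  static <T> Map<Integer, List<T>> shard(List<T> input, Function<List<T>, Integer> computeNumShards) {
    // Guard against a computed count of 0 in this sketch.
    int numShards = Math.max(1, computeNumShards.apply(input));
    Map<Integer, List<T>> shards = new TreeMap<>();
    for (int i = 0; i < input.size(); i++) {
      shards.computeIfAbsent(i % numShards, k -> new ArrayList<>()).add(input.get(i));
    }
    return shards;
  }

  // Hypothetical policy: one shard per started block of four records.
  static int oneShardPerFourRecords(List<?> input) {
    return (input.size() + 3) / 4;
  }
}
```

Because the count depends on the whole input, it must be computed before any record can be placed, which is one reason this option can cost performance.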
withRunnerDeterminedSharding
public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding()
Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
withAutoSharding
public WriteFiles<UserT, DestinationT, OutputT> withAutoSharding()
withShardingFunction
public WriteFiles<UserT, DestinationT, OutputT> withShardingFunction(ShardingFunction<UserT, DestinationT> shardingFunction)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign a shard to each input.
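A sharding function maps a (destination, element, shard count) triple to a shard. The sketch below shows one deterministic, hash-based policy in plain Java. ShardAssigner and HashShardAssigner are hypothetical stand-ins, not Beam's ShardingFunction interface, whose exact signature should be checked in its own documentation.

```java
import java.util.Objects;

// Hypothetical stand-in for a user-supplied sharding function; this is NOT
// Beam's ShardingFunction interface, just the same idea in plain Java.
interface ShardAssigner<UserT, DestinationT> {
  int assignShard(DestinationT destination, UserT element, int shardCount);
}

// Example policy: hash destination and element together so equal inputs always
// land on the same shard; floorMod keeps the result in [0, shardCount).
class HashShardAssigner implements ShardAssigner<String, String> {
  @Override
  public int assignShard(String destination, String element, int shardCount) {
    return Math.floorMod(Objects.hash(destination, element), shardCount);
  }
}
```

Math.floorMod is used instead of % because hash codes can be negative, and a negative shard index would be invalid.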
withWindowedWrites
public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites()
Returns a new WriteFiles that preserves windowing on its input.
If this option is not specified, windowing and triggering are replaced by GlobalWindows and DefaultTrigger.
If there is no data for a window, no output shards are generated for that window. If a window triggers multiple times, output shards may be generated for that window more than once (once per firing); it is up to the sink implementation to keep these output shards unique.
This option can only be used if withNumShards(int) is also set to a positive value.
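One way a sink might keep repeated firings unique is to put the window and pane index into each shard name. The plain-Java model below sketches this; it is not Beam's naming scheme. WindowedShardNamesModel, the name format, and round-robin shard placement are hypothetical. It also reflects the rule that an empty window produces no shards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Conceptual model, NOT Beam's naming scheme: each firing (pane) of a window
// writes its own shards, and each name embeds window + pane index + shard so
// repeated firings of the same window never overwrite each other.
class WindowedShardNamesModel {

  static List<String> outputsForFiring(String window, int paneIndex, List<String> records, int numShards) {
    // Shard indices that actually receive data (round-robin, a simplification).
    TreeSet<Integer> usedShards = new TreeSet<>();
    for (int i = 0; i < records.size(); i++) {
      usedShards.add(i % numShards);
    }
    List<String> names = new ArrayList<>();
    for (int shard : usedShards) {
      names.add(String.format("%s-pane-%d-%05d-of-%05d", window, paneIndex, shard, numShards));
    }
    return names; // an empty firing yields no shard names at all
  }
}
```

Including the pane index is what separates an early firing's files from a later firing's files for the same window.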
withNoSpilling
public WriteFiles<UserT, DestinationT, OutputT> withNoSpilling()
Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline. This option should not be combined with withMaxNumWritersPerBundle(int): it removes that limit, possibly causing many writers to be opened. Use with caution.
This option applies only to writes that use withRunnerDeterminedSharding().
withSkipIfEmpty
withBadRecordErrorHandler
public WriteFiles<UserT, DestinationT, OutputT> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)
See FileIO.Write.withBadRecordErrorHandler(ErrorHandler) for details on usage.
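The general idea behind a bad-record handler is that a record which fails processing is routed aside instead of failing the whole write. The plain-Java model below illustrates only that idea; it is not Beam's ErrorHandler or BadRecordRouter API. BadRecordRoutingModel and the "record -> exception type" format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Conceptual model, NOT Beam's ErrorHandler/BadRecordRouter API: records that
// fail to format are collected on the side instead of failing the whole write.
class BadRecordRoutingModel {
  final List<String> written = new ArrayList<>();
  final List<String> badRecords = new ArrayList<>();

  void writeAll(List<String> records, Function<String, String> formatter) {
    for (String record : records) {
      try {
        written.add(formatter.apply(record));
      } catch (RuntimeException e) {
        // Route the failing record, with the failure type, to the side list.
        badRecords.add(record + " -> " + e.getClass().getSimpleName());
      }
    }
  }
}
```

For example, a formatter that parses integers writes "1" and "3" but routes "x" to badRecords, and the write as a whole still succeeds.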
validate
public void validate(PipelineOptions options)
Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified. By default, does nothing.
Overrides: validate in class PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
expand
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by: expand in class PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
populateDisplayData
public void populateDisplayData(DisplayData.Builder builder)
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
Specified by: populateDisplayData in interface HasDisplayData
Overrides: populateDisplayData in class PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
Parameters: builder - The builder to populate with display data.