Class WriteFiles<UserT,DestinationT,OutputT>
- All Implemented Interfaces:
Serializable,HasDisplayData
PTransform that writes to a FileBasedSink. A write begins with a sequential
global initialization of a sink, followed by a parallel write, and ends with a sequential
finalization of the write. The output of a write is a WriteFilesResult.
By default, every bundle in the input PCollection will be processed by a FileBasedSink.WriteOperation, so the number of outputs will vary based on runner behavior, though at least 1
output will always be produced. The exact parallelism of the write stage can be controlled using
withNumShards(int), typically used to control how many files are produced or to
globally limit the number of workers connecting to an external service. However, this option can
often hurt performance: it adds an additional GroupByKey to the pipeline.
Example usage with runner-determined sharding:
p.apply(WriteFiles.to(new MySink(...)));
Example usage with a fixed number of shards:
p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));
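The three phases described above (sequential initialization, parallel write, sequential finalization) can be pictured with a small model. This is only a sketch using an in-memory map as a stand-in for a file system; the class name `ThreePhaseWriteSketch`, the temporary-name scheme, and the final shard naming pattern are all assumptions for illustration and are not taken from Beam.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

/** Hypothetical model of an initialize / parallel write / finalize cycle; not Beam code. */
class ThreePhaseWriteSketch {
  /** Stand-in for a file system: file name mapped to file contents. */
  final Map<String, List<String>> files = new ConcurrentHashMap<>();

  /** Writes one output per bundle and returns the final names in bundle order. */
  List<String> write(String prefix, List<List<String>> bundles) {
    // Phase 1 (sequential): one-time initialization, here just choosing a temporary prefix.
    String tempPrefix = prefix + ".tmp-";

    // Phase 2 (parallel): each bundle is written independently to its own temporary entry.
    IntStream.range(0, bundles.size())
        .parallel()
        .forEach(i -> files.put(tempPrefix + i, List.copyOf(bundles.get(i))));

    // Phase 3 (sequential): rename temporary entries to their final shard names.
    List<String> finalNames = new ArrayList<>();
    for (int i = 0; i < bundles.size(); i++) {
      String finalName = String.format("%s-%05d-of-%05d", prefix, i, bundles.size());
      files.put(finalName, files.remove(tempPrefix + i));
      finalNames.add(finalName);
    }
    return finalNames;
  }
}
```

In this model the number of outputs equals the number of bundles, which mirrors why the output count depends on runner behavior unless the shard count is fixed.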
-
Field Summary
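Fields
Modifier and Type | Field | Description
static final Class<? extends WriteFiles> | CONCRETE_CLASS | For internal use by runners.
static final int | FILE_TRIGGERING_BYTE_COUNT
static final Duration | FILE_TRIGGERING_RECORD_BUFFERING_DURATION
static final int | FILE_TRIGGERING_RECORD_COUNT
Fields inherited from class org.apache.beam.sdk.transforms.PTransform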
annotations, displayData, name, resourceHints -
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
WriteFilesResult<DestinationT> | expand(PCollection<UserT> input) | Override this method to specify how this PTransform should be expanded on the given InputT.
Map<TupleTag<?>,PValue> | getAdditionalInputs() | Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the PTransform within PTransform.expand(PInput).
abstract ErrorHandler<BadRecord, ?> | getBadRecordErrorHandler()
abstract BadRecordRouter | getBadRecordRouter()
abstract @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> | getComputeNumShards()
abstract @Nullable ValueProvider<Integer> | getNumShardsProvider()
abstract @Nullable ShardingFunction<UserT, DestinationT> | getShardingFunction()
abstract FileBasedSink<UserT, DestinationT, OutputT> | getSink()
abstract boolean | getWindowedWrites()
abstract boolean | getWithAutoSharding()
void | populateDisplayData(DisplayData.Builder builder) | Register display data for the given transform or component.
static <UserT,DestinationT,OutputT> WriteFiles<UserT,DestinationT,OutputT> | to(FileBasedSink<UserT, DestinationT, OutputT> sink) | Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
void | validate(PipelineOptions options) | Called before running the Pipeline to verify this transform is fully and correctly specified.
WriteFiles<UserT,DestinationT,OutputT> | withAutoSharding()
WriteFiles<UserT,DestinationT,OutputT> | withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler) | See FileIO.Write.withBadRecordErrorHandler(ErrorHandler) for details on usage.
WriteFiles<UserT,DestinationT,OutputT> | withBatchMaxBufferingDuration(@Nullable Duration batchMaxBufferingDuration) | Returns a new WriteFiles that will batch the input records using the specified max buffering duration.
WriteFiles<UserT,DestinationT,OutputT> | withBatchSize(@Nullable Integer batchSize) | Returns a new WriteFiles that will batch the input records using the specified batch size.
WriteFiles<UserT,DestinationT,OutputT> | withBatchSizeBytes(@Nullable Integer batchSizeBytes) | Returns a new WriteFiles that will batch the input records using the specified batch size in bytes.
WriteFiles<UserT,DestinationT,OutputT> | withMaxNumWritersPerBundle(int maxNumWritersPerBundle) | Set the maximum number of writers created in a bundle before spilling to shuffle.
WriteFiles<UserT,DestinationT,OutputT> | withNoSpilling() | Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline.
WriteFiles<UserT,DestinationT,OutputT> | withNumShards(int numShards) | Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
WriteFiles<UserT,DestinationT,OutputT> | withNumShards(ValueProvider<Integer> numShardsProvider) | Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider.
WriteFiles<UserT,DestinationT,OutputT> | withRunnerDeterminedSharding() | Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
WriteFiles<UserT,DestinationT,OutputT> | withSharding(PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) | Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
WriteFiles<UserT,DestinationT,OutputT> | withShardingFunction(ShardingFunction<UserT, DestinationT> shardingFunction) | Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign shards to inputs.
WriteFiles<UserT,DestinationT,OutputT> | withSideInputs(List<PCollectionView<?>> sideInputs)
WriteFiles<UserT,DestinationT,OutputT> | withSkipIfEmpty()
WriteFiles<UserT,DestinationT,OutputT> | withSkipIfEmpty(boolean skipIfEmpty) | Set this sink to skip writing any files if the PCollection is empty.
WriteFiles<UserT,DestinationT,OutputT> | withWindowedWrites() | Returns a new WriteFiles that preserves windowing on its input.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setDisplayData, setResourceHints, toString, validate
-
Field Details
-
CONCRETE_CLASS
For internal use by runners. -
FILE_TRIGGERING_RECORD_COUNT
public static final int FILE_TRIGGERING_RECORD_COUNT
-
FILE_TRIGGERING_BYTE_COUNT
public static final int FILE_TRIGGERING_BYTE_COUNT
-
FILE_TRIGGERING_RECORD_BUFFERING_DURATION
-
-
Constructor Details
-
WriteFiles
public WriteFiles()
-
-
Method Details
-
to
public static <UserT,DestinationT,OutputT> WriteFiles<UserT,DestinationT,OutputT> to(FileBasedSink<UserT, DestinationT, OutputT> sink)
Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced. -
getSink
-
getComputeNumShards
public abstract @Nullable PTransform<PCollection<UserT>,PCollectionView<Integer>> getComputeNumShards() -
getNumShardsProvider
-
getWindowedWrites
public abstract boolean getWindowedWrites() -
getWithAutoSharding
public abstract boolean getWithAutoSharding() -
getShardingFunction
-
getBadRecordErrorHandler
-
getBadRecordRouter
-
getAdditionalInputs
Description copied from class: PTransform
Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the PTransform within PTransform.expand(PInput).
For example, this can contain any side input consumed by this PTransform.
- Overrides:
getAdditionalInputs in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
-
withNumShards
Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
A value less than or equal to 0 will be equivalent to the default behavior of runner-determined sharding.
-
withNumShards
public WriteFiles<UserT,DestinationT,OutputT> withNumShards(ValueProvider<Integer> numShardsProvider)
Returns a new WriteFiles that will write to the current FileBasedSink using the number of shards specified by the ValueProvider.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information. -
withMaxNumWritersPerBundle
public WriteFiles<UserT,DestinationT,OutputT> withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
Set the maximum number of writers created in a bundle before spilling to shuffle. -
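To make the idea of a per-bundle writer limit concrete, here is a minimal sketch, assuming one writer per destination and a simple list that collects elements once the limit is reached. `WriterSpillSketch` and its members are hypothetical names; the real transform spills to a shuffle step rather than to a list, and its exact threshold handling is not described in this page.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of capping open writers within one bundle; not Beam code. */
class WriterSpillSketch {
  /** Open writers for this bundle, keyed by destination; each list stands in for a file. */
  final Map<String, List<String>> writers = new LinkedHashMap<>();
  /** Elements that could not get a writer and are deferred to a later stage. */
  final List<String> spilled = new ArrayList<>();
  private final int maxWriters;

  WriterSpillSketch(int maxWriters) {
    this.maxWriters = maxWriters;
  }

  void process(String destination, String element) {
    List<String> writer = writers.get(destination);
    if (writer == null && writers.size() >= maxWriters) {
      // The limit is reached and this destination has no writer yet: spill the element.
      spilled.add(element);
      return;
    }
    if (writer == null) {
      writer = new ArrayList<>();
      writers.put(destination, writer);
    }
    writer.add(element);
  }
}
```

Elements for destinations that already have a writer keep going to that writer; only elements that would require a new writer beyond the limit are deferred.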
withSkipIfEmpty
Set this sink to skip writing any files if the PCollection is empty. -
withBatchSize
Returns a new WriteFiles that will batch the input records using the specified batch size. The default value is FILE_TRIGGERING_RECORD_COUNT.
This option is used only for writing unbounded data with auto-sharding.
-
withBatchSizeBytes
Returns a new WriteFiles that will batch the input records using the specified batch size in bytes. The default value is FILE_TRIGGERING_BYTE_COUNT.
This option is used only for writing unbounded data with auto-sharding.
-
withBatchMaxBufferingDuration
public WriteFiles<UserT,DestinationT,OutputT> withBatchMaxBufferingDuration(@Nullable Duration batchMaxBufferingDuration)
Returns a new WriteFiles that will batch the input records using the specified max buffering duration. The default value is FILE_TRIGGERING_RECORD_BUFFERING_DURATION.
This option is used only for writing unbounded data with auto-sharding.
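The three batching limits (record count, byte count, buffering duration) can be thought of as independent flush triggers. The following is a sketch under stated assumptions, not Beam's implementation: `BatchTriggerSketch` is a hypothetical class, timestamps are passed in explicitly, and the duration is only checked when a record arrives, whereas a real streaming runner would typically use a timer.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of count / bytes / duration batching limits; not Beam code. */
class BatchTriggerSketch {
  private final int maxRecords;
  private final long maxBytes;
  private final Duration maxBuffering;

  private final List<String> buffer = new ArrayList<>();
  private long bufferedBytes = 0;
  private long firstElementMillis = 0;

  BatchTriggerSketch(int maxRecords, long maxBytes, Duration maxBuffering) {
    this.maxRecords = maxRecords;
    this.maxBytes = maxBytes;
    this.maxBuffering = maxBuffering;
  }

  /** Buffers a record seen at nowMillis; returns the flushed batch if any limit is hit, else an empty list. */
  List<String> add(String record, long nowMillis) {
    if (buffer.isEmpty()) {
      firstElementMillis = nowMillis; // the buffering clock starts with the first record
    }
    buffer.add(record);
    bufferedBytes += record.getBytes(StandardCharsets.UTF_8).length;

    boolean limitReached =
        buffer.size() >= maxRecords
            || bufferedBytes >= maxBytes
            || nowMillis - firstElementMillis >= maxBuffering.toMillis();
    return limitReached ? flush() : List.of();
  }

  /** Emits whatever is buffered and resets the counters. */
  List<String> flush() {
    List<String> batch = List.copyOf(buffer);
    buffer.clear();
    bufferedBytes = 0;
    return batch;
  }
}
```

Whichever limit is reached first closes the batch, so lowering any one of the three values tends to produce smaller, more frequent batches.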
-
withSideInputs
-
withSharding
public WriteFiles<UserT,DestinationT,OutputT> withSharding(PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
This option should be used sparingly as it can hurt performance. See WriteFiles for more information. -
withRunnerDeterminedSharding
Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding. -
withAutoSharding
-
withShardingFunction
public WriteFiles<UserT,DestinationT,OutputT> withShardingFunction(ShardingFunction<UserT, DestinationT> shardingFunction)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign shards to inputs. -
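As an illustration of what assigning shards to inputs means, the sketch below maps each element to a shard index in the range [0, shardCount) by hashing, then groups elements by that index. It is a generic technique shown in plain Java; `HashShardingSketch` is a hypothetical helper and does not implement Beam's ShardingFunction interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical hash-based shard assignment; not Beam's ShardingFunction. */
class HashShardingSketch {

  /** Returns a stable shard index in [0, shardCount) for the element. */
  static int shardFor(Object element, int shardCount) {
    if (shardCount <= 0) {
      throw new IllegalArgumentException("shardCount must be positive");
    }
    // floorMod keeps the result non-negative even when hashCode() is negative.
    return Math.floorMod(element.hashCode(), shardCount);
  }

  /** Groups elements by shard index, similar in spirit to a group-by-key on the shard number. */
  static <T> Map<Integer, List<T>> groupByShard(List<T> elements, int shardCount) {
    Map<Integer, List<T>> shards = new TreeMap<>();
    for (T element : elements) {
      shards.computeIfAbsent(shardFor(element, shardCount), k -> new ArrayList<>()).add(element);
    }
    return shards;
  }
}
```

The grouping step is the part that costs a shuffle in a distributed setting, which is consistent with the performance warning attached to fixed sharding in the class description.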
withWindowedWrites
Returns a new WriteFiles that preserves windowing on its input.
If this option is not specified, windowing and triggering are replaced by GlobalWindows and DefaultTrigger.
If there is no data for a window, no output shards will be generated for that window. If a window triggers multiple times, output shards might be generated for each firing; it is up to the sink implementation to keep these output shards unique.
This option can only be used if withNumShards(int) is also set to a positive value. -
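The statement that a window with no data produces no output shards can be seen in a small model of per-window grouping. This sketch assumes fixed, non-overlapping windows identified by their start timestamp in milliseconds; `WindowedGroupingSketch` is a hypothetical helper and says nothing about how Beam assigns windows or names files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of grouping timestamped records by fixed window; not Beam code. */
class WindowedGroupingSketch {

  /** Groups (timestampMillis, value) pairs by the start of the fixed window containing them. */
  static Map<Long, List<String>> groupByFixedWindow(
      List<Map.Entry<Long, String>> timestamped, long windowMillis) {
    Map<Long, List<String>> perWindow = new TreeMap<>();
    for (Map.Entry<Long, String> entry : timestamped) {
      long timestamp = entry.getKey();
      long windowStart = timestamp - Math.floorMod(timestamp, windowMillis);
      // An entry is created only when a record actually falls in the window,
      // so windows without data never appear in the result.
      perWindow.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(entry.getValue());
    }
    return perWindow;
  }
}
```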
withNoSpilling
Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline. This option should not be used with withMaxNumWritersPerBundle(int): it eliminates that limit, possibly causing many writers to be opened. Use with caution.
This option only applies to writes that use withRunnerDeterminedSharding(). -
withSkipIfEmpty
-
withBadRecordErrorHandler
public WriteFiles<UserT,DestinationT,OutputT> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler)
See FileIO.Write.withBadRecordErrorHandler(ErrorHandler) for details on usage. -
validate
Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified.
By default, does nothing.
- Overrides:
validate in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
-
expand
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
-
populateDisplayData
Description copied from class: PTransform
Register display data for the given transform or component.
populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.
By default, does not register any display data. Implementors may override this method to provide their own display data.
- Specified by:
populateDisplayData in interface HasDisplayData
- Overrides:
populateDisplayData in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
- Parameters:
builder - The builder to populate with display data.
-