public abstract static class ElasticsearchIO.BulkIO extends PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>
PTransform
writing Bulk API entities created by ElasticsearchIO.DocToBulk
to
an Elasticsearch cluster. Typically, using ElasticsearchIO.Write
is preferred, whereas
using ElasticsearchIO.DocToBulk
and BulkIO separately is for advanced use cases such as
mirroring data to multiple clusters or data lakes without recomputation.name, resourceHints
Constructor and Description |
---|
BulkIO() |
Modifier and Type | Method and Description |
---|---|
PCollectionTuple |
expand(PCollection<ElasticsearchIO.Document> input)
Override this method to specify how this
PTransform should be expanded on the given
InputT . |
ElasticsearchIO.BulkIO |
withAllowableResponseErrors(@Nullable java.util.Set<java.lang.String> allowableResponseErrorTypes)
Provide a set of textual error types which can be contained in Bulk API response
items[].error.type field.
|
ElasticsearchIO.BulkIO |
withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration connectionConfiguration)
Provide the Elasticsearch connection configuration object.
|
ElasticsearchIO.BulkIO |
withIgnoreVersionConflicts(boolean ignoreVersionConflicts)
Whether or not to suppress version conflict errors in a Bulk API response.
|
ElasticsearchIO.BulkIO |
withMaxBatchSize(long batchSize)
Provide a maximum size in number of documents for the batch see bulk API
(https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html).
|
ElasticsearchIO.BulkIO |
withMaxBatchSizeBytes(long batchSizeBytes)
Provide a maximum size in bytes for the batch see bulk API
(https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html).
|
ElasticsearchIO.BulkIO |
withMaxBufferingDuration(Duration maxBufferingDuration)
If using
withUseStatefulBatches(boolean) , this can be used to set a maximum elapsed
time before buffered elements are emitted to Elasticsearch as a Bulk API request. |
ElasticsearchIO.BulkIO |
withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow)
When using
withUseStatefulBatches(boolean) Stateful Processing, states and therefore
batches are maintained per-key-per-window. |
ElasticsearchIO.BulkIO |
withRetryConfiguration(ElasticsearchIO.RetryConfiguration retryConfiguration)
Provides configuration to retry a failed batch call to Elasticsearch.
|
ElasticsearchIO.BulkIO |
withThrowWriteErrors(boolean throwWriteErrors)
Whether to throw runtime exceptions when write (IO) errors occur.
|
ElasticsearchIO.BulkIO |
withUseStatefulBatches(boolean useStatefulBatches)
Whether or not to use Stateful Processing to ensure bulk requests have the desired number of
entities i.e.
|
compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setResourceHints, toString, validate
public ElasticsearchIO.BulkIO withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration connectionConfiguration)
connectionConfiguration
- the Elasticsearch ElasticsearchIO.ConnectionConfiguration
objectElasticsearchIO.BulkIO
with connection configuration setpublic ElasticsearchIO.BulkIO withMaxBatchSize(long batchSize)
batchSize
- maximum batch size in number of documentsElasticsearchIO.BulkIO
with connection batch size setpublic ElasticsearchIO.BulkIO withMaxBatchSizeBytes(long batchSizeBytes)
batchSizeBytes
- maximum batch size in bytesElasticsearchIO.BulkIO
with connection batch size in bytes setpublic ElasticsearchIO.BulkIO withRetryConfiguration(ElasticsearchIO.RetryConfiguration retryConfiguration)
RestClient
surfaces 429 HTTP status code as error for one
or more of the items in the Response
. Users should consider that retrying might
compound the underlying problem which caused the initial failure. Users should also be aware
that once retrying is exhausted the error is surfaced to the runner which may then
opt to retry the current bundle in entirety or abort if the max number of retries of the
runner is completed. Retrying uses an exponential backoff algorithm, with minimum backoff of
5 seconds and then surfacing the error once the maximum number of retries or maximum
configuration duration is exceeded.
Example use:
ElasticsearchIO.write()
.withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(10, Duration.standardMinutes(3))
...
retryConfiguration
- the rules which govern the retry behaviorElasticsearchIO.BulkIO
with retrying configuredpublic ElasticsearchIO.BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts)
ignoreVersionConflicts
- true to suppress version conflicts, false to surface version
conflict errors.ElasticsearchIO.BulkIO
with version conflict handling configuredpublic ElasticsearchIO.BulkIO withAllowableResponseErrors(@Nullable java.util.Set<java.lang.String> allowableResponseErrorTypes)
See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
allowableResponseErrorTypes
- ElasticsearchIO.BulkIO
with allowable response errors setpublic ElasticsearchIO.BulkIO withMaxBufferingDuration(Duration maxBufferingDuration)
withUseStatefulBatches(boolean)
, this can be used to set a maximum elapsed
time before buffered elements are emitted to Elasticsearch as a Bulk API request. If this
config is not set, Bulk requests will not be issued until getMaxBatchSize()
number of documents have been buffered. This may result in higher latency in particular if
your max batch size is set to a large value and your pipeline input is low volume.maxBufferingDuration
- the maximum duration to wait before sending any buffered
documents to Elasticsearch, regardless of maxBatchSize.ElasticsearchIO.BulkIO
with maximum buffering duration setpublic ElasticsearchIO.BulkIO withUseStatefulBatches(boolean useStatefulBatches)
useStatefulBatches
- true enables the use of Stateful Processing to ensure that batches
are as close to the maxBatchSize as possible.ElasticsearchIO.BulkIO
with Stateful Processing enabled or disabledpublic ElasticsearchIO.BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow)
withUseStatefulBatches(boolean)
Stateful Processing, states and therefore
batches are maintained per-key-per-window. BE AWARE that low values for @param
maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
can reduce parallelism greatly. If data is globally windowed and @param
maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
not overwhelmed by parallel requests,but may not work for all use cases. If this number is
less than the number of maximum workers in your pipeline, the IO work will result in a
sub-distribution of the last write step with most of the runners.maxParallelRequestsPerWindow
- the maximum number of parallel bulk requests for a window
of dataElasticsearchIO.BulkIO
with maximum parallel bulk requests per window setpublic ElasticsearchIO.BulkIO withThrowWriteErrors(boolean throwWriteErrors)
ElasticsearchIO.createWriteReport(org.apache.http.HttpEntity, java.util.Set<java.lang.String>, boolean)
. If false, a PCollectionTuple
will be returned
with tags ElasticsearchIO.Write.SUCCESSFUL_WRITES
and ElasticsearchIO.Write.FAILED_WRITES
, each being a
PCollection
of ElasticsearchIO.Document
representing documents which were written to
Elasticsearch without errors and those which failed to write due to errors, respectively.throwWriteErrors
- whether to surface write errors as runtime exceptions or return them
in a PCollection
ElasticsearchIO.BulkIO
with write error treatment configuredpublic PCollectionTuple expand(PCollection<ElasticsearchIO.Document> input)
PTransform
PTransform
should be expanded on the given
InputT
.
NOTE: This method should not be called directly. Instead apply the PTransform
should
be applied to the InputT
using the apply
method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
expand
in class PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>