public abstract static class ElasticsearchIO.BulkIO extends PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>
A PTransform writing Bulk API entities created by ElasticsearchIO.DocToBulk to an Elasticsearch
cluster. Typically, using ElasticsearchIO.Write is preferred, whereas using
ElasticsearchIO.DocToBulk and BulkIO separately is for advanced use cases such as mirroring data
to multiple clusters or data lakes without recomputation.
Fields inherited from class PTransform: name, resourceHints
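
As a hedged illustration of the mirroring use case described above, the following sketch builds the Bulk API entities once with DocToBulk and writes the same entities to two clusters with two BulkIO transforms. It assumes the factory methods ElasticsearchIO.docToBulk() and ElasticsearchIO.bulkIO() and a withConnectionConfiguration method on DocToBulk, none of which are listed on this page. The class name, method name, step names and batch size are hypothetical, and the Beam Elasticsearch IO module must be on the classpath.

```java
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;

/** Illustrative only: class, method and step names are hypothetical. */
final class MirrorToTwoClusters {

  /** Writes the same Bulk API entities to two clusters; returns failed writes of the primary. */
  static PCollection<ElasticsearchIO.Document> mirror(
      PCollection<String> jsonDocs,
      ElasticsearchIO.ConnectionConfiguration primary,
      ElasticsearchIO.ConnectionConfiguration secondary) {

    // Convert JSON documents to Bulk API entities once
    // (assumed factory method: ElasticsearchIO.docToBulk()).
    PCollection<ElasticsearchIO.Document> entities =
        jsonDocs.apply(
            "DocToBulk", ElasticsearchIO.docToBulk().withConnectionConfiguration(primary));

    // Reuse the same entities for the second cluster without recomputing them
    // (assumed factory method: ElasticsearchIO.bulkIO()).
    entities.apply(
        "WriteSecondary",
        ElasticsearchIO.bulkIO().withConnectionConfiguration(secondary).withMaxBatchSize(500L));

    // With throwWriteErrors set to false, failures are returned instead of thrown.
    PCollectionTuple primaryResult =
        entities.apply(
            "WritePrimary",
            ElasticsearchIO.bulkIO()
                .withConnectionConfiguration(primary)
                .withMaxBatchSize(500L)
                .withThrowWriteErrors(false));

    return primaryResult.get(ElasticsearchIO.Write.FAILED_WRITES);
  }
}
```

Splitting the write this way keeps the document-to-entity conversion in one place, so adding a third destination would only require another BulkIO step.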
Constructor and Description |
---|
BulkIO() |
Modifier and Type | Method and Description |
---|---|
PCollectionTuple | expand(PCollection<ElasticsearchIO.Document> input) - Override this method to specify how this PTransform should be expanded on the given InputT. |
ElasticsearchIO.BulkIO | withAllowableResponseErrors(@Nullable java.util.Set<java.lang.String> allowableResponseErrorTypes) - Provide a set of textual error types which can be contained in Bulk API response items[].error.type field. |
ElasticsearchIO.BulkIO | withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration connectionConfiguration) - Provide the Elasticsearch connection configuration object. |
ElasticsearchIO.BulkIO | withIgnoreVersionConflicts(boolean ignoreVersionConflicts) - Whether or not to suppress version conflict errors in a Bulk API response. |
ElasticsearchIO.BulkIO | withMaxBatchSize(long batchSize) - Provide a maximum size in number of documents for the batch; see the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html). |
ElasticsearchIO.BulkIO | withMaxBatchSizeBytes(long batchSizeBytes) - Provide a maximum size in bytes for the batch; see the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html). |
ElasticsearchIO.BulkIO | withMaxBufferingDuration(Duration maxBufferingDuration) - If using withUseStatefulBatches(boolean), this can be used to set a maximum elapsed time before buffered elements are emitted to Elasticsearch as a Bulk API request. |
ElasticsearchIO.BulkIO | withMaxParallelRequests(int maxParallelRequests) - When using withUseStatefulBatches(boolean) Stateful Processing, states and therefore batches are maintained per-key-per-window. |
ElasticsearchIO.BulkIO | withMaxParallelRequestsPerWindow(int maxParallelRequests) - Deprecated. Use withMaxParallelRequests(int) instead. |
ElasticsearchIO.BulkIO | withRetryConfiguration(ElasticsearchIO.RetryConfiguration retryConfiguration) - Provides configuration to retry a failed batch call to Elasticsearch. |
ElasticsearchIO.BulkIO | withThrowWriteErrors(boolean throwWriteErrors) - Whether to throw runtime exceptions when write (IO) errors occur. |
ElasticsearchIO.BulkIO | withUseStatefulBatches(boolean useStatefulBatches) - Whether or not to use Stateful Processing to ensure bulk requests have the desired number of entities, i.e. batches that are as close to the maxBatchSize as possible. |
Methods inherited from class PTransform: compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setResourceHints, toString, validate
public ElasticsearchIO.BulkIO withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration connectionConfiguration)
Parameters:
connectionConfiguration - the Elasticsearch ElasticsearchIO.ConnectionConfiguration object
Returns:
the ElasticsearchIO.BulkIO with connection configuration set
public ElasticsearchIO.BulkIO withMaxBatchSize(long batchSize)
Parameters:
batchSize - maximum batch size in number of documents
Returns:
the ElasticsearchIO.BulkIO with maximum batch size set
public ElasticsearchIO.BulkIO withMaxBatchSizeBytes(long batchSizeBytes)
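
To make the interplay of the document-count limit and the byte limit concrete, here is a minimal, self-contained sketch of a buffer that flushes as soon as either limit is reached. It is not Beam's implementation: the class BatchBuffer, its methods, and the choice to flush immediately after the element that reaches a limit are assumptions made purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer illustrating count-based and byte-based batch limits. */
final class BatchBuffer {
  private final long maxBatchSize;       // limit in number of documents
  private final long maxBatchSizeBytes;  // limit in UTF-8 bytes
  private final List<String> current = new ArrayList<>();
  private final List<List<String>> flushed = new ArrayList<>();
  private long currentBytes = 0;

  BatchBuffer(long maxBatchSize, long maxBatchSizeBytes) {
    this.maxBatchSize = maxBatchSize;
    this.maxBatchSizeBytes = maxBatchSizeBytes;
  }

  /** Buffers one bulk entity and flushes if either limit has been reached. */
  void add(String bulkEntity) {
    current.add(bulkEntity);
    currentBytes += bulkEntity.getBytes(StandardCharsets.UTF_8).length;
    if (current.size() >= maxBatchSize || currentBytes >= maxBatchSizeBytes) {
      flush();
    }
  }

  /** Emits the buffered entities as one batch; a no-op when the buffer is empty. */
  void flush() {
    if (current.isEmpty()) {
      return;
    }
    flushed.add(new ArrayList<>(current));
    current.clear();
    currentBytes = 0;
  }

  /** Returns every batch emitted so far, in order. */
  List<List<String>> flushedBatches() {
    return flushed;
  }

  public static void main(String[] args) {
    BatchBuffer buffer = new BatchBuffer(2, 1024);
    buffer.add("{\"id\":1}");
    buffer.add("{\"id\":2}");
    buffer.add("{\"id\":3}");
    buffer.flush(); // emit the remainder, as would happen at the end of a bundle
    System.out.println(buffer.flushedBatches());
  }
}
```

In this sketch, whichever limit is hit first decides when a batch is sent, so a small byte limit can produce batches with far fewer documents than the document-count limit.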
Parameters:
batchSizeBytes - maximum batch size in bytes
Returns:
the ElasticsearchIO.BulkIO with maximum batch size in bytes set
public ElasticsearchIO.BulkIO withRetryConfiguration(ElasticsearchIO.RetryConfiguration retryConfiguration)
Provides configuration to retry a failed batch call to Elasticsearch. A batch is considered
failed if the underlying RestClient surfaces a 429 HTTP status code as an error for one or more
of the items in the Response. Users should consider that retrying might compound the underlying
problem which caused the initial failure. Users should also be aware that once retrying is
exhausted the error is surfaced to the runner, which may then opt to retry the current bundle in
its entirety or abort if the runner's maximum number of retries is reached. Retrying uses an
exponential backoff algorithm with a minimum backoff of 5 seconds, and surfaces the error once
the maximum number of retries or the maximum configured duration is exceeded.
Example use:
ElasticsearchIO.write()
  .withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(10, Duration.standardMinutes(3)))
  ...
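
The retry limits described above can be pictured with a small, self-contained sketch that lists the waits an exponential backoff would produce before giving up. It is not Beam's implementation: the class BackoffSchedule, the doubling factor, the absence of jitter, and the rule that a wait is skipped when it would exceed the maximum duration are all assumptions. It also uses java.time.Duration rather than the Duration type shown in the example use.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of an exponential backoff bounded by attempts and duration. */
final class BackoffSchedule {
  /** Minimum backoff taken from the description above. */
  static final Duration MIN_BACKOFF = Duration.ofSeconds(5);

  /**
   * Returns the waits taken before each retry. Stops when maxAttempts waits have been
   * scheduled or when the next wait would push the total past maxDuration.
   */
  static List<Duration> delays(int maxAttempts, Duration maxDuration) {
    List<Duration> delays = new ArrayList<>();
    Duration next = MIN_BACKOFF;
    Duration elapsed = Duration.ZERO;
    while (delays.size() < maxAttempts && elapsed.plus(next).compareTo(maxDuration) <= 0) {
      delays.add(next);
      elapsed = elapsed.plus(next);
      next = next.multipliedBy(2); // assumed growth factor
    }
    return delays;
  }

  public static void main(String[] args) {
    System.out.println(delays(10, Duration.ofMinutes(3)));
  }
}
```

Under these assumptions, a limit of 10 retries and 3 minutes yields only five waits (5, 10, 20, 40 and 80 seconds), because a sixth wait of 160 seconds would exceed the 3-minute budget. The duration limit, not the retry count, is what ends retrying in that case.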
Parameters:
retryConfiguration - the rules which govern the retry behavior
Returns:
the ElasticsearchIO.BulkIO with retrying configured
public ElasticsearchIO.BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts)
Parameters:
ignoreVersionConflicts - true to suppress version conflicts, false to surface version conflict errors.
Returns:
the ElasticsearchIO.BulkIO with version conflict handling configured
public ElasticsearchIO.BulkIO withAllowableResponseErrors(@Nullable java.util.Set<java.lang.String> allowableResponseErrorTypes)
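Provide a set of textual error types which can be contained in the Bulk API response
items[].error.type field.
See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex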
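
As a rough illustration of how a set of allowable error types might be applied to the items of a Bulk API response, the sketch below separates tolerated errors from real failures. It is not Beam's implementation: the class AllowableErrors, the method failedItems, and the representation of a response as a list of error type strings (null for a successful item) are assumptions. The two error type names are only examples of values that can appear in items[].error.type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of filtering Bulk API item errors by type. */
final class AllowableErrors {

  /**
   * Returns the indexes of items that count as failures: items whose error type is
   * present (non-null) and not contained in the allowable set. A null set allows nothing.
   */
  static List<Integer> failedItems(List<String> itemErrorTypes, Set<String> allowable) {
    List<Integer> failed = new ArrayList<>();
    for (int i = 0; i < itemErrorTypes.size(); i++) {
      String type = itemErrorTypes.get(i);
      boolean allowed = allowable != null && allowable.contains(type);
      if (type != null && !allowed) {
        failed.add(i);
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    // One successful item followed by two items with different error types.
    List<String> types =
        Arrays.asList(null, "version_conflict_engine_exception", "mapper_parsing_exception");
    Set<String> allowable = Collections.singleton("version_conflict_engine_exception");
    System.out.println(failedItems(types, allowable));
  }
}
```

With the version conflict type allowed, only the item at index 2 is reported as failed; with a null set, both error items are reported.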
Parameters:
allowableResponseErrorTypes - the set of allowable error types
Returns:
the ElasticsearchIO.BulkIO with allowable response errors set
public ElasticsearchIO.BulkIO withMaxBufferingDuration(Duration maxBufferingDuration)
If using withUseStatefulBatches(boolean), this can be used to set a maximum elapsed time before
buffered elements are emitted to Elasticsearch as a Bulk API request. If this config is not set,
Bulk requests will not be issued until getMaxBatchSize() documents have been buffered. This may
result in higher latency, in particular if your max batch size is set to a large value and your
pipeline input is low volume.
Parameters:
maxBufferingDuration - the maximum duration to wait before sending any buffered documents to Elasticsearch, regardless of maxBatchSize.
Returns:
the ElasticsearchIO.BulkIO with maximum buffering duration set
public ElasticsearchIO.BulkIO withUseStatefulBatches(boolean useStatefulBatches)
Parameters:
useStatefulBatches - true enables the use of Stateful Processing to ensure that batches are as close to the maxBatchSize as possible.
Returns:
the ElasticsearchIO.BulkIO with Stateful Processing enabled or disabled
@Deprecated
public ElasticsearchIO.BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests)
Deprecated. Use withMaxParallelRequests(int) instead.
When using withUseStatefulBatches(boolean) Stateful Processing, states and therefore batches are
maintained per-key-per-window. BE AWARE that low values for maxParallelRequests, in particular
if the input data has a finite number of windows, can reduce parallelism greatly. Because data
will be temporarily globally windowed as part of writing data to Elasticsearch, if
maxParallelRequests is set to 1, there will only ever be 1 request in flight. Having only a
single request in flight can be beneficial for ensuring an Elasticsearch cluster is not
overwhelmed by parallel requests, but may not work for all use cases. If this number is less
than the maximum number of workers in your pipeline, the IO work will result in a sub-optimal
distribution of the write step with most runners.
Parameters:
maxParallelRequests - the maximum number of parallel bulk requests for a window of data
Returns:
the ElasticsearchIO.BulkIO with maximum parallel bulk requests per window set
public ElasticsearchIO.BulkIO withMaxParallelRequests(int maxParallelRequests)
When using withUseStatefulBatches(boolean) Stateful Processing, states and therefore batches are
maintained per-key-per-window. BE AWARE that low values for maxParallelRequests, in particular
if the input data has a finite number of windows, can reduce parallelism greatly. Because data
will be temporarily globally windowed as part of writing data to Elasticsearch, if
maxParallelRequests is set to 1, there will only ever be 1 request in flight. Having only a
single request in flight can be beneficial for ensuring an Elasticsearch cluster is not
overwhelmed by parallel requests, but may not work for all use cases. If this number is less
than the maximum number of workers in your pipeline, the IO work will result in a sub-optimal
distribution of the write step with most runners.
Parameters:
maxParallelRequests - the maximum number of parallel bulk requests
Returns:
the ElasticsearchIO.BulkIO with maximum parallel bulk requests set
public ElasticsearchIO.BulkIO withThrowWriteErrors(boolean throwWriteErrors)
Whether to throw runtime exceptions when write (IO) errors occur. If true, a runtime exception
will be thrown for any error found by
ElasticsearchIO.createWriteReport(org.apache.http.HttpEntity, java.util.Set<java.lang.String>, boolean).
If false, a PCollectionTuple will be returned with tags ElasticsearchIO.Write.SUCCESSFUL_WRITES
and ElasticsearchIO.Write.FAILED_WRITES, each being a PCollection of ElasticsearchIO.Document
representing documents which were written to Elasticsearch without errors and those which failed
to write due to errors, respectively.
Parameters:
throwWriteErrors - whether to surface write errors as runtime exceptions or return them in a PCollection
Returns:
the ElasticsearchIO.BulkIO with write error treatment configured
public PCollectionTuple expand(PCollection<ElasticsearchIO.Document> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to
the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
Specified by:
expand in class PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>