Class ElasticsearchIO.BulkIO
java.lang.Object
org.apache.beam.sdk.transforms.PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BulkIO
- All Implemented Interfaces:
Serializable, HasDisplayData
- Enclosing class:
ElasticsearchIO
public abstract static class ElasticsearchIO.BulkIO
extends PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>
A PTransform that writes Bulk API entities created by ElasticsearchIO.DocToBulk to an Elasticsearch cluster. ElasticsearchIO.Write is usually the preferred transform; using ElasticsearchIO.DocToBulk and BulkIO separately is intended for advanced use cases, such as mirroring data to multiple clusters or data lakes without recomputation.
-
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
annotations, displayData, name, resourceHints
-
Constructor Summary
Constructors
BulkIO()
-
Method Summary
PCollectionTuple expand(PCollection<ElasticsearchIO.Document> input)
    Override this method to specify how this PTransform should be expanded on the given InputT.
ElasticsearchIO.BulkIO withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrorTypes)
    Provide a set of textual error types which can be contained in the Bulk API response items[].error.type field.
ElasticsearchIO.BulkIO withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration connectionConfiguration)
    Provide the Elasticsearch connection configuration object.
ElasticsearchIO.BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts)
    Whether or not to suppress version conflict errors in a Bulk API response.
ElasticsearchIO.BulkIO withMaxBatchSize(long batchSize)
    Provide a maximum size in number of documents for the batch; see the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html).
ElasticsearchIO.BulkIO withMaxBatchSizeBytes(long batchSizeBytes)
    Provide a maximum size in bytes for the batch; see the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html).
ElasticsearchIO.BulkIO withMaxBufferingDuration(Duration maxBufferingDuration)
    If using withUseStatefulBatches(boolean), this can be used to set a maximum elapsed time before buffered elements are emitted to Elasticsearch as a Bulk API request.
ElasticsearchIO.BulkIO withMaxParallelRequests(int maxParallelRequests)
    When using Stateful Processing via withUseStatefulBatches(boolean), states, and therefore batches, are maintained per key per window.
ElasticsearchIO.BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests)
    Deprecated. Use withMaxParallelRequests(int) instead.
ElasticsearchIO.BulkIO withRetryConfiguration(ElasticsearchIO.RetryConfiguration retryConfiguration)
    Provides configuration to retry a failed batch call to Elasticsearch.
ElasticsearchIO.BulkIO withThrowWriteErrors(boolean throwWriteErrors)
    Whether to throw runtime exceptions when write (IO) errors occur.
ElasticsearchIO.BulkIO withUseStatefulBatches(boolean useStatefulBatches)
    Whether or not to use Stateful Processing to ensure bulk requests have the desired number of entities, i.e. as close to the maxBatchSize as possible.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
addAnnotation, compose, compose, getAdditionalInputs, getAnnotations, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setDisplayData, setResourceHints, toString, validate, validate
-
Constructor Details
-
BulkIO
public BulkIO()
-
-
Method Details
-
withConnectionConfiguration
public ElasticsearchIO.BulkIO withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration connectionConfiguration)
Provide the Elasticsearch connection configuration object.
- Parameters:
connectionConfiguration - the Elasticsearch ElasticsearchIO.ConnectionConfiguration object
- Returns:
the ElasticsearchIO.BulkIO with connection configuration set
-
withMaxBatchSize
public ElasticsearchIO.BulkIO withMaxBatchSize(long batchSize)
Provide a maximum size in number of documents for the batch; see the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html). The default is 1000 documents, in line with Elasticsearch bulk sizing advice; see https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html. Bundle sizes vary with the execution engine, so this setting only fixes the maximum batch size. Lower it if you need smaller Elasticsearch bulk requests.
- Parameters:
batchSize - maximum batch size in number of documents
- Returns:
the ElasticsearchIO.BulkIO with maximum batch size set
-
withMaxBatchSizeBytes
public ElasticsearchIO.BulkIO withMaxBatchSizeBytes(long batchSizeBytes)
Provide a maximum size in bytes for the batch; see the Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html). The default is 5MB, in line with Elasticsearch bulk sizing advice; see https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html. Bundle sizes vary with the execution engine, so this setting only fixes the maximum batch size. Lower it if you need smaller Elasticsearch bulk requests.
- Parameters:
batchSizeBytes - maximum batch size in bytes
- Returns:
the ElasticsearchIO.BulkIO with maximum batch size in bytes set
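The interaction of the two limits (document count and bytes) can be illustrated with a small, library-free sketch. It is not the ElasticsearchIO implementation: the class `BatchLimitSketch`, its fields, and the flush-after-add policy are hypothetical, and the 5MB default is assumed here to mean 5 * 1024 * 1024 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: buffers bulk entities and "flushes"
// once either the document-count limit or the byte limit is reached.
class BatchLimitSketch {
  private final long maxDocs;
  private final long maxBytes;
  private final List<String> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  // Each inner list stands in for one Bulk API request.
  final List<List<String>> flushed = new ArrayList<>();

  BatchLimitSketch(long maxDocs, long maxBytes) {
    this.maxDocs = maxDocs;
    this.maxBytes = maxBytes;
  }

  void add(String bulkEntity) {
    buffer.add(bulkEntity);
    bufferedBytes += bulkEntity.getBytes(StandardCharsets.UTF_8).length;
    // Whichever limit is hit first triggers the flush.
    if (buffer.size() >= maxDocs || bufferedBytes >= maxBytes) {
      flush();
    }
  }

  // Emits whatever is buffered, e.g. at the end of a bundle.
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    flushed.add(new ArrayList<>(buffer));
    buffer.clear();
    bufferedBytes = 0;
  }

  public static void main(String[] args) {
    // Documented defaults: 1000 documents, 5MB (assumed 5 * 1024 * 1024 bytes).
    BatchLimitSketch sketch = new BatchLimitSketch(1000, 5L * 1024 * 1024);
    for (int i = 0; i < 2500; i++) {
      sketch.add("{\"id\":" + i + "}");
    }
    sketch.flush();
    System.out.println("bulk requests issued: " + sketch.flushed.size());
  }
}
```

Because this sketch checks the limits after adding an element, a batch can exceed the byte limit by up to one document; a stricter variant would flush before adding.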
-
withRetryConfiguration
public ElasticsearchIO.BulkIO withRetryConfiguration(ElasticsearchIO.RetryConfiguration retryConfiguration)
Provides configuration to retry a failed batch call to Elasticsearch. A batch is considered failed if the underlying RestClient surfaces a 429 HTTP status code as an error for one or more of the items in the Response. Users should consider that retrying might compound the underlying problem which caused the initial failure. Users should also be aware that once retrying is exhausted, the error is surfaced to the runner, which may then opt to retry the current bundle in its entirety or abort if the runner's maximum number of retries has been reached. Retrying uses an exponential backoff algorithm with a minimum backoff of 5 seconds, and the error is surfaced once the maximum number of retries or the maximum configured duration is exceeded.
Example use:
ElasticsearchIO.write().withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(10, Duration.standardMinutes(3))) ...
- Parameters:
retryConfiguration - the rules which govern the retry behavior
- Returns:
the ElasticsearchIO.BulkIO with retrying configured
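To make the retry limits concrete, the following library-free sketch computes a backoff schedule bounded by both a retry count and a total duration. It is only an illustration: the class `BackoffSketch`, the doubling multiplier, and the absence of jitter are assumptions, and the actual backoff curve used by ElasticsearchIO may differ. Only the 5 second minimum backoff and the two limits come from the description above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of exponential backoff bounded by a maximum
// number of attempts and a maximum cumulative wait.
class BackoffSketch {
  static List<Duration> schedule(
      int maxAttempts, Duration maxDuration, Duration initialBackoff, double multiplier) {
    List<Duration> waits = new ArrayList<>();
    Duration total = Duration.ZERO;
    long nextMillis = initialBackoff.toMillis();
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Duration wait = Duration.ofMillis(nextMillis);
      // Stop retrying once the cumulative wait would exceed the limit.
      if (total.plus(wait).compareTo(maxDuration) > 0) {
        break;
      }
      waits.add(wait);
      total = total.plus(wait);
      nextMillis = (long) (nextMillis * multiplier);
    }
    return waits;
  }

  public static void main(String[] args) {
    // Mirrors RetryConfiguration.create(10, Duration.standardMinutes(3)):
    // at most 10 attempts within 3 minutes, starting at 5 seconds.
    List<Duration> waits =
        schedule(10, Duration.ofMinutes(3), Duration.ofSeconds(5), 2.0);
    System.out.println(waits);
  }
}
```

Under these assumptions the duration limit, not the attempt limit, is what ends the retries, which is worth keeping in mind when choosing the two values.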
-
withIgnoreVersionConflicts
public ElasticsearchIO.BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts)
Whether or not to suppress version conflict errors in a Bulk API response. This can be useful if your use case involves using external version types.
- Parameters:
ignoreVersionConflicts - true to suppress version conflicts, false to surface version conflict errors.
- Returns:
the ElasticsearchIO.BulkIO with version conflict handling configured
-
withAllowableResponseErrors
public ElasticsearchIO.BulkIO withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrorTypes)
Provide a set of textual error types which can be contained in the Bulk API response items[].error.type field. Any element in allowableResponseErrorTypes will suppress errors of the same type in Bulk responses.
See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
- Parameters:
allowableResponseErrorTypes - the set of error types to suppress in Bulk API responses
- Returns:
the ElasticsearchIO.BulkIO with allowable response errors set
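The suppression rule can be sketched without any Elasticsearch dependency. In the hypothetical class below, each list entry stands for the items[].error.type value of one Bulk response item, with null meaning the item succeeded; the class and method names are illustrative and do not correspond to ElasticsearchIO internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical illustration: decides which Bulk response items still count
// as failures once the allowable error types have been suppressed.
class AllowableErrorsSketch {
  static List<Integer> failedItems(List<String> itemErrorTypes, Set<String> allowable) {
    List<Integer> failed = new ArrayList<>();
    for (int i = 0; i < itemErrorTypes.size(); i++) {
      String type = itemErrorTypes.get(i);
      // No error at all, or an error type the caller chose to allow.
      boolean suppressed = type == null || (allowable != null && allowable.contains(type));
      if (!suppressed) {
        failed.add(i);
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> errorTypes =
        Arrays.asList(null, "version_conflict_engine_exception", "mapper_parsing_exception");
    Set<String> allowable = Collections.singleton("version_conflict_engine_exception");
    // Only the item with the non-allowed error type is reported.
    System.out.println(failedItems(errorTypes, allowable));
  }
}
```

Passing null as the allowable set behaves like an empty set in this sketch, matching the @Nullable parameter.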
-
withMaxBufferingDuration
public ElasticsearchIO.BulkIO withMaxBufferingDuration(Duration maxBufferingDuration)
If using withUseStatefulBatches(boolean), this can be used to set a maximum elapsed time before buffered elements are emitted to Elasticsearch as a Bulk API request. If this config is not set, Bulk requests will not be issued until getMaxBatchSize() number of documents have been buffered. This may result in higher latency, in particular if your max batch size is set to a large value and your pipeline input is low volume.
- Parameters:
maxBufferingDuration - the maximum duration to wait before sending any buffered documents to Elasticsearch, regardless of maxBatchSize.
- Returns:
the ElasticsearchIO.BulkIO with maximum buffering duration set
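The latency effect described above can be modelled with a small sketch in which a batch is emitted either when it is full or when a timer finds that the oldest buffered element has waited long enough. This is a simplified, hypothetical model (`BufferingSketch`, `onTimer`, and the explicit millisecond clock are all assumptions); the real transform relies on Beam state and timers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: emits a batch when it is full or when the
// oldest buffered element has waited for the maximum buffering duration.
class BufferingSketch {
  private final int maxBatchSize;
  private final long maxBufferingMillis;
  private final List<String> buffer = new ArrayList<>();
  private long firstBufferedAtMillis = -1;

  // Each inner list stands in for one Bulk API request.
  final List<List<String>> requests = new ArrayList<>();

  BufferingSketch(int maxBatchSize, long maxBufferingMillis) {
    this.maxBatchSize = maxBatchSize;
    this.maxBufferingMillis = maxBufferingMillis;
  }

  void add(String doc, long nowMillis) {
    if (buffer.isEmpty()) {
      firstBufferedAtMillis = nowMillis;
    }
    buffer.add(doc);
    if (buffer.size() >= maxBatchSize) {
      emit();
    }
  }

  // Stands in for a timer firing; without it a low-volume input
  // would wait until maxBatchSize documents had arrived.
  void onTimer(long nowMillis) {
    if (!buffer.isEmpty() && nowMillis - firstBufferedAtMillis >= maxBufferingMillis) {
      emit();
    }
  }

  private void emit() {
    requests.add(new ArrayList<>(buffer));
    buffer.clear();
  }

  public static void main(String[] args) {
    BufferingSketch sketch = new BufferingSketch(1000, 5000);
    sketch.add("doc-a", 0);
    sketch.add("doc-b", 1000);
    sketch.onTimer(5000);
    System.out.println("requests issued: " + sketch.requests.size());
  }
}
```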
-
withUseStatefulBatches
public ElasticsearchIO.BulkIO withUseStatefulBatches(boolean useStatefulBatches)
Whether or not to use Stateful Processing to ensure bulk requests have the desired number of entities, i.e. as close to the maxBatchSize as possible. By default, without this feature enabled, Bulk requests will not contain more than maxBatchSize entities, but the lower bound of batch size is determined by Beam Runner bundle sizes, which may be as few as 1.
- Parameters:
useStatefulBatches - true enables the use of Stateful Processing to ensure that batches are as close to the maxBatchSize as possible.
- Returns:
the ElasticsearchIO.BulkIO with Stateful Processing enabled or disabled
-
withMaxParallelRequestsPerWindow
public ElasticsearchIO.BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests)
Deprecated. Use withMaxParallelRequests(int) instead.
When using Stateful Processing via withUseStatefulBatches(boolean), states, and therefore batches, are maintained per key per window. Be aware that low values of maxParallelRequests can reduce parallelism greatly, in particular if the input data has a finite number of windows. Because data is temporarily globally windowed as part of writing to Elasticsearch, setting maxParallelRequests to 1 means there will only ever be 1 request in flight. A single in-flight request can help ensure an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work for all use cases. If this number is less than the maximum number of workers in your pipeline, the write step will be distributed sub-optimally on most runners.
- Parameters:
maxParallelRequests - the maximum number of parallel bulk requests for a window of data
- Returns:
the ElasticsearchIO.BulkIO with maximum parallel bulk requests per window set
-
withMaxParallelRequests
public ElasticsearchIO.BulkIO withMaxParallelRequests(int maxParallelRequests)
When using Stateful Processing via withUseStatefulBatches(boolean), states, and therefore batches, are maintained per key per window. Be aware that low values of maxParallelRequests can reduce parallelism greatly, in particular if the input data has a finite number of windows. Because data is temporarily globally windowed as part of writing to Elasticsearch, setting maxParallelRequests to 1 means there will only ever be 1 request in flight. A single in-flight request can help ensure an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work for all use cases. If this number is less than the maximum number of workers in your pipeline, the write step will be distributed sub-optimally on most runners.
- Parameters:
maxParallelRequests - the maximum number of parallel bulk requests
- Returns:
the ElasticsearchIO.BulkIO with maximum parallel bulk requests set
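Why a small maxParallelRequests caps parallelism can be seen from a sketch of per-key batching: if every element is assigned one of maxParallelRequests keys and batches are kept per key, at most that many batches can be built and sent concurrently. The hash-based key assignment and the class `ShardKeySketch` below are assumptions for illustration only, not the key assignment ElasticsearchIO actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: elements are spread over at most
// maxParallelRequests keys, and each key owns its own batch.
class ShardKeySketch {
  static int shardFor(String bulkEntity, int maxParallelRequests) {
    // floorMod keeps the key in [0, maxParallelRequests) even for negative hashes.
    return Math.floorMod(bulkEntity.hashCode(), maxParallelRequests);
  }

  static Map<Integer, List<String>> groupByShard(
      List<String> bulkEntities, int maxParallelRequests) {
    Map<Integer, List<String>> groups = new TreeMap<>();
    for (String entity : bulkEntities) {
      groups
          .computeIfAbsent(shardFor(entity, maxParallelRequests), k -> new ArrayList<>())
          .add(entity);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> docs = Arrays.asList("a", "b", "c", "d", "e");
    // With a single key, every document lands in the same batch,
    // so only one request can ever be in flight.
    System.out.println(groupByShard(docs, 1).size());
    System.out.println(groupByShard(docs, 4).keySet());
  }
}
```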
-
withThrowWriteErrors
public ElasticsearchIO.BulkIO withThrowWriteErrors(boolean throwWriteErrors)
Whether to throw runtime exceptions when write (IO) errors occur. Especially useful in streaming pipelines, where non-transient IO failures will cause infinite retries. If true, a runtime error will be thrown for any error found by ElasticsearchIO.createWriteReport(org.apache.http.HttpEntity, java.util.Set<java.lang.String>, boolean) and/or any java.io.IOException (on which org.elasticsearch.client.ResponseException is based) raised during a batch flush. If false, a PCollectionTuple will be returned with tags ElasticsearchIO.Write.SUCCESSFUL_WRITES and ElasticsearchIO.Write.FAILED_WRITES, each being a PCollection of ElasticsearchIO.Document, representing documents which were written to Elasticsearch without errors and those which failed to write due to errors, respectively.
- Parameters:
throwWriteErrors - whether to surface write errors as runtime exceptions or return them in a PCollection
- Returns:
the ElasticsearchIO.BulkIO with write error treatment configured
-
expand
public PCollectionTuple expand(PCollection<ElasticsearchIO.Document> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.
NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.
Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by:
expand in class PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>