Class ElasticsearchIO.BulkIO

All Implemented Interfaces:
Serializable, HasDisplayData
Enclosing class:
ElasticsearchIO

public abstract static class ElasticsearchIO.BulkIO extends PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>
A PTransform writing Bulk API entities created by ElasticsearchIO.DocToBulk to an Elasticsearch cluster. Typically, using ElasticsearchIO.Write is preferred, whereas using ElasticsearchIO.DocToBulk and BulkIO separately is for advanced use cases such as mirroring data to multiple clusters or data lakes without recomputation.
See Also:
  • Constructor Details

    • BulkIO

      public BulkIO()
  • Method Details

    • withConnectionConfiguration

      public ElasticsearchIO.BulkIO withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration connectionConfiguration)
      Provide the Elasticsearch connection configuration object.
      Parameters:
      connectionConfiguration - the Elasticsearch ElasticsearchIO.ConnectionConfiguration object
      Returns:
      the ElasticsearchIO.BulkIO with connection configuration set
    • withMaxBatchSize

      public ElasticsearchIO.BulkIO withMaxBatchSize(long batchSize)
      Provide a maximum size in number of documents for the batch see bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html). Default is 1000 docs (like Elasticsearch bulk size advice). See https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the execution engine, size of bundles may vary, this sets the maximum size. Change this if you need to have smaller ElasticSearch bulks.
      Parameters:
      batchSize - maximum batch size in number of documents
      Returns:
      the ElasticsearchIO.BulkIO with connection batch size set
    • withMaxBatchSizeBytes

      public ElasticsearchIO.BulkIO withMaxBatchSizeBytes(long batchSizeBytes)
      Provide a maximum size in bytes for the batch see bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html). Default is 5MB (like Elasticsearch bulk size advice). See https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the execution engine, size of bundles may vary, this sets the maximum size. Change this if you need to have smaller ElasticSearch bulks.
      Parameters:
      batchSizeBytes - maximum batch size in bytes
      Returns:
      the ElasticsearchIO.BulkIO with connection batch size in bytes set
    • withRetryConfiguration

      public ElasticsearchIO.BulkIO withRetryConfiguration(ElasticsearchIO.RetryConfiguration retryConfiguration)
      Provides configuration to retry a failed batch call to Elasticsearch. A batch is considered as failed if the underlying RestClient surfaces 429 HTTP status code as error for one or more of the items in the Response. Users should consider that retrying might compound the underlying problem which caused the initial failure. Users should also be aware that once retrying is exhausted the error is surfaced to the runner which may then opt to retry the current bundle in entirety or abort if the max number of retries of the runner is completed. Retrying uses an exponential backoff algorithm, with minimum backoff of 5 seconds and then surfacing the error once the maximum number of retries or maximum configuration duration is exceeded.

      Example use:

      
       ElasticsearchIO.write()
         .withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(10, Duration.standardMinutes(3))
         ...
       
      Parameters:
      retryConfiguration - the rules which govern the retry behavior
      Returns:
      the ElasticsearchIO.BulkIO with retrying configured
    • withIgnoreVersionConflicts

      public ElasticsearchIO.BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts)
      Whether or not to suppress version conflict errors in a Bulk API response. This can be useful if your use case involves using external version types.
      Parameters:
      ignoreVersionConflicts - true to suppress version conflicts, false to surface version conflict errors.
      Returns:
      the ElasticsearchIO.BulkIO with version conflict handling configured
    • withAllowableResponseErrors

      public ElasticsearchIO.BulkIO withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrorTypes)
      Provide a set of textual error types which can be contained in Bulk API response items[].error.type field. Any element in @param allowableResponseErrorTypes will suppress errors of the same type in Bulk responses.

      See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex

      Parameters:
      allowableResponseErrorTypes -
      Returns:
      the ElasticsearchIO.BulkIO with allowable response errors set
    • withMaxBufferingDuration

      public ElasticsearchIO.BulkIO withMaxBufferingDuration(Duration maxBufferingDuration)
      If using withUseStatefulBatches(boolean), this can be used to set a maximum elapsed time before buffered elements are emitted to Elasticsearch as a Bulk API request. If this config is not set, Bulk requests will not be issued until getMaxBatchSize() number of documents have been buffered. This may result in higher latency in particular if your max batch size is set to a large value and your pipeline input is low volume.
      Parameters:
      maxBufferingDuration - the maximum duration to wait before sending any buffered documents to Elasticsearch, regardless of maxBatchSize.
      Returns:
      the ElasticsearchIO.BulkIO with maximum buffering duration set
    • withUseStatefulBatches

      public ElasticsearchIO.BulkIO withUseStatefulBatches(boolean useStatefulBatches)
      Whether or not to use Stateful Processing to ensure bulk requests have the desired number of entities i.e. as close to the maxBatchSize as possible. By default without this feature enabled, Bulk requests will not contain more than maxBatchSize entities, but the lower bound of batch size is determined by Beam Runner bundle sizes, which may be as few as 1.
      Parameters:
      useStatefulBatches - true enables the use of Stateful Processing to ensure that batches are as close to the maxBatchSize as possible.
      Returns:
      the ElasticsearchIO.BulkIO with Stateful Processing enabled or disabled
    • withMaxParallelRequestsPerWindow

      @Deprecated public ElasticsearchIO.BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests)
      Deprecated.
      When using withUseStatefulBatches(boolean) Stateful Processing, states and therefore batches are maintained per-key-per-window. BE AWARE that low values for @param maxParallelRequests, in particular if the input data has a finite number of windows, can reduce parallelism greatly. Because data will be temporarily globally windowed as part of writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only ever be 1 request in flight. Having only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work for all use cases. If this number is less than the number of maximum workers in your pipeline, the IO work will result in a sub-optimal distribution of the write step with most runners.
      Parameters:
      maxParallelRequests - the maximum number of parallel bulk requests for a window of data
      Returns:
      the ElasticsearchIO.BulkIO with maximum parallel bulk requests per window set
    • withMaxParallelRequests

      public ElasticsearchIO.BulkIO withMaxParallelRequests(int maxParallelRequests)
      When using withUseStatefulBatches(boolean) Stateful Processing, states and therefore batches are maintained per-key-per-window. BE AWARE that low values for @param maxParallelRequests, in particular if the input data has a finite number of windows, can reduce parallelism greatly. Because data will be temporarily globally windowed as part of writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only ever be 1 request in flight. Having only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work for all use cases. If this number is less than the number of maximum workers in your pipeline, the IO work will result in a sub-optimal distribution of the write step with most runners.
      Parameters:
      maxParallelRequests - the maximum number of parallel bulk requests
      Returns:
      the ElasticsearchIO.BulkIO with maximum parallel bulk requests
    • withThrowWriteErrors

      public ElasticsearchIO.BulkIO withThrowWriteErrors(boolean throwWriteErrors)
      Whether to throw runtime exceptions when write (IO) errors occur. Especially useful in streaming pipelines where non-transient IO failures will cause infinite retries. If true, a runtime error will be thrown for any error found by ElasticsearchIO.createWriteReport(org.apache.http.HttpEntity, java.util.Set<java.lang.String>, boolean) and/or java.io.IOException (which is what org.elasticsearch.client.ResponseException based on) found by in batch flush. If false, a PCollectionTuple will be returned with tags ElasticsearchIO.Write.SUCCESSFUL_WRITES and ElasticsearchIO.Write.FAILED_WRITES, each being a PCollection of ElasticsearchIO.Document representing documents which were written to Elasticsearch without errors and those which failed to write due to errors, respectively.
      Parameters:
      throwWriteErrors - whether to surface write errors as runtime exceptions or return them in a PCollection
      Returns:
      the ElasticsearchIO.BulkIO with write error treatment configured
    • expand

      Description copied from class: PTransform
      Override this method to specify how this PTransform should be expanded on the given InputT.

      NOTE: This method should not be called directly. Instead apply the PTransform should be applied to the InputT using the apply method.

      Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

      Specified by:
      expand in class PTransform<PCollection<ElasticsearchIO.Document>,PCollectionTuple>