Class ElasticsearchIO

java.lang.Object
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO

public class ElasticsearchIO extends Object
Transforms for reading and writing data from/to Elasticsearch.

Reading from Elasticsearch

ElasticsearchIO.read() returns a bounded PCollection<String> representing JSON documents.

To configure read(), you must provide a connection configuration containing the HTTP address(es) of the Elasticsearch instances, an index name, and a type. The following example illustrates options for configuring the source:


 pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(
    ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type"))
 );


The connection configuration also accepts optional settings: withUsername(), withPassword(), withApiKey(), withBearerToken() and withDefaultHeaders().
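
For instance, basic authentication and a custom default header might be configured as in the sketch below. The credential and header values are placeholders, and Header/BasicHeader are assumed to be the Apache HTTP client classes used by the connector:

 ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
     ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type")
         .withUsername("username")   // placeholder credentials
         .withPassword("password")
         .withDefaultHeaders(new Header[] {new BasicHeader("X-Tenant", "my-tenant")});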

You can also specify a query on the read() using withQuery().
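
For example, the following sketch reads only documents matching a match query; the query JSON and its field values are purely illustrative:

 PCollection<String> documents =
     pipeline.apply(
         ElasticsearchIO.read()
             .withConnectionConfiguration(
                 ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type"))
             .withQuery("{\"query\": {\"match\": {\"user\": \"some-user\"}}}"));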

There are many more configuration options, which can be found by looking at the with* methods of ElasticsearchIO.Read.

Writing to Elasticsearch

To write documents to Elasticsearch, use ElasticsearchIO.write(), which writes JSON documents from a PCollection<String> (which can be bounded or unbounded).

ElasticsearchIO.Write involves 2 discrete steps:

  • Converting the input PCollection of valid Elasticsearch documents into Bulk API directives, i.e. deciding whether each input document should result in an update, insert, or delete, with version, with routing, etc. (See ElasticsearchIO.DocToBulk)
  • Batching Bulk API directives together and interfacing with an Elasticsearch cluster. (See ElasticsearchIO.BulkIO)

In most cases, using write() will be desirable. In some cases, however, one may want to use ElasticsearchIO.DocToBulk and ElasticsearchIO.BulkIO directly (a sketch composing the two follows the list below). Such cases might include:

  • Unit testing. Ensure that output Bulk API entities for a given set of inputs will produce an expected result, without the need for an available Elasticsearch cluster. See docToBulk()
  • Flexible options for data backup. Serialized Bulk API entities can be forked and sent to both Elasticsearch and a data lake.
  • Mirroring data to multiple clusters. Presently, doing so would require duplicate computation.
  • Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk API directives based on multiple input types, and then "fan-in" all serialized Bulk directives into a single BulkIO transform to improve batching semantics.
  • Decoupled jobs. Job(s) could be made to produce Bulk directives and then publish them to a message bus. A distinct job could consume from that message bus and solely be responsible for IO with the target cluster(s).
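
As an illustrative sketch of composing the two transforms directly rather than using write(), assuming docs is a PCollection<String> of JSON documents and the connection values are placeholders:

 ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
     ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type");

 docs
     // Step 1: convert each JSON document into a serialized Bulk API directive.
     .apply(ElasticsearchIO.docToBulk().withConnectionConfiguration(connectionConfiguration))
     // The directives could be forked here (e.g. also written to a data lake or published
     // to a message bus) before any Elasticsearch I/O takes place.
     // Step 2: batch the directives and issue Bulk API requests against the cluster.
     .apply(ElasticsearchIO.bulkIO().withConnectionConfiguration(connectionConfiguration));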

Note that the configuration options for ElasticsearchIO.Write are a union of the configuration options for ElasticsearchIO.DocToBulk and ElasticsearchIO.BulkIO.

To configure ElasticsearchIO.write(), as with the read, you must provide a connection configuration. For instance:


 pipeline
   .apply(...)
   .apply(ElasticsearchIO.write().withConnectionConfiguration(
      ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type"))
   );


There are many more configuration options, which can be found by looking at the with* methods of ElasticsearchIO.Write.
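
For example, bulk request batching can be tuned on the write; the sizes below are illustrative values rather than recommendations:

 pipeline
   .apply(...)
   .apply(ElasticsearchIO.write()
       .withConnectionConfiguration(
           ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type"))
       .withMaxBatchSize(1000L)                      // flush after 1000 documents
       .withMaxBatchSizeBytes(5L * 1024L * 1024L));  // or after roughly 5 MB of payload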