Class ElasticsearchIO
Reading from Elasticsearch
ElasticsearchIO.read() returns a bounded PCollection<String> representing JSON documents.
To configure the read(), you must provide a connection configuration
containing the HTTP address(es) of the Elasticsearch instances, an index name, and a type. The following example
illustrates how to configure the source:
pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(
    ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type")));
The connection configuration also accepts optional settings: withUsername(),
withPassword(), withApiKey(), withBearerToken(), and
withDefaultHeaders().
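For example, basic authentication can be layered onto the connection configuration. This is a minimal sketch; the host, index, type, and credentials below are placeholders:
ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
    ElasticsearchIO.ConnectionConfiguration.create("https://host:9200", "my-index", "my-type")
        .withUsername("beam-user")      // placeholder basic-auth user
        .withPassword("beam-password"); // placeholder basic-auth password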
You can also specify a query on the read() using withQuery().
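For instance, a match query can be passed as a JSON string; the field and value here are illustrative:
PCollection<String> docs = pipeline.apply(
    ElasticsearchIO.read()
        .withConnectionConfiguration(connectionConfiguration)
        .withQuery("{ \"query\": { \"match\": { \"user\": \"kimchy\" } } }"));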
Many more configuration options can be found by looking at the with* methods
of ElasticsearchIO.Read.
Writing to Elasticsearch
To write documents to Elasticsearch, use ElasticsearchIO.write(), which writes JSON documents from a bounded or unbounded PCollection<String>.
ElasticsearchIO.Write involves two discrete steps:
- Converting the input PCollection of valid ES documents into Bulk API directives, i.e. deciding whether each input document should result in an update, an insert, or a delete, with version, with routing, etc. (See ElasticsearchIO.DocToBulk.)
- Batching Bulk API directives together and interfacing with an Elasticsearch cluster. (See ElasticsearchIO.BulkIO.)
In most cases, using write() will be desirable. In some cases, however, one may
want to use ElasticsearchIO.DocToBulk and ElasticsearchIO.BulkIO directly (see the sketch after this list). Such
cases might include:
- Unit testing. Ensure that the Bulk API entities produced for a given set of inputs match an expected result, without the need for an available Elasticsearch cluster. See docToBulk().
- Flexible options for data backup. Serialized Bulk API entities can be forked and sent to both Elasticsearch and a data lake.
- Mirroring data to multiple clusters. Presently, mirroring data to multiple clusters via write() would require duplicating the document-to-directive conversion for each cluster.
- Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk API directives based on multiple input types, and then "fan-in" all serialized Bulk directives into a single BulkIO transform to improve batching semantics.
- Decoupled jobs. Job(s) could be made to produce Bulk directives and then publish them to a message bus. A distinct job could consume from that message bus and solely be responsible for IO with the target cluster(s).
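As a sketch of the decoupled form, assuming an input PCollection<String> of JSON documents named docs and a previously built connectionConfiguration:
docs
    // Step 1: convert each document into a serialized Bulk API directive.
    .apply(ElasticsearchIO.docToBulk().withConnectionConfiguration(connectionConfiguration))
    // Step 2: batch the directives and issue Bulk requests to the cluster.
    .apply(ElasticsearchIO.bulkIO().withConnectionConfiguration(connectionConfiguration));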
Note that the configuration options for ElasticsearchIO.Write are a union of the
configuration options for ElasticsearchIO.DocToBulk and ElasticsearchIO.BulkIO.
To configure ElasticsearchIO.write(), as with the read, you
must provide a connection configuration. For instance:
pipeline
    .apply(...)
    .apply(ElasticsearchIO.write().withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type")));
Many more configuration options can be found by looking at the with* methods
of ElasticsearchIO.Write.
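For illustration, two of those options control batching; the thresholds below are placeholder values:
docs.apply(
    ElasticsearchIO.write()
        .withConnectionConfiguration(connectionConfiguration)
        .withMaxBatchSize(1000L)                     // flush a Bulk request after 1000 documents
        .withMaxBatchSizeBytes(5L * 1024L * 1024L)); // or after roughly 5 MB of serialized directives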
Nested Class Summary
static class ElasticsearchIO.BoundedElasticsearchSource
    A BoundedSource reading from Elasticsearch.
static class ElasticsearchIO.BulkIO
    A PTransform writing Bulk API entities created by ElasticsearchIO.DocToBulk to an Elasticsearch cluster.
static class ElasticsearchIO.ConnectionConfiguration
    A POJO describing a connection configuration to Elasticsearch.
static class ElasticsearchIO.DocToBulk
    A PTransform converting docs to their Bulk API counterparts.
static class ElasticsearchIO.Document
static class ElasticsearchIO.DocumentCoder
static class ElasticsearchIO.Read
    A PTransform reading data from Elasticsearch.
static class ElasticsearchIO.RetryConfiguration
    A POJO encapsulating a configuration for retry behavior when issuing requests to ES.
static class ElasticsearchIO.Write
    A PTransform writing data to Elasticsearch.
Method Summary
static ElasticsearchIO.BulkIO bulkIO()
static ElasticsearchIO.DocToBulk docToBulk()
static ElasticsearchIO.Read read()
static ElasticsearchIO.Write write()
Method Details
read
public static ElasticsearchIO.Read read()
docToBulk
public static ElasticsearchIO.DocToBulk docToBulk()
bulkIO
public static ElasticsearchIO.BulkIO bulkIO()
write
public static ElasticsearchIO.Write write()