Class ElasticsearchIO
Reading from Elasticsearch
ElasticsearchIO.read() returns a bounded PCollection<String> representing JSON documents.

To configure the read(), you have to provide a connection configuration containing the HTTP address of the instances, an index name and a type. The following example illustrates options for configuring the source:
pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(
    ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type"))
);
The connection configuration also accepts optional settings: withUsername(), withPassword(), withApiKey(), withBearerToken() and withDefaultHeaders().
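For example, basic authentication can be added to the connection configuration like this (a minimal sketch; the host, index, type and credentials are placeholders):

ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
    ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type")
        .withUsername("user")        // placeholder credentials
        .withPassword("password");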
You can also specify a query on the read() using withQuery().
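For instance, restricting the read to documents matching a field value might look like the following sketch (the query body is illustrative, and connectionConfiguration is the object built above):

PCollection<String> docs = pipeline.apply(ElasticsearchIO.read()
    .withConnectionConfiguration(connectionConfiguration)
    .withQuery("{\"query\": {\"match\": {\"last_name\": \"Smith\"}}}"));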
There are many more configuration options, which can be found by looking at the with* methods of ElasticsearchIO.Read.
Writing to Elasticsearch
To write documents to Elasticsearch, use ElasticsearchIO.write(), which writes JSON documents from a PCollection<String> (which can be bounded or unbounded).
ElasticsearchIO.Write involves two discrete steps:
- Converting the input PCollection of valid ES documents into Bulk API directives, i.e. deciding whether the input document should result in an update, insert or delete, with version, with routing, etc. (see ElasticsearchIO.DocToBulk).
- Batching Bulk API directives together and interfacing with an Elasticsearch cluster (see ElasticsearchIO.BulkIO).
In most cases, using write() will be desirable. In some cases, one may want to use ElasticsearchIO.DocToBulk and ElasticsearchIO.BulkIO directly. Such cases might include:
- Unit testing. Ensure that output Bulk API entities for a given set of inputs will produce an expected result, without the need for an available Elasticsearch cluster. See docToBulk().
- Flexible options for data backup. Serialized Bulk API entities can be forked and sent to both Elasticsearch and a data lake.
- Mirroring data to multiple clusters. Presently, mirroring data to multiple clusters would require duplicate computation.
- Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk API directives based on multiple input types, and then "fan-in" all serialized Bulk directives into a single BulkIO transform to improve batching semantics, as shown in the sketch after this list.
- Decoupled jobs. Job(s) could be made to produce Bulk directives and then publish them to a message bus. A distinct job could consume from that message bus and solely be responsible for IO with the target cluster(s).
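As a rough sketch of the fan-in case, assuming two upstream PCollection<String> inputs (docsOfTypeA and docsOfTypeB are hypothetical) and using local variable type inference to stay agnostic about the intermediate element type:

ElasticsearchIO.ConnectionConfiguration conn =
    ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type");

// Convert each input shape into Bulk API directives separately.
var bulkA = docsOfTypeA.apply(ElasticsearchIO.docToBulk().withConnectionConfiguration(conn));
var bulkB = docsOfTypeB.apply(ElasticsearchIO.docToBulk().withConnectionConfiguration(conn));

// Fan-in: flatten all directives into a single BulkIO transform to improve batching.
PCollectionList.of(bulkA).and(bulkB)
    .apply(Flatten.pCollections())
    .apply(ElasticsearchIO.bulkIO().withConnectionConfiguration(conn));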
Note that configuration options for ElasticsearchIO.Write are a union of the configuration options for ElasticsearchIO.DocToBulk and ElasticsearchIO.BulkIO.
To configure ElasticsearchIO.write(), as with the read, you have to provide a connection configuration. For instance:
pipeline
    .apply(...)
    .apply(ElasticsearchIO.write().withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type"))
    );
There are many more configuration options, which can be found by looking at the with* methods of ElasticsearchIO.Write.
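For example, the write can be tuned for batching, document IDs and retries. The sketch below assumes the withMaxBatchSize, withIdFn and withRetryConfiguration options with illustrative values (Duration here is org.joda.time.Duration, and the "id" field name is hypothetical):

pipeline
    .apply(...)
    .apply(ElasticsearchIO.write()
        .withConnectionConfiguration(connectionConfiguration)
        .withMaxBatchSize(1000)                    // flush a bulk request after 1000 docs
        .withIdFn(doc -> doc.get("id").asText())   // derive the document _id from a field
        .withRetryConfiguration(
            ElasticsearchIO.RetryConfiguration.create(3, Duration.standardSeconds(30))));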
Nested Class Summary
- static class ElasticsearchIO.BoundedElasticsearchSource: A BoundedSource reading from Elasticsearch.
- static class ElasticsearchIO.BulkIO: A PTransform writing Bulk API entities created by ElasticsearchIO.DocToBulk to an Elasticsearch cluster.
- static class ElasticsearchIO.ConnectionConfiguration: A POJO describing a connection configuration to Elasticsearch.
- static class ElasticsearchIO.DocToBulk: A PTransform converting docs to their Bulk API counterparts.
- static class ElasticsearchIO.Document
- static class ElasticsearchIO.DocumentCoder
- static class ElasticsearchIO.Read: A PTransform reading data from Elasticsearch.
- static class ElasticsearchIO.RetryConfiguration: A POJO encapsulating a configuration for retry behavior when issuing requests to ES.
- static class ElasticsearchIO.Write: A PTransform writing data to Elasticsearch.
Method Summary
- static ElasticsearchIO.BulkIO bulkIO()
- static ElasticsearchIO.DocToBulk docToBulk()
- static ElasticsearchIO.Read read()
- static ElasticsearchIO.Write write()