Class DatastoreV1

java.lang.Object
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1

public class DatastoreV1 extends Object
DatastoreV1 provides an API to Read, Write and Delete PCollections of Google Cloud Datastore version v1 Entity objects. Read is only supported for Bounded PCollections while Write and Delete are supported for both Bounded and Unbounded PCollections.

This API currently requires an authentication workaround. To use DatastoreV1, users must use the gcloud command line tool to get credentials for Cloud Datastore:

 $ gcloud auth login
 

To read a PCollection from a query to Cloud Datastore, use read() and its methods DatastoreV1.Read.withProjectId(java.lang.String) and DatastoreV1.Read.withQuery(com.google.datastore.v1.Query) to specify the project to query and the query to read from. You can optionally specify the database to query using DatastoreV1.Read.withDatabaseId(java.lang.String) and a namespace to query within using DatastoreV1.Read.withNamespace(java.lang.String). You can also optionally specify how many splits you want for the query using DatastoreV1.Read.withNumQuerySplits(int).

For example:


 // Read a query from Datastore
 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
 Query query = ...;
 String databaseId = "...";
 String projectId = "...";

 Pipeline p = Pipeline.create(options);
 PCollection<Entity> entities = p.apply(
     DatastoreIO.v1().read()
         .withProjectId(projectId)
         .withDatabaseId(databaseId)
         .withQuery(query));
 

Note: A runner may read from Cloud Datastore in parallel across many workers. However, when the Query is configured with a limit using Query.Builder.setLimit(Int32Value), or when the Query contains inequality filters such as GREATER_THAN or LESS_THAN, all returned results will be read by a single worker in order to ensure correct results. Since the data is read by a single worker, this can have a significant impact on the performance of the job.
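For instance, the following sketch (the kind "Task" and the limit value are illustrative) sets a limit on the query, so all of its results will be read by a single worker:


 Query limitedQuery = Query.newBuilder()
     .addKind(KindExpression.newBuilder().setName("Task"))
     .setLimit(Int32Value.of(1000))
     .build();

 // Because of the limit, this read is not split across workers.
 PCollection<Entity> entities = p.apply(
     DatastoreIO.v1().read()
         .withProjectId(projectId)
         .withQuery(limitedQuery));
 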

To write a PCollection to Cloud Datastore, use write(), specifying the Cloud Datastore project to write to:


 PCollection<Entity> entities = ...;
 entities.apply(DatastoreIO.v1().write().withProjectId(projectId).withDatabaseId(databaseId));
 p.run();
 

To delete a PCollection of Entities from Cloud Datastore, use deleteEntity(), specifying the Cloud Datastore project to delete from:


 PCollection<Entity> entities = ...;
 entities.apply(DatastoreIO.v1().deleteEntity().withProjectId(projectId).withDatabaseId(databaseId));
 p.run();
 

To delete entities associated with a PCollection of Keys from Cloud Datastore, use deleteKey(), specifying the Cloud Datastore project to delete from:


 PCollection<Key> keys = ...;
 keys.apply(DatastoreIO.v1().deleteKey().withProjectId(projectId).withDatabaseId(databaseId));
 p.run();
 

Write and delete operations will follow a gradual ramp-up by default in order to protect Cloud Datastore from potential overload. This rate limit follows a heuristic based on the expected number of workers. To optimize throughput in this initial stage, you can provide a hint to the relevant PTransform by calling withHintNumWorkers, e.g., DatastoreIO.v1().deleteKey().withHintNumWorkers(numWorkers). While not recommended, you can also turn this off via .withRampupThrottlingDisabled().
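For example (a minimal sketch; the hint of 500 workers is illustrative and should reflect the expected scale of the job):


 PCollection<Entity> entities = ...;

 // Hint the expected number of workers so the ramp-up starts at an
 // appropriate rate for the job's scale.
 entities.apply(DatastoreIO.v1().write()
     .withProjectId(projectId)
     .withHintNumWorkers(500));
 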

Entities in the PCollection to be written or deleted must have complete Keys. A complete Key specifies the name or the id of its final path element, whereas an incomplete Key specifies neither. A namespace other than the project's default may be used by specifying it in the Entity Keys.


 // Build a Key whose partition ID carries a non-default namespace.
 Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
 keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
 
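For illustration, a complete and an incomplete Key can be built with DatastoreHelper.makeKey (the kind "Task" and the name "task1" are placeholder values):


 // A complete Key: the final path element has a name.
 Key completeKey = DatastoreHelper.makeKey("Task", "task1").build();

 // An incomplete Key: the final path element has neither name nor id,
 // so an Entity using it cannot be written or deleted by this API.
 Key incompleteKey = DatastoreHelper.makeKey("Task").build();
 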

Entities will be committed as upsert (update or insert) or delete mutations. Please read Entities, Properties, and Keys for more information about Entity keys.

Permissions

Permission requirements depend on the PipelineRunner that is used to execute the pipeline. Please refer to the documentation of the corresponding PipelineRunners for more details.

Please see Cloud Datastore Sign Up for security and permission related information specific to Cloud Datastore.

Optionally, a Cloud Datastore V1 Emulator running locally can be used for testing purposes by providing the host and port information through withLocalhost("host:port") on any of the above transforms. In that case, all Cloud Datastore API calls are directed to the Emulator.
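For example, to direct a read to an emulator listening on localhost (the port 8081 is illustrative):


 PCollection<Entity> entities = p.apply(
     DatastoreIO.v1().read()
         .withProjectId(projectId)
         .withQuery(query)
         .withLocalhost("localhost:8081"));
 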
