public class FhirIO
extends java.lang.Object
FhirIO provides an API for reading and writing resources to the Google Cloud Healthcare FHIR API.
FHIR resources can be read with FhirIO.Read, which supports use cases where you have a PCollection of message IDs. This is appropriate for reading FHIR notifications from a Pub/Sub subscription with PubsubIO.readStrings(), or in cases where you have a manually prepared list of messages that you need to process (e.g. in a text file read with TextIO).
FhirIO.Read fetches resource contents from a FHIR store based on a PCollection of message ID strings and produces a FhirIO.Read.Result. Call FhirIO.Read.Result.getResources() to retrieve a PCollection containing the successfully fetched Strings, and/or FhirIO.Read.Result.getFailedReads() to retrieve a PCollection of HealthcareIOError containing each resource ID that could not be fetched together with the exception. The failed reads can be written to the dead letter storage system of your choosing. This error handling mainly serves to transparently surface errors where the upstream PCollection contains IDs that are not valid or are not reachable due to permissions issues.
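The split between successful and failed reads can be pictured without any Beam dependency. The following is a minimal plain-Java sketch, not FhirIO's implementation: `ReadOutcomeSketch`, `FailedRead`, and the `fetcher` function are hypothetical names introduced for illustration, standing in for the read result, `HealthcareIOError`, and the FHIR store lookup respectively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java illustration of collecting successes and failures side by side. */
final class ReadOutcomeSketch {
  /** Hypothetical error record: the input ID plus the failure message. */
  static final class FailedRead {
    final String resourceId;
    final String errorMessage;

    FailedRead(String resourceId, String errorMessage) {
      this.resourceId = resourceId;
      this.errorMessage = errorMessage;
    }
  }

  final List<String> resources = new ArrayList<>();
  final List<FailedRead> failedReads = new ArrayList<>();

  /** Applies the (assumed) fetcher to every ID; failures are recorded, not rethrown. */
  static ReadOutcomeSketch fetchAll(List<String> ids, Function<String, String> fetcher) {
    ReadOutcomeSketch outcome = new ReadOutcomeSketch();
    for (String id : ids) {
      try {
        outcome.resources.add(fetcher.apply(id));
      } catch (RuntimeException e) {
        // Keep the offending ID next to the error so it can go to dead letter storage.
        outcome.failedReads.add(new FailedRead(id, String.valueOf(e.getMessage())));
      }
    }
    return outcome;
  }
}
```

In a pipeline, the two lists play the roles of the PCollections returned by getResources() and getFailedReads(): one invalid or unreachable ID does not stop the remaining reads.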
Write
Resources can be written to FHIR with two different methods: Import or Execute Bundle.
Execute Bundle: This is best for use cases where you are writing to a non-empty FHIR store with other clients, or otherwise need referential integrity (e.g. a streaming HL7v2 to FHIR ETL pipeline).
Import: This is best for use cases where you are populating an empty FHIR store with no other clients (e.g. a historical backfill on a new FHIR store). It is faster than the execute bundles method, but it does not respect referential integrity and the resources are not written transactionally. Import requires each resource to contain a client-provided ID. When using import, you must give the appropriate permissions to the Google Cloud Healthcare Service Agent.
Export: This exports FHIR resources from a FHIR store to Google Cloud Storage. The output is written as ndjson (newline-delimited JSON), one FHIR resource per line. When using export, you must give the appropriate permissions to the Google Cloud Healthcare Service Agent.
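Because exported resources are newline-delimited JSON, each line can be treated as one resource. The following plain-Java sketch shows that framing only; `NdjsonSketch` and its methods are hypothetical helpers, not part of FhirIO, and no JSON parsing or validation is attempted.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal ndjson framing: one JSON document per line. */
final class NdjsonSketch {
  /** Splits ndjson text into one JSON string per non-blank line. */
  static List<String> splitResources(String ndjson) {
    List<String> resources = new ArrayList<>();
    for (String line : ndjson.split("\r?\n")) {
      String trimmed = line.trim();
      if (!trimmed.isEmpty()) {
        resources.add(trimmed);
      }
    }
    return resources;
  }

  /** Joins single-line JSON strings into ndjson, terminating each with a newline. */
  static String joinResources(List<String> resources) {
    StringBuilder sb = new StringBuilder();
    for (String resource : resources) {
      if (resource.contains("\n")) {
        // A pretty-printed resource would break the one-resource-per-line framing.
        throw new IllegalArgumentException("resource must be serialized on a single line");
      }
      sb.append(resource).append('\n');
    }
    return sb.toString();
  }
}
```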
Deidentify: This de-identifies FHIR resources from a source FHIR store and writes the result to a destination FHIR store. The destination store must already exist.
Search: This searches FHIR resources within a given FHIR store. The inputs are individual FHIR Search queries, represented by the FhirSearchParameter class. The output for each Search is a JSON array of FHIR resources in string form, with pagination handled, together with an optional input key.
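In FHIR search syntax, comma-separated values inside one parameter are combined with OR, while repeating the parameter combines the values with AND. The sketch below only illustrates that distinction as raw query strings; `FhirSearchQuerySketch` is a hypothetical helper, values are assumed to be URL-safe already, and how FhirIO itself serializes a FhirSearchParameter onto the request is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Builds illustrative FHIR search query strings for OR and AND semantics. */
final class FhirSearchQuerySketch {
  /** OR: a single parameter with comma-joined values, e.g. name=Alice,Bob. */
  static String orQuery(String parameter, List<String> values) {
    return parameter + "=" + String.join(",", values);
  }

  /** AND: the parameter repeated once per value, e.g. name=Alice&name=Bob. */
  static String andQuery(String parameter, List<String> values) {
    List<String> parts = new ArrayList<>();
    for (String value : values) {
      parts.add(parameter + "=" + value);
    }
    return String.join("&", parts);
  }
}
```

This mirrors the two shapes of search input: a single String value such as "Alice,Bob" for an OR query, and a list of values for an AND query.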
Example
{@code
Pipeline pipeline = ...

// Tail the FHIR store by retrieving resources based on Pub/Sub notifications.
FhirIO.Read.Result readResult = pipeline
    .apply("Read FHIR notifications",
        PubsubIO.readStrings().fromSubscription(options.getNotificationSubscription()))
    .apply(FhirIO.readResources());

// Successfully retrieved messages.
PCollection<String> resources = readResult.getResources();
// Message IDs that couldn't be retrieved, plus error context.
PCollection<HealthcareIOError<String>> failedReads = readResult.getFailedReads();

failedReads.apply("Write Message IDs / Stacktrace for Failed Reads to BigQuery",
    BigQueryIO
        .write()
        .to(options.getBQFhirExecuteBundlesDeadLetterTable())
        .withFormatFunction(new HealthcareIOErrorToTableRow()));

PCollection<String> output = resources.apply("Happy path transformations", ...);
FhirIO.Write.Result writeResult =
    output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));

PCollection<HealthcareIOError<String>> failedBundles = writeResult.getFailedInsertsWithErr();

failedBundles.apply("Write failed bundles to BigQuery",
    BigQueryIO
        .write()
        .to(options.getBQFhirExecuteBundlesDeadLetterTable())
        .withFormatFunction(new HealthcareIOErrorToTableRow()));

// Alternatively, you could use import for high throughput to a new store.
String tempDir = ...;
String deadLetterDir = ...;
FhirIO.Import.ContentStructure contentStructure = ...;
FhirIO.Write.Result importResult =
    output.apply("Import FHIR Resources",
        FhirIO.importResources(options.getNewFhirStore(), tempDir, deadLetterDir, contentStructure));

// Export FHIR resources to Google Cloud Storage.
String fhirStoreName = ...;
String exportGcsUriPrefix = ...;
PCollection<String> exportedResources =
    pipeline.apply(FhirIO.exportResourcesToGcs(fhirStoreName, exportGcsUriPrefix));

// De-identify FHIR resources.
String sourceFhirStoreName = ...;
String destinationFhirStoreName = ...;
DeidentifyConfig deidConfig = new DeidentifyConfig(); // use default DeidentifyConfig
pipeline.apply(FhirIO.deidentify(sourceFhirStoreName, destinationFhirStoreName, deidConfig));

// Search FHIR resources using an "OR" query.
Map<String, String> queries = new HashMap<>();
queries.put("name", "Alice,Bob");
FhirSearchParameter<String> searchParameter = FhirSearchParameter.of("Patient", queries);
PCollection<FhirSearchParameter<String>> searchQueries =
    pipeline.apply(
        Create.of(searchParameter)
            .withCoder(FhirSearchParameterCoder.of(StringUtf8Coder.of())));
FhirIO.Search.Result searchResult =
    searchQueries.apply(FhirIO.searchResources(options.getFhirStore()));
PCollection<JsonArray> searchedResources = searchResult.getResources(); // JsonArray of results

// Search FHIR resources using an "AND" query with a key.
Map<String, List<String>> listQueries = new HashMap<>();
listQueries.put("name", Arrays.asList("Alice", "Bob"));
FhirSearchParameter<List<String>> listSearchParameter =
    FhirSearchParameter.of("Patient", "Alice-Bob-Search", listQueries);
PCollection<FhirSearchParameter<List<String>>> listSearchQueries =
    pipeline.apply(
        Create.of(listSearchParameter)
            .withCoder(FhirSearchParameterCoder.of(ListCoder.of(StringUtf8Coder.of()))));
FhirIO.Search.Result listSearchResult =
    listSearchQueries.apply(FhirIO.searchResourcesWithGenericParameters(options.getFhirStore()));
PCollection<KV<String, JsonArray>> listResource =
    listSearchResult.getKeyedResources(); // KV<"Alice-Bob-Search", JsonArray of results>
}
Modifier and Type | Class and Description |
---|---|
static class | FhirIO.Deidentify: Deidentify FHIR resources from a FHIR store to a destination FHIR store. |
static class | FhirIO.ExecuteBundles: The type Execute bundles. |
static class | FhirIO.Export: Export FHIR resources from a FHIR store to newline-delimited JSON files on GCS. |
static class | FhirIO.Import: Writes each bundle of elements to a newline-delimited JSON file on GCS and issues a fhirStores.import request for that file. |
static class | FhirIO.Read: The type Read. |
static class | FhirIO.Search<T>: The type Search. |
static class | FhirIO.Write: The type Write. |
Constructor and Description |
---|
FhirIO() |
Modifier and Type | Method and Description |
---|---|
static FhirIO.Deidentify | deidentify(java.lang.String sourceFhirStore, java.lang.String destinationFhirStore, com.google.api.services.healthcare.v1.model.DeidentifyConfig deidConfig): Deidentify FHIR resources. |
static FhirIO.Deidentify | deidentify(ValueProvider<java.lang.String> sourceFhirStore, ValueProvider<java.lang.String> destinationFhirStore, ValueProvider<com.google.api.services.healthcare.v1.model.DeidentifyConfig> deidConfig): Deidentify FHIR resources. |
static FhirIO.Export | exportResourcesToGcs(java.lang.String fhirStore, java.lang.String exportGcsUriPrefix): Export resources to GCS. |
static FhirIO.Export | exportResourcesToGcs(ValueProvider<java.lang.String> fhirStore, ValueProvider<java.lang.String> exportGcsUriPrefix): Export resources to GCS. |
static FhirIO.Import | importResources(java.lang.String fhirStore, java.lang.String tempDir, java.lang.String deadLetterDir, FhirIO.Import.ContentStructure contentStructure): Import resources. |
static FhirIO.Import | importResources(ValueProvider<java.lang.String> fhirStore, ValueProvider<java.lang.String> tempDir, ValueProvider<java.lang.String> deadLetterDir, FhirIO.Import.ContentStructure contentStructure): Import resources. |
static FhirIO.Read | readResources(): Read resources from a PCollection of resource IDs. |
static FhirIO.Search<java.lang.String> | searchResources(java.lang.String fhirStore): Search resources from a FHIR store with String parameter values. |
static FhirIO.Search<? extends java.lang.Object> | searchResourcesWithGenericParameters(java.lang.String fhirStore): Search resources from a FHIR store with any type of parameter values. |
public static FhirIO.Read readResources()
FhirIO.Read
public static FhirIO.Search<java.lang.String> searchResources(java.lang.String fhirStore)
FhirIO.Search
public static FhirIO.Search<? extends java.lang.Object> searchResourcesWithGenericParameters(java.lang.String fhirStore)
FhirIO.Search
public static FhirIO.Import importResources(java.lang.String fhirStore, java.lang.String tempDir, java.lang.String deadLetterDir, FhirIO.Import.ContentStructure contentStructure)
fhirStore - the FHIR store
tempDir - the temp dir
deadLetterDir - the dead letter dir
contentStructure - the content structure
FhirIO.Import
public static FhirIO.Import importResources(ValueProvider<java.lang.String> fhirStore, ValueProvider<java.lang.String> tempDir, ValueProvider<java.lang.String> deadLetterDir, FhirIO.Import.ContentStructure contentStructure)
fhirStore - the FHIR store
tempDir - the temp dir
deadLetterDir - the dead letter dir
contentStructure - the content structure
FhirIO.Import
public static FhirIO.Export exportResourcesToGcs(java.lang.String fhirStore, java.lang.String exportGcsUriPrefix)
fhirStore - the FHIR store, in the format: projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id
exportGcsUriPrefix - the destination GCS dir, in the format: gs://YOUR_BUCKET_NAME/path/to/a/dir
FhirIO.Export
public static FhirIO.Export exportResourcesToGcs(ValueProvider<java.lang.String> fhirStore, ValueProvider<java.lang.String> exportGcsUriPrefix)
fhirStore - the FHIR store, in the format: projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id
exportGcsUriPrefix - the destination GCS dir, in the format: gs://YOUR_BUCKET_NAME/path/to/a/dir
FhirIO.Export
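The two string formats above are easy to get wrong when assembled by hand. As a rough sketch under stated assumptions, the helper below builds a FHIR store name from its parts and performs a shallow shape check on both arguments before they are passed to exportResourcesToGcs. `FhirStorePathSketch` and its methods are hypothetical and not part of FhirIO; the checks only look at the overall shape and do not verify that the store or bucket exists.

```java
import java.util.regex.Pattern;

/** Illustrative helpers for the FHIR store name and GCS prefix formats. */
final class FhirStorePathSketch {
  private static final Pattern FHIR_STORE_NAME =
      Pattern.compile("projects/[^/]+/locations/[^/]+/datasets/[^/]+/fhirStores/[^/]+");

  /** Assembles a name of the form projects/.../locations/.../datasets/.../fhirStores/... */
  static String fhirStoreName(
      String projectId, String locationId, String datasetId, String fhirStoreId) {
    return String.format(
        "projects/%s/locations/%s/datasets/%s/fhirStores/%s",
        projectId, locationId, datasetId, fhirStoreId);
  }

  /** Shallow check that the string has the documented four-segment shape. */
  static boolean isFhirStoreName(String candidate) {
    return FHIR_STORE_NAME.matcher(candidate).matches();
  }

  /** Shallow check that the string looks like gs://bucket/optional/path. */
  static boolean isGcsUriPrefix(String candidate) {
    return candidate.startsWith("gs://") && candidate.length() > "gs://".length();
  }
}
```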
public static FhirIO.Deidentify deidentify(java.lang.String sourceFhirStore, java.lang.String destinationFhirStore, com.google.api.services.healthcare.v1.model.DeidentifyConfig deidConfig)
sourceFhirStore - the source FHIR store, in the format: projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id
destinationFhirStore - the destination FHIR store to write de-identified resources, in the format: projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id
deidConfig - the DeidentifyConfig
FhirIO.Deidentify
public static FhirIO.Deidentify deidentify(ValueProvider<java.lang.String> sourceFhirStore, ValueProvider<java.lang.String> destinationFhirStore, ValueProvider<com.google.api.services.healthcare.v1.model.DeidentifyConfig> deidConfig)
sourceFhirStore - the source FHIR store, in the format: projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id
destinationFhirStore - the destination FHIR store to write de-identified resources, in the format: projects/project_id/locations/location_id/datasets/dataset_id/fhirStores/fhir_store_id
deidConfig - the DeidentifyConfig
FhirIO.Deidentify