Class HL7v2IO

java.lang.Object
org.apache.beam.sdk.io.gcp.healthcare.HL7v2IO

public class HL7v2IO extends Object
HL7v2IO provides an API for reading from and writing to the Google Cloud Healthcare HL7v2 API.

Read

HL7v2 messages can be fetched from the HL7v2 store in two ways: Message Fetching and Message Listing.

Message Fetching

Message Fetching with HL7v2IO.Read supports use cases where you have a PCollection of message IDs. This fits two cases. The first is reading HL7v2 notifications from a Pub/Sub subscription with PubsubIO.readStrings(). The second is processing a manually prepared list of messages, for example a text file read with TextIO.
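The message IDs are assumed here to be full HL7v2 message resource names, the form that HL7v2 store Pub/Sub notifications carry. The helper below shows how such a name can be assembled for a manually prepared ID list. It is a minimal sketch: the class `Hl7v2Names` and its methods are hypothetical and not part of HL7v2IO.

```java
// Hypothetical helper (not part of HL7v2IO) that assembles resource names in the
// projects/.../hl7V2Stores/.../messages/... form assumed for each message ID.
public class Hl7v2Names {
  // Resource name of an HL7v2 store.
  static String storeName(String project, String location, String dataset, String store) {
    return String.format(
        "projects/%s/locations/%s/datasets/%s/hl7V2Stores/%s", project, location, dataset, store);
  }

  // Resource name of one message inside that store.
  static String messageName(String storeName, String messageId) {
    return storeName + "/messages/" + messageId;
  }

  public static void main(String[] args) {
    String store = storeName("my-project", "us-central1", "my-dataset", "my-store");
    System.out.println(messageName(store, "abc123"));
  }
}
```

A text file with one such name per line could then be read with TextIO and passed to HL7v2IO.getAll().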

Message Fetching reads the contents of each message in a PCollection of message ID strings from the HL7v2 store and returns an HL7v2IO.Read.Result. The result provides two accessors:

  • HL7v2IO.HL7v2ReadResult.getMessages() returns a PCollection of the successfully fetched HL7v2Messages.
  • HL7v2IO.HL7v2ReadResult.getFailedReads() returns a PCollection of HealthcareIOError, each containing a message ID that could not be fetched and the exception. You can write these failures to the dead letter storage system of your choice.

This error handling mainly catches cases where the upstream PCollection contains IDs that are invalid or unreachable because of permission issues.
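The success/dead-letter split described above can be modeled in plain Java. Each ID produces either a fetched message or an error record that keeps the ID together with its exception, and one bad ID does not fail the batch. This is only an illustration of the pattern, not Beam code. `FetchSketch`, `FetchResult`, and `ReadError` are hypothetical stand-ins for HL7v2IO's result types, and an in-memory map stands in for the HL7v2 store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the Message Fetching result (hypothetical types, not HL7v2IO's).
public class FetchSketch {
  // Stand-in for HealthcareIOError: the failed ID plus the exception.
  record ReadError(String messageId, Exception error) {}

  // Stand-in for the read result: happy-path messages and failed reads.
  record FetchResult(List<String> messages, List<ReadError> failedReads) {}

  static FetchResult fetchAll(List<String> ids, Function<String, String> fetch) {
    List<String> messages = new ArrayList<>();
    List<ReadError> failed = new ArrayList<>();
    for (String id : ids) {
      try {
        messages.add(fetch.apply(id));
      } catch (RuntimeException e) {
        // An invalid or unreachable ID is routed to the dead letter output.
        failed.add(new ReadError(id, e));
      }
    }
    return new FetchResult(messages, failed);
  }

  public static void main(String[] args) {
    // In-memory "store" keyed by message ID.
    Map<String, String> store = Map.of("m1", "MSH|^~\\&|A", "m2", "MSH|^~\\&|B");
    Function<String, String> fetch = id -> {
      String msg = store.get(id);
      if (msg == null) {
        throw new IllegalArgumentException("Message not found: " + id);
      }
      return msg;
    };
    FetchResult result = fetchAll(List.of("m1", "missing", "m2"), fetch);
    System.out.println(result.messages().size() + " fetched, "
        + result.failedReads().size() + " failed"); // 2 fetched, 1 failed
  }
}
```

The sketch uses records, which need Java 16 or later.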

Message Listing

Message Listing with HL7v2IO.ListHL7v2Messages supports batch use cases where you want to process all the messages in an HL7v2 store, or only those matching a filter. The transform paginates through the results of a Messages.List call and outputs directly to a PCollection of HL7v2Message. In these use cases, error handling like that described above is unnecessary. Because the transform lists from the source of truth, the pipeline should fail transparently if it cannot paginate through all the results.

See Also:
  • https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.hl7V2Stores.messages/list#query-parameters (filter and other query parameters)
  • https://cloud.google.com/healthcare/docs/reference/rest/v1/projects.locations.datasets.hl7V2Stores.messages/list (Messages.List)
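The pagination described for Message Listing can be sketched in plain Java as a loop that follows `nextPageToken` until it is absent. Any failure propagates instead of returning a partial listing. This is a minimal model, not HL7v2IO's implementation. `ListSketch` and `Page` are hypothetical, the filter string is an assumed example, and a lambda simulates the HTTP responses. The `filter` and `pageToken` query parameters and the `hl7V2Messages`/`nextPageToken` response fields come from the Messages.List REST reference.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ListSketch {
  // One page of a Messages.List response (hypothetical type mirroring the
  // hl7V2Messages and nextPageToken fields of the REST response).
  record Page(List<String> messages, String nextPageToken) {}

  // Builds the GET URL for one page, with optional filter and pageToken query parameters.
  static String listUrl(String hl7v2Store, String filter, String pageToken) {
    StringBuilder url = new StringBuilder("https://healthcare.googleapis.com/v1/")
        .append(hl7v2Store)
        .append("/messages");
    String separator = "?";
    if (filter != null) {
      url.append(separator).append("filter=")
          .append(URLEncoder.encode(filter, StandardCharsets.UTF_8));
      separator = "&";
    }
    if (pageToken != null) {
      url.append(separator).append("pageToken=")
          .append(URLEncoder.encode(pageToken, StandardCharsets.UTF_8));
    }
    return url.toString();
  }

  // Requests pages until no nextPageToken is returned. Exceptions from fetchPage
  // propagate, so an incomplete listing fails loudly.
  static List<String> listAll(Function<String, Page> fetchPage) {
    List<String> all = new ArrayList<>();
    String token = null; // null token requests the first page
    do {
      Page page = fetchPage.apply(token);
      all.addAll(page.messages());
      token = page.nextPageToken();
    } while (token != null && !token.isEmpty());
    return all;
  }

  public static void main(String[] args) {
    String store =
        "projects/my-project/locations/us-central1/datasets/my-dataset/hl7V2Stores/my-store";
    // Assumed example filter; check the query-parameters docs for the supported syntax.
    System.out.println(listUrl(store, "send_facility = \"ABC\"", null));

    // Simulated two-page response standing in for real HTTP calls.
    List<String> messages = listAll(token -> token == null
        ? new Page(List.of("m1", "m2"), "t1")
        : new Page(List.of("m3"), null));
    System.out.println(messages); // [m1, m2, m3]
  }
}
```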

Write

A bounded or unbounded PCollection of HL7v2Message can be ingested into an HL7v2 store using ingestMessages(String). This returns an HL7v2IO.Write.Result. Call HL7v2IO.Write.Result.getFailedInsertsWithErr() on it to retrieve a PCollection of HealthcareIOError, each containing an HL7v2Message that failed to be ingested and its exception. You can write these failures to the dead letter storage system of your choice.
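Each write goes through the Messages.Ingest method of the HL7v2 REST API, which the example below names in a comment. The sketch builds the request for one message without sending anything: a POST to `{store}/messages:ingest` whose JSON body carries the raw message base64-encoded in `message.data`. `IngestSketch` and its methods are hypothetical. The store path is assumed to be the same string passed to ingestMessages(String).

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch of a single Messages.Ingest request; nothing is sent.
public class IngestSketch {
  // hl7v2Store: projects/{p}/locations/{l}/datasets/{d}/hl7V2Stores/{s}
  static String ingestUrl(String hl7v2Store) {
    return "https://healthcare.googleapis.com/v1/" + hl7v2Store + "/messages:ingest";
  }

  // The raw message bytes go base64-encoded into the Message resource's "data" field.
  // Base64 output uses only [A-Za-z0-9+/=], so no JSON escaping is needed.
  static String ingestBody(String rawMessage) {
    String data = Base64.getEncoder()
        .encodeToString(rawMessage.getBytes(StandardCharsets.UTF_8));
    return "{\"message\":{\"data\":\"" + data + "\"}}";
  }

  public static void main(String[] args) {
    String store =
        "projects/my-project/locations/us-central1/datasets/my-dataset/hl7V2Stores/my-store";
    // HL7v2 segments are separated by carriage returns.
    String msg = "MSH|^~\\&|SEND_APP|SEND_FAC|RECV_APP|RECV_FAC|20200101000000||ADT^A01|1|P|2.5\r"
        + "PID|1||12345";
    System.out.println("POST " + ingestUrl(store));
    System.out.println(ingestBody(msg));
  }
}
```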

Unbounded Read Example:

    // Hypothetical PipelineOptions subinterface declaring the getters used below.
    MyOptions options = ...;
    Pipeline p = Pipeline.create(options);

    HL7v2IO.Read.Result readResult = p
      .apply(
        "Read HL7v2 notifications",
        PubsubIO.readStrings().fromSubscription(options.getNotificationSubscription()))
      .apply(HL7v2IO.getAll());

    // Write errors to your favorite dead letter queue (e.g. Pub/Sub, GCS, BigQuery).
    readResult.getFailedReads().apply("WriteToDeadLetterQueue", ...);

    // Go about your happy path transformations.
    PCollection<HL7v2Message> out = readResult.getMessages().apply("ProcessFetchedMessages", ...);

    // Write using the Message.Ingest method of the HL7v2 REST API.
    out.apply(HL7v2IO.ingestMessages(options.getOutputHL7v2Store()));

    p.run();

Bounded Read Example:

    // Hypothetical PipelineOptions subinterface declaring the getters used below.
    MyOptions options = ...;
    Pipeline p = Pipeline.create(options);

    PCollection<HL7v2Message> out = p
      .apply(
          "List messages in HL7v2 store with filter",
          // readAllWithFilter returns an HL7v2IO.ListHL7v2Messages transform.
          HL7v2IO.readAllWithFilter(
              Collections.singletonList(options.getInputHL7v2Store()), options.getHL7v2Filter()))
      // Go about your happy path transformations.
      .apply("Process HL7v2 Messages", ...);
    p.run().waitUntilFinish();