apache_beam.io.gcp.healthcare.dicomio module
DICOM IO connector
This module implements several tools to facilitate the interaction between a Google Cloud Healthcare DICOM store and a Beam pipeline.
For more details on DICOM store and API: https://cloud.google.com/healthcare/docs/how-tos/dicom
The DICOM IO connector can be used to search metadata or write DICOM files to DICOM store.
When used together with Google Pubsub message connector, the FormatToQido PTransform implemented in this module can be used to convert Pubsub messages to search requests.
Since traceability is crucial for healthcare API users, every input and error message is recorded in the output of the DICOM IO connector. As a result, every PTransform in this module returns a PCollection of dicts that encode results and detailed error messages.
Search instance’s metadata (QIDO request)
DicomSearch() wraps the QIDO request client and supports 3 levels of search. Users should specify the level by setting the ‘search_type’ entry in the input dict. They can also refine the search by adding tags to filter the results using the ‘params’ entry. Here is a sample usage:
    import apache_beam as beam
    from apache_beam.io.gcp.healthcare.dicomio import DicomSearch

    with beam.Pipeline() as p:
      # Each input dict selects the search level through 'search_type'.
      input_dict = p | beam.Create(
          [{'project_id': 'abc123', 'search_type': 'instances', ...},
           {'project_id': 'dicom_go', 'search_type': 'series', ...}])

      results = input_dict | DicomSearch()
      results | 'print successful search' >> beam.Map(
          lambda x: print(x['result'] if x['success'] else None))
      results | 'print failed search' >> beam.Map(
          lambda x: print(x['result'] if not x['success'] else None))
In the example above, successful QIDO search results and error messages for failed requests are printed. In practice, users can filter these results and write them wherever they need.
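The filtering step mentioned above can be sketched in plain Python, outside of any pipeline. The helper name `split_by_success` and the sample values are hypothetical; only the 'result', 'success', 'input' and 'status' keys come from the output format described in this module.

```python
def split_by_success(results):
    """Partition result dicts into (succeeded, failed) lists."""
    succeeded, failed = [], []
    for item in results:
        (succeeded if item.get('success') else failed).append(item)
    return succeeded, failed


# Hypothetical sample values shaped like the connector's output dicts.
sample = [
    {'result': [{'00080018': {'vr': 'UI', 'Value': ['1.2.3']}}],
     'success': True, 'input': {'search_type': 'instances'}, 'status': 200},
    {'result': [], 'success': False,
     'input': {'search_type': 'series'}, 'status': 404},
]
ok, bad = split_by_success(sample)
```

Inside a pipeline, the same predicate on 'success' could drive two separate beam.Filter steps, one per output.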
Convert DICOM Pubsub message to Qido search request
Healthcare API users might read messages from Pubsub to monitor store operations (e.g. a new file) in a DICOM store. Each Pubsub message encodes the DICOM web store path along with the instance IDs. Users who want the metadata of a new instance can use the FormatToQido transform to convert the message into a QIDO search dict, then pass it to the DicomSearch transform. Here is a sample usage:
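    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
    from apache_beam.io.gcp.healthcare.dicomio import DicomSearch, FormatToQido

    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)
    # ReadFromPubSub yields bytes; decode to str because FormatToQido expects strings.
    pubsub = (
        p
        | beam.io.ReadFromPubSub(subscription='a_dicom_store')
        | 'decode' >> beam.Map(lambda b: b.decode('utf-8')))
    results = pubsub | FormatToQido()
    success = results | 'filter message' >> beam.Filter(lambda x: x['success'])
    qido_dict = success | 'get qido request' >> beam.Map(lambda x: x['result'])
    metadata = qido_dict | DicomSearch()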
In the example above, the pipeline listens to a Pubsub subscription and waits for messages from the DICOM API. When a new DICOM file arrives in the store, the pipeline receives a Pubsub message, converts it to a QIDO request dict, and feeds it to the DicomSearch() PTransform. As a result, users get the metadata for every new DICOM file. Note that not every Pubsub message received is from the DICOM API, so we need to filter the results first.
Store a DICOM file in a DICOM storage
UploadToDicomStore() wraps the store request API, and users can use it to send a DICOM file to a DICOM store. It supports two types of input:
1. fileio object
2. file data in byte[]
Users should set the 'input_type' when initializing this PTransform. Here are the examples:
    # self._new_tempdir() and self._create_temp_file() are assumed to be
    # temp-file helpers defined elsewhere (for example on a test class).
    input_dict = {'project_id': 'abc123', 'type': 'instances', ...}
    str_input = json.dumps(input_dict)
    temp_dir = '%s%s' % (self._new_tempdir(), os.sep)
    self._create_temp_file(dir=temp_dir, content=str_input)

    with Pipeline() as p:
      results = (
          p
          | beam.Create([FileSystems.join(temp_dir, '*')])
          | fileio.MatchAll()
          | fileio.ReadMatches()
          | UploadToDicomStore(input_dict, 'fileio'))

    input_dict = {'project_id': 'abc123', 'type': 'instances', ...}
    str_input = json.dumps(input_dict)
    bytes_input = str_input.encode('utf-8')

    with Pipeline() as p:
      results = (
          p
          | beam.Create([bytes_input])
          | UploadToDicomStore(input_dict, 'bytes'))
The first example uses a PCollection of fileio objects as input. UploadToDicomStore will read DICOM files from the objects and send them to a DICOM storage. The second example uses a PCollection of byte[] as input. UploadToDicomStore will directly send those DICOM files to a DICOM storage. Users can also get the operation results in the output PCollection if they want to handle the failed store requests.
class apache_beam.io.gcp.healthcare.dicomio.DicomSearch(buffer_size=8, max_workers=5, client=None, credential=None)
Bases: PTransform
A PTransform for retrieving DICOM instance metadata from a Google Cloud DICOM store. It takes a PCollection of dicts as input and returns a PCollection of dicts as results.

INPUT: The input dict represents DICOM web path parameters and has the following string keys and values:

    {
      'project_id': str,
      'region': str,
      'dataset_id': str,
      'dicom_store_id': str,
      'search_type': str,
      'params': dict(str,str) (Optional),
    }

Key-value pairs:
    project_id: Id of the project in which the DICOM store is located. (Required)
    region: Region where the DICOM store resides. (Required)
    dataset_id: Id of the dataset the DICOM store belongs to. (Required)
    dicom_store_id: Id of the DICOM store. (Required)
    search_type: The type of search. Must be one of three values: 'instances', 'series', or 'studies'. (Required)
    params: A dict of str:str pairs used to refine the QIDO search. (Optional)
        Supported tags fall into three categories:
        1. Studies: StudyInstanceUID, PatientName, PatientID, AccessionNumber, ReferringPhysicianName, StudyDate
        2. Series: all study level search terms and SeriesInstanceUID, Modality
        3. Instances: all study/series level search terms and SOPInstanceUID
        e.g. {"StudyInstanceUID":"1","SeriesInstanceUID":"2"}

OUTPUT: The output dict wraps results as well as error messages:

    {
      'result': a list of dicts in JSON style.
      'success': boolean value telling whether the operation is successful.
      'input': detail ids and dicomweb path for this retrieval.
      'status': status code from the server, used as error message.
    }
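The key and tag rules listed above can be checked before a dict is handed to the pipeline. The following is a minimal sketch of such a pre-flight check, not part of this module: the name `check_search_input` and the wording of the problem strings are hypothetical, and whether the service rejects or ignores tags outside the listed set is not stated here.

```python
# Search tags listed in this section, grouped by the level that introduces them.
STUDY_TAGS = {'StudyInstanceUID', 'PatientName', 'PatientID',
              'AccessionNumber', 'ReferringPhysicianName', 'StudyDate'}
SERIES_TAGS = STUDY_TAGS | {'SeriesInstanceUID', 'Modality'}
INSTANCE_TAGS = SERIES_TAGS | {'SOPInstanceUID'}
TAGS_BY_LEVEL = {
    'studies': STUDY_TAGS,
    'series': SERIES_TAGS,
    'instances': INSTANCE_TAGS,
}
REQUIRED_KEYS = (
    'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type')


def check_search_input(input_dict):
    """Return a list of problems found in a search input dict (empty if none)."""
    problems = [
        'missing key: %s' % key for key in REQUIRED_KEYS
        if key not in input_dict]
    level = input_dict.get('search_type')
    if level is not None and level not in TAGS_BY_LEVEL:
        problems.append('invalid search_type: %s' % level)
    if level in TAGS_BY_LEVEL:
        # Only check tags when the level is known; otherwise there is
        # nothing meaningful to compare against.
        for tag in input_dict.get('params', {}):
            if tag not in TAGS_BY_LEVEL[level]:
                problems.append('unsupported tag for %s: %s' % (level, tag))
    return problems
```

A caller might run this inside a beam.Map or beam.Filter step ahead of DicomSearch() so that malformed requests are reported without a round trip to the server.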
Initializes DicomSearch.

Parameters:
    buffer_size (int): Size of the request buffer.
    max_workers (int): Maximum number of threads a worker can create. If it is set to one, all requests will be processed sequentially in a worker.
    client (DicomApiHttpClient): If specified, all API calls will be made by this client instead of the default one.
    credential: Google credential object. If specified, the HTTP client will use it to create sessions instead of the default.
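To illustrate how a request buffer and a thread cap can interact, here is a minimal sketch using only the standard library. It is not the module's implementation: `process_in_batches` and `fake_request` are hypothetical names, and the flush-when-full policy is an assumption made for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def process_in_batches(elements, request_fn, buffer_size=8, max_workers=5):
    """Buffer elements, then run each full buffer through a thread pool."""
    results, buffer = [], []

    def flush():
        # executor.map preserves input order within the batch.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results.extend(executor.map(request_fn, buffer))
        buffer.clear()

    for element in elements:
        buffer.append(element)
        if len(buffer) >= buffer_size:
            flush()
    if buffer:  # send whatever is left over
        flush()
    return results


def fake_request(element):
    # Hypothetical stand-in for a network call.
    return {'input': element, 'success': True}


out = process_in_batches(range(20), fake_request, buffer_size=8, max_workers=1)
```

With max_workers=1 the pool has a single thread, so the requests in each batch run one after another, which matches the sequential behavior described for that setting.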
class apache_beam.io.gcp.healthcare.dicomio.FormatToQido(credential=None)
Bases: PTransform
A PTransform for converting Pubsub messages into search input dicts. Takes a PCollection of strings as input and returns a PCollection of dicts as results. Note that some Pubsub messages may not be from the DICOM API; these are recorded as failed conversions.

INPUT: The inputs are normally strings from a Pubsub topic:

    "projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/dicomStores/DICOM_STORE_ID/dicomWeb/studies/STUDY_UID/series/SERIES_UID/instances/INSTANCE_UID"

OUTPUT: The output dict encodes results as well as error messages:

    {
      'result': a dict representing instance level qido search request.
      'success': boolean value telling whether the conversion is successful.
      'input': input pubsub message string.
    }
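The conversion described above can be sketched as a plain string parse. This is an illustration under stated assumptions, not the transform's actual code: the function name `pubsub_path_to_qido` is hypothetical, the request keys follow the DicomSearch input format, and returning an empty 'result' on failure is an assumption.

```python
# Fixed path segments and the positions where they are expected to appear.
EXPECTED_SEGMENTS = {
    0: 'projects', 2: 'locations', 4: 'datasets', 6: 'dicomStores',
    8: 'dicomWeb', 9: 'studies', 11: 'series', 13: 'instances',
}


def pubsub_path_to_qido(message):
    """Convert a DICOMweb instance path into a result dict with a search request."""
    parts = message.strip().split('/')
    valid = (
        len(parts) == 15
        and all(parts)  # no empty segments
        and all(parts[i] == name for i, name in EXPECTED_SEGMENTS.items()))
    if not valid:
        # Assumption: a failed conversion carries an empty request.
        return {'result': {}, 'success': False, 'input': message}
    request = {
        'project_id': parts[1],
        'region': parts[3],
        'dataset_id': parts[5],
        'dicom_store_id': parts[7],
        'search_type': 'instances',
        'params': {
            'StudyInstanceUID': parts[10],
            'SeriesInstanceUID': parts[12],
            'SOPInstanceUID': parts[14],
        },
    }
    return {'result': request, 'success': True, 'input': message}
```

Checking the fixed segments rather than only the segment count is what lets unrelated Pubsub messages be flagged as failed conversions instead of producing a malformed request.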
Initializes FormatToQido.

Parameters:
    credential: Google credential object. If specified, the HTTP client will use it instead of the default one.
class apache_beam.io.gcp.healthcare.dicomio.UploadToDicomStore(destination_dict, input_type, buffer_size=8, max_workers=5, client=None, credential=None)
Bases: PTransform
A PTransform for storing instances to a DICOM store. Takes a PCollection of byte[] as input and returns a PCollection of dicts as results. The inputs are normally DICOM files in bytes or str filenames.

INPUT: This PTransform supports two types of input:
    1. Byte[]: representing a DICOM file.
    2. Fileio object: stream file object.

OUTPUT: The output dict encodes status as well as error messages:

    {
      'success': boolean value telling whether the store is successful.
      'input': undeliverable data. Exactly the same as the input, only set if the operation failed.
      'status': status code from the server, used as error message.
    }
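Because failed store requests carry the undeliverable data in 'input', they can be collected for a later retry. The sketch below groups failures by status code in plain Python; `group_failures_by_status` is a hypothetical helper, and the sample values (integer status codes, `None` as the 'input' of a successful store) are assumptions made for the example.

```python
from collections import defaultdict


def group_failures_by_status(results):
    """Map each status code to the list of inputs whose store request failed."""
    failures = defaultdict(list)
    for item in results:
        if not item['success']:
            failures[item['status']].append(item['input'])
    return dict(failures)


# Hypothetical sample values shaped like the transform's output dicts.
sample = [
    {'success': True, 'input': None, 'status': 200},
    {'success': False, 'input': b'dicom-bytes-1', 'status': 409},
    {'success': False, 'input': b'dicom-bytes-2', 'status': 503},
    {'success': False, 'input': b'dicom-bytes-3', 'status': 503},
]
grouped = group_failures_by_status(sample)
```

Grouping by status makes it easier to decide which failures are worth retrying and which should be reported instead.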
Initializes UploadToDicomStore.

Parameters:
    destination_dict (dict): Encodes DICOM endpoint information:

        {
          'project_id': str,
          'region': str,
          'dataset_id': str,
          'dicom_store_id': str,
        }

        Key-value pairs:
        * project_id: Id of the project in which the DICOM store is located. (Required)
        * region: Region where the DICOM store resides. (Required)
        * dataset_id: Id of the dataset the DICOM store belongs to. (Required)
        * dicom_store_id: Id of the DICOM store. (Required)
    input_type (str): Could only be 'bytes' or 'fileio'.
    buffer_size (int): Size of the request buffer.
    max_workers (int): Maximum number of threads a worker can create. If it is set to one, all requests will be processed sequentially in a worker.
    client (DicomApiHttpClient): If specified, all API calls will be made by this client instead of the default one.
    credential: Google credential object. If specified, the HTTP client will use it instead of the default one.
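The constraints on these arguments can be checked up front, before a pipeline is constructed. This is a minimal sketch of such a check, assuming a caller wants to fail early; `check_upload_args` and its error messages are hypothetical and do not describe how the transform itself validates its arguments.

```python
DESTINATION_KEYS = ('project_id', 'region', 'dataset_id', 'dicom_store_id')
INPUT_TYPES = ('bytes', 'fileio')


def check_upload_args(destination_dict, input_type):
    """Raise ValueError if the arguments do not match the documented format."""
    missing = [key for key in DESTINATION_KEYS if not destination_dict.get(key)]
    if missing:
        raise ValueError('destination_dict is missing: %s' % ', '.join(missing))
    if input_type not in INPUT_TYPES:
        raise ValueError(
            "input_type must be 'bytes' or 'fileio', got %r" % (input_type,))
```

For example, `check_upload_args({'project_id': 'abc123'}, 'bytes')` raises a ValueError naming the three missing keys.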