Source code for apache_beam.io.gcp.healthcare.dicomclient

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

from google.auth import default
from google.auth.transport import requests


class DicomApiHttpClient:
  """Creates a client that communicates with the GCP Healthcare API
  (https://cloud.google.com/healthcare-api), over HTTP.

  Note that this client strictly uses v1 of this API.

  The authorized session is created lazily on first use and then cached on
  the instance, so subsequent requests made through the same client reuse it.
  """

  # Root URL shared by every request issued by this client (API v1 only).
  healthcare_base_url = "https://healthcare.googleapis.com/v1"
  # Cached authorized session; stays None until get_session() creates one.
  session = None

  def get_session(self, credential=None):
    """Return the cached authorized session, creating it on first use.

    Args:
      credential: Credential used to build the session. When falsy (for
        example ``None``, the default), the credential returned by
        ``google.auth.default()`` is used instead.

    Returns:
      The ``AuthorizedSession`` cached on this client. Once a session has
      been cached it is returned as-is, so a ``credential`` passed on a later
      call has no effect.
    """
    if self.session is not None:
      return self.session

    # If the credential is not provided, use the default credential.
    if not credential:
      credential, _ = default()
    self.session = requests.AuthorizedSession(credential)
    return self.session

  def dicomweb_store_instance(
      self,
      project_id,
      region,
      dataset_id,
      dicom_store_id,
      dcm_file,
      credential=None):
    """Store a DICOM instance by POSTing it to a DICOM store.

    The request is sent to the ``dicomWeb/studies`` path of the DICOM store
    identified by the given project, region, dataset and store ids.

    Args:
      project_id: Project id placed in the request URL.
      region: Location placed in the request URL.
      dataset_id: Dataset id placed in the request URL.
      dicom_store_id: DICOM store id placed in the request URL.
      dcm_file: Request body, sent with content type ``application/dicom``
        (presumably the DICOM file's bytes or a readable file object;
        verify against callers).
      credential: Optional credential forwarded to ``get_session``.

    Returns:
      A ``(None, status_code)`` tuple, where ``status_code`` is the HTTP
      status code of the response. The first element is always ``None``.
    """
    api_endpoint = "{}/projects/{}/locations/{}".format(
        self.healthcare_base_url, project_id, region)
    dicomweb_path = "{}/datasets/{}/dicomStores/{}/dicomWeb/studies".format(
        api_endpoint, dataset_id, dicom_store_id)

    # Make an authenticated API request.
    session = self.get_session(credential)
    headers = {"Content-Type": "application/dicom"}
    response = session.post(dicomweb_path, data=dcm_file, headers=headers)

    return None, response.status_code