# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Azure Blob Storage client.

# pytype: skip-file

import errno
import io
import logging
import os
import re
import tempfile
import time

# Project-local helpers: credential lookup and the generic buffered
# download/upload stream adapters that the classes below plug into.
from apache_beam.internal.azure import auth
from apache_beam.io.filesystemio import Downloader
from apache_beam.io.filesystemio import DownloaderStream
from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.options.pipeline_options import AzureOptions
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated

_LOGGER = logging.getLogger(__name__)

# The Azure SDK is an optional dependency: record whether it is importable so
# that BlobStorageIO can fail with a clear error instead of a NameError.
try:
  # pylint: disable=wrong-import-order, wrong-import-position
  # pylint: disable=ungrouped-imports
  from azure.core.exceptions import ResourceNotFoundError
  from azure.storage.blob import (
      BlobServiceClient,
      ContentSettings,
  )
  AZURE_DEPS_INSTALLED = True
except ImportError:
  AZURE_DEPS_INSTALLED = False

# Default buffer size (in bytes) used when opening a blob for reading.
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024

# Maximum number of blobs handed to a single _delete_batch call.
MAX_BATCH_OPERATION_SIZE = 100


def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
  """Split an ``azfs://`` path into its components.

  Args:
    azfs_path: Path in the form
      ``azfs://<storage-account>/<container>/<path>``.
    blob_optional: If True, an empty ``<path>`` component is accepted (the
      trailing ``/`` after the container is still required).
    get_account: If True, the storage account name is included in the result.

  Returns:
    ``(storage_account, container, blob)`` if ``get_account`` is True,
    otherwise ``(container, blob)``.

  Raises:
    ValueError: If the path does not match the expected form, or if the blob
      component is empty and ``blob_optional`` is False.
  """
  # Account: 3-24 lowercase alphanumerics. Container: 3-63 characters of
  # lowercase alphanumerics and '-', starting and ending with an alphanumeric
  # and (via the negative lookahead) containing no '--'.
  match = re.match(
      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
      azfs_path)
  if match is None or (match.group(3) == '' and not blob_optional):
    raise ValueError(
        'Azure Blob Storage path must be in the form '
        'azfs://<storage-account>/<container>/<path>.')
  if get_account:
    return match.group(1), match.group(2), match.group(3)
  return match.group(2), match.group(3)
def get_azfs_url(storage_account, container, blob=''):
  """Build the HTTPS URL of a blob on the public Azure endpoint.

  Args:
    storage_account: Storage account name.
    container: Container name.
    blob: Blob name; defaults to the empty string.

  Returns:
    A URL in the form
    ``https://<storage-account>.blob.core.windows.net/<container>/<blob>``.
  """
  return 'https://' + storage_account + '.blob.core.windows.net/' + \
          container + '/' + blob
class Blob():
  """A Blob in Azure Blob Storage.

  A plain value holder; the constructor stores each argument on the
  attribute of the same name.
  """
  def __init__(self, etag, name, last_updated, size, mime_type):
    self.etag = etag
    self.name = name
    self.last_updated = last_updated
    self.size = size
    self.mime_type = mime_type
class BlobStorageIOError(IOError, retry.PermanentException):
  """Blob Storage IO error that should not be retried."""
  pass
class BlobStorageError(Exception):
  """Blob Storage client error.

  Attributes are set from the constructor arguments:
    message: Description of the failure (may be None).
    code: Status code associated with the failure (may be None).
  """
  def __init__(self, message=None, code=None):
    # Forward to Exception so that str(e), repr(e) and e.args carry the
    # details instead of being empty.
    super().__init__(message, code)
    self.message = message
    self.code = code
class BlobStorageIO(object):
  """Azure Blob Storage I/O client."""
  def __init__(self, client=None, pipeline_options=None):
    """Initializes the client.

    Args:
      client: An already constructed blob service client. If None, one is
        built from ``pipeline_options``.
      pipeline_options: Pipeline options viewed as ``AzureOptions``; only
        read when ``client`` is None.

    Raises:
      RuntimeError: If the Azure dependencies are not installed.
    """
    # Check first: without the SDK, BlobServiceClient is not even defined and
    # building a client below would fail with a NameError.
    if not AZURE_DEPS_INSTALLED:
      raise RuntimeError('Azure dependencies are not installed. Unable to run.')

    if client is not None:
      self.client = client
      return

    azure_options = pipeline_options.view_as(AzureOptions)
    # The explicit pipeline option wins over the environment variable.
    connect_str = azure_options.azure_connection_string or \
                  os.getenv('AZURE_STORAGE_CONNECTION_STRING')
    if connect_str:
      self.client = BlobServiceClient.from_connection_string(
          conn_str=connect_str)
    else:
      credential = auth.get_service_credentials(pipeline_options)
      self.client = BlobServiceClient(
          account_url=azure_options.blob_service_endpoint,
          credential=credential)

  def open(
      self,
      filename,
      mode='r',
      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
      mime_type='application/octet-stream'):
    """Open an Azure Blob Storage file path for reading or writing.

    Args:
      filename (str): Azure Blob Storage file path in the form
        ``azfs://<storage-account>/<container>/<path>``.
      mode (str): ``'r'`` for reading or ``'w'`` for writing.
      read_buffer_size (int): Buffer size to use during read operations.
      mime_type (str): Mime type to set for write operations.

    Returns:
      Azure Blob Storage file object.

    Raises:
      ValueError: Invalid open file mode.
    """
    if mode == 'r' or mode == 'rb':
      downloader = BlobStorageDownloader(
          self.client, filename, buffer_size=read_buffer_size)
      return io.BufferedReader(
          DownloaderStream(
              downloader, read_buffer_size=read_buffer_size, mode=mode),
          buffer_size=read_buffer_size)
    elif mode == 'w' or mode == 'wb':
      uploader = BlobStorageUploader(self.client, filename, mime_type)
      return io.BufferedWriter(
          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
    else:
      raise ValueError('Invalid file open mode: %s.' % mode)

  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_beam_io_error_filter)
  def copy(self, src, dest):
    """Copies a single Azure Blob Storage blob from src to dest.

    Args:
      src: Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].
      dest: Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].

    Raises:
      BlobStorageError: If the copy request fails with a
        ResourceNotFoundError.
    """
    src_storage_account, src_container, src_blob = parse_azfs_path(
        src, get_account=True)
    # Only the container and blob of dest are used; the destination blob
    # client comes from self.client.
    dest_container, dest_blob = parse_azfs_path(dest)
    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
    try:
      copied_blob.start_copy_from_url(source_blob)
    except ResourceNotFoundError as e:
      raise BlobStorageError(e.reason, e.status_code) from e

  # We intentionally do not decorate this method with a retry, since the
  # underlying copy operation is already an idempotent operation protected
  # by retry decorators.
  def copy_tree(self, src, dest):
    """Copies the given Azure Blob Storage directory and its contents
    recursively from src to dest.

    Args:
      src: Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name] (ending with a "/").
      dest: Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name] (ending with a "/").

    Returns:
      List of tuples of (src, dest, exception) where exception is None if the
      operation succeeded or the relevant exception if the operation failed.
    """
    assert src.endswith('/')
    assert dest.endswith('/')

    results = []
    # list_files is used directly so the deprecated list_prefix wrapper does
    # not fire on internal calls.
    for entry, _ in self.list_files(src):
      # Re-root the blob under dest, keeping its path relative to src.
      target = dest + entry[len(src):]
      try:
        self.copy(entry, target)
        results.append((entry, target, None))
      except BlobStorageError as e:
        results.append((entry, target, e))

    return results

  # We intentionally do not decorate this method with a retry, since the
  # underlying copy operation is already an idempotent operation protected
  # by retry decorators.
  def copy_paths(self, src_dest_pairs):
    """Copies the given Azure Blob Storage blobs from src to dest. This can
    handle directory or file paths.

    Args:
      src_dest_pairs: List of (src, dest) tuples of
        azfs://<storage-account>/<container>/[name] file paths
        to copy from src to dest.

    Returns:
      List of tuples of (src, dest, exception), where exception is None if the
      operation succeeded or the relevant exception if the operation failed.
      A directory pair contributes one tuple per blob found under it.
    """
    if not src_dest_pairs:
      return []

    results = []

    for src_path, dest_path in src_dest_pairs:
      # Case 1. They are directories.
      if src_path.endswith('/') and dest_path.endswith('/'):
        try:
          results += self.copy_tree(src_path, dest_path)
        except BlobStorageError as e:
          results.append((src_path, dest_path, e))

      # Case 2. They are individual blobs.
      elif not src_path.endswith('/') and not dest_path.endswith('/'):
        try:
          self.copy(src_path, dest_path)
          results.append((src_path, dest_path, None))
        except BlobStorageError as e:
          results.append((src_path, dest_path, e))

      # Mismatched paths (one directory, one non-directory) get an error.
      else:
        e = BlobStorageError(
            "Unable to copy mismatched paths" +
            "(directory, non-directory): %s, %s" % (src_path, dest_path),
            400)
        results.append((src_path, dest_path, e))

    return results

  # We intentionally do not decorate this method with a retry, since the
  # underlying copy and delete operations are already idempotent operations
  # protected by retry decorators.
  def rename(self, src, dest):
    """Renames the given Azure Blob Storage blob from src to dest.

    Implemented as a copy followed by a delete of the source.

    Args:
      src: Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].
      dest: Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].
    """
    self.copy(src, dest)
    self.delete(src)

  # We intentionally do not decorate this method with a retry, since the
  # underlying copy and delete operations are already idempotent operations
  # protected by retry decorators.
  def rename_files(self, src_dest_pairs):
    """Renames the given Azure Blob Storage blobs from src to dest.

    Args:
      src_dest_pairs: List of (src, dest) tuples of
        azfs://<storage-account>/<container>/[name]
        file paths to rename from src to dest.

    Returns:
      List of tuples of (src, dest, exception) in the same order as the
      src_dest_pairs argument, where exception is None if the operation
      succeeded or the relevant exception if the operation failed.

    Raises:
      ValueError: If any src or dest path ends with "/".
    """
    if not src_dest_pairs:
      return []

    for src, dest in src_dest_pairs:
      if src.endswith('/') or dest.endswith('/'):
        raise ValueError('Unable to rename a directory.')

    # Results from copy operation.
    copy_results = self.copy_paths(src_dest_pairs)
    # Only sources that were copied successfully may be deleted.
    paths_to_delete = \
        [src for (src, _, error) in copy_results if error is None]
    # Results from delete operation.
    delete_results = self.delete_files(paths_to_delete)

    # Get rename file results (list of tuples).
    results = []

    # Using a dictionary will make the operation faster.
    delete_results_dict = {src: error for (src, error) in delete_results}

    for src, dest, error in copy_results:
      # If there was an error in the copy operation.
      if error is not None:
        results.append((src, dest, error))
      # If there was an error in the delete operation.
      elif delete_results_dict[src] is not None:
        results.append((src, dest, delete_results_dict[src]))
      # If there was no error in the operations.
      else:
        results.append((src, dest, None))

    return results

  def exists(self, path):
    """Returns whether the given Azure Blob Storage blob exists.

    Args:
      path: Azure Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].

    Returns:
      True if the blob properties could be fetched, False if the lookup
      failed with code 404.

    Raises:
      BlobStorageError: For lookup failures with any other code.
    """
    try:
      self._blob_properties(path)
      return True
    # _blob_properties converts ResourceNotFoundError into BlobStorageError,
    # so that is the exception type that reaches this method.
    except BlobStorageError as e:
      if e.code == 404:
        # HTTP 404 indicates that the file did not exist.
        return False
      else:
        # We re-raise all other exceptions.
        raise

  def size(self, path):
    """Returns the size of a single Blob Storage blob.

    This method does not perform glob expansion. Hence the given path must be
    for a single Blob Storage blob.

    Returns: size of the Blob Storage blob in bytes.
    """
    return self._blob_properties(path).size

  def last_updated(self, path):
    """Returns the last updated epoch time of a single
    Azure Blob Storage blob.

    This method does not perform glob expansion. Hence the given path must be
    for a single Azure Blob Storage blob.

    Returns: last updated time of the Azure Blob Storage blob
    in seconds.
    """
    return self._updated_to_seconds(self._blob_properties(path).last_modified)

  def checksum(self, path):
    """Looks up the checksum of an Azure Blob Storage blob.

    Args:
      path: Azure Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].

    Returns: the ``etag`` of the blob properties.
    """
    return self._blob_properties(path).etag

  def _status(self, path):
    """For internal use only; no backwards-compatibility guarantees.

    Returns supported fields (checksum, last_updated, size) of a single object
    as a dict at once.

    This method does not perform glob expansion. Hence the given path must be
    for a single blob property.

    Returns: dict of fields of the blob property. A field is only present if
    the properties object has the corresponding attribute.
    """
    properties = self._blob_properties(path)
    file_status = {}
    if hasattr(properties, 'etag'):
      file_status['checksum'] = properties.etag

    if hasattr(properties, 'last_modified'):
      file_status['last_updated'] = self._updated_to_seconds(
          properties.last_modified)

    if hasattr(properties, 'size'):
      file_status['size'] = properties.size

    return file_status

  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_beam_io_error_filter)
  def _blob_properties(self, path):
    """Returns a blob properties object for the given path

    This method does not perform glob expansion. Hence the given path must be
    for a single blob properties object.

    Returns: blob properties.

    Raises:
      BlobStorageError: If the lookup fails with a ResourceNotFoundError.
    """
    container, blob = parse_azfs_path(path)
    blob_to_check = self.client.get_blob_client(container, blob)

    try:
      properties = blob_to_check.get_blob_properties()
    except ResourceNotFoundError as e:
      raise BlobStorageError(e.reason, e.status_code) from e

    return properties

  @staticmethod
  def _updated_to_seconds(updated):
    """Helper function transform the updated field of response to seconds"""
    # mktime() reads the time tuple as local time; subtracting time.timezone
    # shifts the result back so the tuple is effectively read as UTC.
    # NOTE(review): presumably `updated` is a UTC datetime -- confirm.
    return (
        time.mktime(updated.timetuple()) - time.timezone +
        updated.microsecond / 1000000.0)

  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_beam_io_error_filter)
  def delete(self, path):
    """Deletes a single blob at the given Azure Blob Storage path.

    Args:
      path: Azure Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].
    """
    container, blob = parse_azfs_path(path)
    blob_to_delete = self.client.get_blob_client(container, blob)
    try:
      blob_to_delete.delete_blob()
    except ResourceNotFoundError as e:
      if e.status_code == 404:
        # Return success when the file doesn't exist anymore for idempotency.
        return
      _LOGGER.error('HTTP error while deleting file %s', path)
      raise

  # We intentionally do not decorate this method with a retry, since the
  # underlying delete operations are already idempotent operations
  # protected by retry decorators.
  def delete_paths(self, paths):
    """Deletes the given Azure Blob Storage blobs. This can handle directory
    or file paths.

    Args:
      paths: list of Azure Blob Storage paths in the form
        azfs://<storage-account>/<container>/[name] that give the
        file blobs to be deleted.

    Returns:
      Dict mapping each deleted blob path to its result, which is None if
      the operation succeeded or the error value if the operation failed.
      Directory paths are replaced by the blobs found under them.
    """
    directories, blobs = [], []

    # Retrieve directories and not directories.
    for path in paths:
      if path.endswith('/'):
        directories.append(path)
      else:
        blobs.append(path)

    results = {}

    for directory in directories:
      results.update(dict(self.delete_tree(directory)))

    results.update(dict(self.delete_files(blobs)))

    return results

  # We intentionally do not decorate this method with a retry, since the
  # underlying delete operations are already idempotent operations
  # protected by retry decorators.
  def delete_tree(self, root):
    """Deletes all blobs under the given Azure BlobStorage virtual
    directory.

    Args:
      root: Azure Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name]
        (ending with a "/").

    Returns:
      List of tuples of (path, exception), where each path is a blob
      under the given root. exception is None if the operation succeeded
      or the relevant exception if the operation failed.
    """
    assert root.endswith('/')

    # Get the blobs under the root directory (list_files rather than the
    # deprecated list_prefix wrapper).
    paths_to_delete = [path for path, _ in self.list_files(root)]

    return self.delete_files(paths_to_delete)

  # We intentionally do not decorate this method with a retry, since the
  # underlying delete operations are already idempotent operations
  # protected by retry decorators.
  def delete_files(self, paths):
    """Deletes the given Azure Blob Storage blobs.

    Args:
      paths: iterable of Azure Blob Storage paths in the form
        azfs://<storage-account>/<container>/[name] that give the
        file blobs to be deleted.

    Returns:
      List of tuples of (path, error) in the same order as the paths
      argument, where error is None if the operation succeeded or the
      value reported by ``_delete_batch`` if the operation failed.
    """
    # Materialize once so one-shot iterables can be traversed twice below.
    paths = list(paths)
    if not paths:
      return []

    # Parse each path once; (container, blob) is also the key under which
    # _delete_batch reports its result.
    parsed = [parse_azfs_path(path, get_account=False) for path in paths]

    # Group blobs into containers.
    grouped_blobs = {}
    for container, blob in parsed:
      grouped_blobs.setdefault(container, []).append(blob)

    results = {}

    # Delete minibatches of blobs for each container.
    for container, blobs in grouped_blobs.items():
      for i in range(0, len(blobs), MAX_BATCH_OPERATION_SIZE):
        blobs_to_delete = blobs[i:i + MAX_BATCH_OPERATION_SIZE]
        results.update(self._delete_batch(container, blobs_to_delete))

    return [(path, results[key]) for path, key in zip(paths, parsed)]

  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_beam_io_error_filter)
  def _delete_batch(self, container, blobs):
    """A helper method. Deletes the given blobs of one container, one
    ``delete_blob`` call per blob.

    Args:
      container: container name.
      blobs: list of blobs to be deleted.

    Returns:
      Dictionary of the form {(container, blob): error}, where error is
      the return value of ``delete_blob`` on success (presumably None --
      confirm against the SDK) or the status code of the
      ResourceNotFoundError on failure.
    """
    container_client = self.client.get_container_client(container)
    results = {}

    for blob in blobs:
      try:
        response = container_client.delete_blob(blob)
        results[(container, blob)] = response
      except ResourceNotFoundError as e:
        results[(container, blob)] = e.status_code

    return results

  @deprecated(since='2.45.0', current='list_files')
  def list_prefix(self, path, with_metadata=False):
    """Lists files matching the prefix.

    Args:
      path: Azure Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].
      with_metadata: Experimental. Specify whether returns file metadata.

    Returns:
      If ``with_metadata`` is False: dict of file name -> size; if
        ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
    """
    file_info = {}
    for file_metadata in self.list_files(path, with_metadata):
      file_info[file_metadata[0]] = file_metadata[1]

    return file_info

  def list_files(self, path, with_metadata=False):
    """Lists files matching the prefix.

    Args:
      path: Azure Blob Storage file path pattern in the form
        azfs://<storage-account>/<container>/[name].
      with_metadata: Experimental. Specify whether returns file metadata.

    Returns:
      If ``with_metadata`` is False: generator of tuple(file name, size); if
      ``with_metadata`` is True: generator of
      tuple(file name, tuple(size, timestamp)).
    """
    storage_account, container, blob = parse_azfs_path(
        path, blob_optional=True, get_account=True)
    # Names already yielded, so that each file is reported only once.
    file_info = set()
    counter = 0
    start_time = time.time()

    if with_metadata:
      _LOGGER.debug("Starting the file information of the input")
    else:
      _LOGGER.debug("Starting the size estimation of the input")
    container_client = self.client.get_container_client(container)

    # Only the initial list_blobs call is wrapped in the retry decorator.
    response = retry.with_exponential_backoff(
        retry_filter=retry.retry_on_beam_io_error_filter)(
            container_client.list_blobs)(
                name_starts_with=blob)
    for item in response:
      file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
      if file_name not in file_info:
        file_info.add(file_name)
        counter += 1
        # Report progress every 10000 files.
        if counter % 10000 == 0:
          if with_metadata:
            _LOGGER.info(
                "Finished computing file information of: %s files",
                len(file_info))
          else:
            _LOGGER.info("Finished computing size of: %s files", len(file_info))
        if with_metadata:
          yield file_name, (
              item.size, self._updated_to_seconds(item.last_modified))
        else:
          yield file_name, item.size

    _LOGGER.log(
        # do not spam logs when list_prefix is likely used to check empty folder
        logging.INFO if counter > 0 else logging.DEBUG,
        "Finished listing %s files in %s seconds.",
        counter,
        time.time() - start_time)
class BlobStorageDownloader(Downloader):
  """Downloader that reads byte ranges of a single blob."""
  def __init__(self, client, path, buffer_size):
    """Resolves the blob client for ``path`` and records the blob size.

    Args:
      client: Blob service client used to obtain the blob client.
      path: Path in the form ``azfs://<storage-account>/<container>/<path>``.
      buffer_size: Stored on the instance; not otherwise used by this class.

    Raises:
      IOError: With ``errno.ENOENT`` if the properties lookup fails with
        status code 404.
    """
    self._client = client
    self._path = path
    self._container, self._blob = parse_azfs_path(path)
    self._buffer_size = buffer_size

    self._blob_to_download = self._client.get_blob_client(
        self._container, self._blob)

    try:
      properties = self._get_object_properties()
    except ResourceNotFoundError as http_error:
      if http_error.status_code == 404:
        raise IOError(
            errno.ENOENT, 'Not found: %s' % self._path) from http_error
      else:
        _LOGGER.error(
            'HTTP error while requesting file %s: %s', self._path, http_error)
        raise

    self._size = properties.size

  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_beam_io_error_filter)
  def _get_object_properties(self):
    """Fetches the blob properties (wrapped in the retry decorator)."""
    return self._blob_to_download.get_blob_properties()

  @property
  def size(self):
    """Blob size as recorded when the downloader was constructed."""
    return self._size

  def get_range(self, start, end):
    """Returns the bytes of the blob in the range [start, end)."""
    # Download_blob first parameter is offset and second is length (exclusive).
    blob_data = self._blob_to_download.download_blob(start, end - start)
    # Returns the content as bytes.
    return blob_data.readall()
class BlobStorageUploader(Uploader):
  """Uploader that spools writes to a temporary file and uploads the whole
  content as one blob when finished."""
  def __init__(self, client, path, mime_type='application/octet-stream'):
    """Resolves the blob client for ``path`` and creates the spool file.

    Args:
      client: Blob service client used to obtain the blob client.
      path: Path in the form ``azfs://<storage-account>/<container>/<path>``.
      mime_type: Passed to ``ContentSettings`` for the uploaded blob.
    """
    self._client = client
    self._path = path
    self._container, self._blob = parse_azfs_path(path)
    self._content_settings = ContentSettings(mime_type)

    self._blob_to_upload = self._client.get_blob_client(
        self._container, self._blob)

    # Temporary file.
    self._temporary_file = tempfile.NamedTemporaryFile()

  def put(self, data):
    """Appends ``data`` (an object exposing ``tobytes()``) to the spool file."""
    self._temporary_file.write(data.tobytes())

  def finish(self):
    """Uploads everything written so far, overwriting any existing blob,
    then closes the spool file."""
    try:
      # Rewind before reading: this flushes buffered writes and reads back
      # through the same handle, so the file need not be reopened by name.
      self._temporary_file.seek(0)
      self._blob_to_upload.upload_blob(
          self._temporary_file.read(),
          overwrite=True,
          content_settings=self._content_settings)
    finally:
      # Closing a NamedTemporaryFile deletes it.
      self._temporary_file.close()