Source code for apache_beam.io.gcp.gcsfilesystem

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""GCS file system implementation for accessing files on GCS."""

from __future__ import absolute_import

from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystem import MatchResult
from apache_beam.io.gcp import gcsio

__all__ = ['GCSFileSystem']


[docs]class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://'
[docs] @classmethod def scheme(cls): """URI scheme for the FileSystem """ return 'gs'
[docs] def join(self, basepath, *paths): """Join two or more pathname components for the filesystem Args: basepath: string path of the first component of the path paths: path components to be added Returns: full path after combining all the passed components """ if not basepath.startswith(GCSFileSystem.GCS_PREFIX): raise ValueError('Basepath %r must be GCS path.', basepath) path = basepath for p in paths: path = path.rstrip('/') + '/' + p.lstrip('/') return path
[docs] def split(self, path): """Splits the given path into two parts. Splits the path into a pair (head, tail) such that tail contains the last component of the path and head contains everything up to that. Head will include the GCS prefix ('gs://'). Args: path: path as a string Returns: a pair of path components as strings. """ path = path.strip() if not path.startswith(GCSFileSystem.GCS_PREFIX): raise ValueError('Path %r must be GCS path.', path) prefix_len = len(GCSFileSystem.GCS_PREFIX) last_sep = path[prefix_len:].rfind('/') if last_sep >= 0: last_sep += prefix_len if last_sep > 0: return (path[:last_sep], path[last_sep + 1:]) elif last_sep < 0: return (path, '') else: raise ValueError('Invalid path: %s', path)
[docs] def mkdirs(self, path): """Recursively create directories for the provided path. Args: path: string path of the directory structure that should be created Raises: IOError if leaf directory already exists. """ pass
[docs] def match(self, patterns, limits=None): """Find all matching paths to the pattern provided. Args: pattern: string for the file path pattern to match against limit: Maximum number of responses that need to be fetched Returns: list of ``MatchResult`` objects. Raises: ``BeamIOError`` if any of the pattern match operations fail """ if limits is None: limits = [None] * len(patterns) else: err_msg = "Patterns and limits should be equal in length" assert len(patterns) == len(limits), err_msg def _match(pattern, limit): """Find all matching paths to the pattern provided. """ if pattern.endswith('/'): pattern += '*' file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern, limit) metadata_list = [FileMetadata(path, size) for path, size in file_sizes.iteritems()] return MatchResult(pattern, metadata_list) exceptions = {} result = [] for pattern, limit in zip(patterns, limits): try: result.append(_match(pattern, limit)) except Exception as e: # pylint: disable=broad-except exceptions[pattern] = e if exceptions: raise BeamIOError("Match operation failed", exceptions) return result
def _path_open(self, path, mode, mime_type='application/octet-stream', compression_type=CompressionTypes.AUTO): """Helper functions to open a file in the provided mode. """ compression_type = FileSystem._get_compression_type(path, compression_type) mime_type = CompressionTypes.mime_type(compression_type, mime_type) raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type) if compression_type == CompressionTypes.UNCOMPRESSED: return raw_file return CompressedFile(raw_file, compression_type=compression_type)
[docs] def create(self, path, mime_type='application/octet-stream', compression_type=CompressionTypes.AUTO): """Returns a write channel for the given file path. Args: path: string path of the file object to be written to the system mime_type: MIME type to specify the type of content in the file object compression_type: Type of compression to be used for this object Returns: file handle with a close function for the user to use """ return self._path_open(path, 'wb', mime_type, compression_type)
[docs] def open(self, path, mime_type='application/octet-stream', compression_type=CompressionTypes.AUTO): """Returns a read channel for the given file path. Args: path: string path of the file object to be written to the system mime_type: MIME type to specify the type of content in the file object compression_type: Type of compression to be used for this object Returns: file handle with a close function for the user to use """ return self._path_open(path, 'rb', mime_type, compression_type)
[docs] def copy(self, source_file_names, destination_file_names): """Recursively copy the file tree from the source to the destination Args: source_file_names: list of source file objects that needs to be copied destination_file_names: list of destination of the new object Raises: ``BeamIOError`` if any of the copy operations fail """ err_msg = ("source_file_names and destination_file_names should " "be equal in length") assert len(source_file_names) == len(destination_file_names), err_msg def _copy_path(source, destination): """Recursively copy the file tree from the source to the destination """ if not destination.startswith(GCSFileSystem.GCS_PREFIX): raise ValueError('Destination %r must be GCS path.', destination) # Use copy_tree if the path ends with / as it is a directory if source.endswith('/'): gcsio.GcsIO().copytree(source, destination) else: gcsio.GcsIO().copy(source, destination) exceptions = {} for source, destination in zip(source_file_names, destination_file_names): try: _copy_path(source, destination) except Exception as e: # pylint: disable=broad-except exceptions[(source, destination)] = e if exceptions: raise BeamIOError("Copy operation failed", exceptions)
[docs] def rename(self, source_file_names, destination_file_names): """Rename the files at the source list to the destination list. Source and destination lists should be of the same size. Args: source_file_names: List of file paths that need to be moved destination_file_names: List of destination_file_names for the files Raises: ``BeamIOError`` if any of the rename operations fail """ err_msg = ("source_file_names and destination_file_names should " "be equal in length") assert len(source_file_names) == len(destination_file_names), err_msg gcs_batches = [] gcs_current_batch = [] for src, dest in zip(source_file_names, destination_file_names): gcs_current_batch.append((src, dest)) if len(gcs_current_batch) == self.CHUNK_SIZE: gcs_batches.append(gcs_current_batch) gcs_current_batch = [] if gcs_current_batch: gcs_batches.append(gcs_current_batch) # Execute GCS renames if any and return exceptions. exceptions = {} for batch in gcs_batches: copy_statuses = gcsio.GcsIO().copy_batch(batch) copy_succeeded = [] for src, dest, exception in copy_statuses: if exception: exceptions[(src, dest)] = exception else: copy_succeeded.append((src, dest)) delete_batch = [src for src, dest in copy_succeeded] delete_statuses = gcsio.GcsIO().delete_batch(delete_batch) for i, (src, exception) in enumerate(delete_statuses): dest = copy_succeeded[i][1] if exception: exceptions[(src, dest)] = exception if exceptions: raise BeamIOError("Rename operation failed", exceptions)
[docs] def exists(self, path): """Check if the provided path exists on the FileSystem. Args: path: string path that needs to be checked. Returns: boolean flag indicating if path exists """ return gcsio.GcsIO().exists(path)
[docs] def delete(self, paths): """Deletes files or directories at the provided paths. Directories will be deleted recursively. Args: paths: list of paths that give the file objects to be deleted """ def _delete_path(path): """Recursively delete the file or directory at the provided path. """ if path.endswith('/'): path_to_use = path + '*' else: path_to_use = path match_result = self.match([path_to_use])[0] statuses = gcsio.GcsIO().delete_batch( [m.path for m in match_result.metadata_list]) failures = [e for (_, e) in statuses if e is not None] if failures: raise failures[0] exceptions = {} for path in paths: try: _delete_path(path) except Exception as e: # pylint: disable=broad-except exceptions[path] = e if exceptions: raise BeamIOError("Delete operation failed", exceptions)