#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""FileSystems interface class for accessing the correct filesystem"""
import re
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileSystem
# All filesystem implementations should be imported here so they are
# registered as FileSystem subclasses
# pylint: disable=wrong-import-position, unused-import
from apache_beam.io.hadoopfilesystem import HadoopFileSystem
from apache_beam.io.localfilesystem import LocalFileSystem
try:
  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
except ImportError:
  pass
# pylint: enable=wrong-import-position, unused-import
__all__ = ['FileSystems']


class FileSystems(object):
  """A class that defines the functions that can be performed on a filesystem.
  All methods are static or class methods and access the underlying registered
  filesystems.
  """
  URI_SCHEMA_PATTERN = re.compile('(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*')
  _pipeline_options = None

  @classmethod
  def set_options(cls, pipeline_options):
    """Set filesystem options.
    Args:
      pipeline_options: Instance of ``PipelineOptions``.
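    Example (an illustrative sketch; the ``PipelineOptions`` import path may
    differ across Beam versions)::
      from apache_beam.options.pipeline_options import PipelineOptions
      FileSystems.set_options(PipelineOptions(['--runner', 'DirectRunner']))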
    """
    cls._pipeline_options = pipeline_options

  @staticmethod
  def get_scheme(path):
    """Get the URI scheme of the given path, if one is present.
    Args:
      path: string path to inspect.
    Returns: the URI scheme as a string, or None if the path has no scheme.
    """
    match_result = FileSystems.URI_SCHEMA_PATTERN.match(path.strip())
    if match_result is None:
      return None
    return match_result.groupdict()['scheme'] 

  @staticmethod
  def get_filesystem(path):
    """Get the correct filesystem for the specified path
    """
    try:
      path_scheme = FileSystems.get_scheme(path)
      systems = [fs for fs in FileSystem.get_all_subclasses()
                 if fs.scheme() == path_scheme]
      if len(systems) == 0:
        raise ValueError('Unable to get the Filesystem for path %s' % path)
      elif len(systems) == 1:
        return systems[0](pipeline_options=FileSystems._pipeline_options)
      else:
        raise ValueError('Found more than one filesystem for path %s' % path)
    except ValueError:
      raise
    except Exception as e:
      raise BeamIOError('Unable to get the Filesystem', {path: e}) 

  @staticmethod
  def join(basepath, *paths):
    """Join two or more pathname components for the filesystem
    Args:
      basepath: string path of the first component of the path
      paths: path components to be added
    Returns: full path after combining all the passed components
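    Example (an illustrative sketch; the bucket name is a placeholder)::
      FileSystems.join('gs://my-bucket', 'staging', 'data.txt')
      # Typically yields 'gs://my-bucket/staging/data.txt'.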
    """
    filesystem = FileSystems.get_filesystem(basepath)
    return filesystem.join(basepath, *paths) 

  @staticmethod
  def split(path):
    """Splits the given path into two parts.
    Splits the path into a pair (head, tail) such that tail contains the last
    component of the path and head contains everything up to that.
    For file-systems other than the local file-system, head should include the
    prefix.
    Args:
      path: path as a string
    Returns:
      a pair of path components as strings.
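    Example (an illustrative sketch with a placeholder local path)::
      FileSystems.split('/tmp/staging/data.txt')
      # -> ('/tmp/staging', 'data.txt')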
    """
    filesystem = FileSystems.get_filesystem(path)
    return filesystem.split(path) 

  @staticmethod
  def mkdirs(path):
    """Recursively create directories for the provided path.
    Args:
      path: string path of the directory structure that should be created
    Raises:
      IOError if leaf directory already exists.
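    Example (an illustrative sketch with a placeholder path)::
      FileSystems.mkdirs('/tmp/beam_staging/output')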
    """
    filesystem = FileSystems.get_filesystem(path)
    return filesystem.mkdirs(path) 

  @staticmethod
  def match(patterns, limits=None):
    """Find all matching paths to the patterns provided.
    Args:
      patterns: list of string file path patterns to match against
      limits: optional list, of the same length as ``patterns``, giving the
        maximum number of matches to fetch for each pattern
    Returns: list of ``MatchResult`` objects.
    Raises:
      ``BeamIOError`` if any of the pattern match operations fail
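    Example (an illustrative sketch; the glob pattern is a placeholder)::
      match_result = FileSystems.match(['/tmp/data/*.csv'])[0]
      paths = [metadata.path for metadata in match_result.metadata_list]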
    """
    if len(patterns) == 0:
      return []
    filesystem = FileSystems.get_filesystem(patterns[0])
    return filesystem.match(patterns, limits) 

  @staticmethod
  def create(path, mime_type='application/octet-stream',
             compression_type=CompressionTypes.AUTO):
    """Returns a write channel for the given file path.
    Args:
      path: string path of the file object to be written to the system
      mime_type: MIME type to specify the type of content in the file object
      compression_type: Type of compression to be used for this object. See
        ``CompressionTypes`` for possible values.
    Returns: file handle with a ``close`` function for the user to use.
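    Example (an illustrative sketch with a placeholder path)::
      handle = FileSystems.create('/tmp/output/results.txt')
      handle.write(b'hello world\n')
      handle.close()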
    """
    filesystem = FileSystems.get_filesystem(path)
    return filesystem.create(path, mime_type, compression_type) 

  @staticmethod
  def open(path, mime_type='application/octet-stream',
           compression_type=CompressionTypes.AUTO):
    """Returns a read channel for the given file path.
    Args:
      path: string path of the file object to be read from the system
      mime_type: MIME type to specify the type of content in the file object
      compression_type: Type of compression to be used for this object. See
        ``CompressionTypes`` for possible values.
    Returns: file handle with a ``close`` function for the user to use.
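    Example (an illustrative sketch with a placeholder path)::
      handle = FileSystems.open('/tmp/input/records.txt')
      data = handle.read()
      handle.close()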
    """
    filesystem = FileSystems.get_filesystem(path)
    return filesystem.open(path, mime_type, compression_type) 

  @staticmethod
  def copy(source_file_names, destination_file_names):
    """Recursively copy the file list from the source to the destination
    Args:
      source_file_names: list of source file objects that needs to be copied
      destination_file_names: list of destination of the new object
    Raises:
      ``BeamIOError`` if any of the copy operations fail
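    Example (an illustrative sketch with placeholder paths)::
      FileSystems.copy(['/tmp/src/a.txt'], ['/tmp/dst/a.txt'])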
    """
    if len(source_file_names) == 0:
      return
    filesystem = FileSystems.get_filesystem(source_file_names[0])
    return filesystem.copy(source_file_names, destination_file_names) 

  @staticmethod
  def rename(source_file_names, destination_file_names):
    """Rename the files at the source list to the destination list.
    Source and destination lists should be of the same size.
    Args:
      source_file_names: List of file paths that need to be moved
      destination_file_names: List of destination file paths for the files
    Raises:
      ``BeamIOError`` if any of the rename operations fail
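    Example (an illustrative sketch with placeholder paths)::
      FileSystems.rename(['/tmp/staging/part-0'], ['/tmp/output/part-0'])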
    """
    if len(source_file_names) == 0:
      return
    filesystem = FileSystems.get_filesystem(source_file_names[0])
    return filesystem.rename(source_file_names, destination_file_names) 

  @staticmethod
  def exists(path):
    """Check if the provided path exists on the FileSystem.
    Args:
      path: string path that needs to be checked.
    Returns: boolean flag indicating if path exists
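    Example (an illustrative sketch with a placeholder path)::
      file_is_present = FileSystems.exists('/tmp/output/results.txt')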
    """
    filesystem = FileSystems.get_filesystem(path)
    return filesystem.exists(path) 

  @staticmethod
  def delete(paths):
    """Deletes files or directories at the provided paths.
    Directories will be deleted recursively.
    Args:
      paths: list of paths that give the file objects to be deleted
    Raises:
      ``BeamIOError`` if any of the delete operations fail
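    Example (an illustrative sketch with placeholder paths)::
      FileSystems.delete(['/tmp/output/stale.txt', '/tmp/output/old_dir/'])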
    """
    if len(paths) == 0:
      return
    filesystem = FileSystems.get_filesystem(paths[0])
    return filesystem.delete(paths) 

  @staticmethod
  def get_chunk_size(path):
    """Get the correct chunk size for the FileSystem.
    Args:
      path: string path that needs to be checked.
    Returns: integer size for parallelization in the FS operations.
    """
    filesystem = FileSystems.get_filesystem(path)
    return filesystem.CHUNK_SIZE