#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""":class:`~apache_beam.io.filesystem.FileSystem` implementation for accessing
Hadoop Distributed File System files."""
from __future__ import absolute_import
import logging
import posixpath
import re
from hdfs3 import HDFileSystem
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystem import MatchResult
__all__ = ['HadoopFileSystem']
# URL prefix *without* its final slash: that slash is captured by _URL_RE so
# that it becomes the leading '/' of the extracted path.
_HDFS_PREFIX = 'hdfs:/'
# Matches 'hdfs://<rest>' and captures '/<rest>' as group 1.
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
# Number of bytes requested per read() call when copying file contents.
_COPY_BUFFER_SIZE = 2 ** 16
# TODO(udim): Add @retry.with_exponential_backoff to some functions, like in
# gcsio.py.
class HadoopFileSystem(FileSystem):
  """``FileSystem`` implementation that supports HDFS.

  URL arguments to methods expect strings starting with ``hdfs://``.

  Uses client library :class:`hdfs3.core.HDFileSystem`.
  """

  def __init__(self, pipeline_options):
    """Initializes a connection to HDFS.

    Connection configuration is done using :doc:`hdfs`.

    Args:
      pipeline_options: Forwarded unchanged to the ``FileSystem`` base class;
        not otherwise used by this class.
    """
    super(HadoopFileSystem, self).__init__(pipeline_options)
    self._hdfs_client = HDFileSystem()

  @classmethod
  def scheme(cls):
    """Returns the URL scheme handled by this file system (``'hdfs'``)."""
    return 'hdfs'

  @staticmethod
  def _parse_url(url):
    """Verifies that url begins with hdfs:// prefix, strips it and adds a
    leading /.

    Args:
      url: A URL in the form hdfs://path/...

    Returns:
      For an input of 'hdfs://path/...', will return '/path/...'.

    Raises:
      ValueError if url doesn't begin with hdfs://.
    """
    m = _URL_RE.match(url)
    if m is None:
      raise ValueError('Could not parse url: %s' % url)
    return m.group(1)

  def join(self, base_url, *paths):
    """Join two or more pathname components.

    Args:
      base_url: string path of the first component of the path.
        Must start with hdfs://.
      paths: path components to be added

    Returns:
      Full url after combining all the passed components.

    Raises:
      ValueError if base_url doesn't begin with hdfs://.
    """
    basepath = self._parse_url(base_url)
    return _HDFS_PREFIX + self._join(basepath, *paths)

  def _join(self, basepath, *paths):
    """Joins prefix-less path components using POSIX path rules."""
    return posixpath.join(basepath, *paths)

  def split(self, url):
    """Splits a URL into a head URL and a tail component.

    The split follows ``posixpath.split`` applied to the path part of the URL.

    Args:
      url: A URL in the form hdfs://path/...

    Returns:
      A ``(head, tail)`` tuple: ``head`` is everything before the last path
      component, with the hdfs:// prefix re-attached; ``tail`` is the last
      path component (possibly empty).

    Raises:
      ValueError if url doesn't begin with hdfs://.
    """
    rel_path = self._parse_url(url)
    head, tail = posixpath.split(rel_path)
    return _HDFS_PREFIX + head, tail

  def mkdirs(self, url):
    """Creates the directory named by url.

    Args:
      url: A URL in the form hdfs://path/...

    Raises:
      ValueError if url doesn't begin with hdfs://.
      IOError if the path already exists.
    """
    path = self._parse_url(url)
    if self._exists(path):
      raise IOError('Path already exists: %s' % path)
    return self._mkdirs(path)

  def _mkdirs(self, path):
    """Creates the directory at a prefix-less path via the HDFS client."""
    self._hdfs_client.makedirs(path)

  def match(self, url_patterns, limits=None):
    """Lists the entries found for each of the given URL patterns.

    Args:
      url_patterns: list of URLs in the form hdfs://path/...
      limits: optional list, parallel to url_patterns, giving the maximum
        number of results to return per pattern. ``None`` (for the whole
        argument or for an individual entry) means no limit.

    Returns:
      A list of ``MatchResult`` objects, one per pattern, in input order.

    Raises:
      ``BeamIOError`` if the two lists differ in length, or if any pattern
      failed; in the latter case the exception carries a dict mapping each
      failed URL pattern to the exception it raised.
    """
    if limits is None:
      limits = [None] * len(url_patterns)

    if len(url_patterns) != len(limits):
      raise BeamIOError(
          'Patterns and limits should be equal in length: %d != %d' % (
              len(url_patterns), len(limits)))

    # TODO(udim): Update client to allow batched results.
    def _match(path_pattern, limit):
      """Find all matching paths to the pattern provided."""
      # Slicing with limit=None keeps every result.
      file_infos = self._hdfs_client.ls(path_pattern, detail=True)[:limit]
      # NOTE(review): the metadata paths are whatever the client reports in
      # 'name', and the MatchResult pattern is the prefix-stripped path, not
      # the original hdfs:// URL. Confirm that callers can pass these values
      # back into open()/copy(), which require the hdfs:// prefix.
      metadata_list = [FileMetadata(file_info['name'], file_info['size'])
                       for file_info in file_infos]
      return MatchResult(path_pattern, metadata_list)

    exceptions = {}
    result = []
    for url_pattern, limit in zip(url_patterns, limits):
      try:
        path_pattern = self._parse_url(url_pattern)
        result.append(_match(path_pattern, limit))
      except Exception as e:  # pylint: disable=broad-except
        # Collect per-pattern failures so that all of them get reported.
        exceptions[url_pattern] = e

    if exceptions:
      raise BeamIOError('Match operation failed', exceptions)
    return result

  def _open_hdfs(self, path, mode, mime_type, compression_type):
    """Opens a prefix-less HDFS path, adding (de)compression when requested.

    Args:
      path: String in the form /...
      mode: File mode passed to the HDFS client, e.g. 'rb' or 'wb'.
      mime_type: Ignored apart from a warning when it is not the default.
      compression_type: A ``CompressionTypes`` value. ``AUTO`` is resolved
        from the path via ``CompressionTypes.detect_compression_type``.

    Returns:
      The file object returned by the HDFS client, wrapped in a
      ``CompressedFile`` unless the resolved type is ``UNCOMPRESSED``.
    """
    if mime_type != 'application/octet-stream':
      logging.warning('Mime types are not supported. Got non-default mime_type:'
                      ' %s', mime_type)
    if compression_type == CompressionTypes.AUTO:
      compression_type = CompressionTypes.detect_compression_type(path)
    res = self._hdfs_client.open(path, mode)
    if compression_type != CompressionTypes.UNCOMPRESSED:
      # Pass the resolved type explicitly; otherwise CompressedFile falls back
      # to its own default codec regardless of what was requested or detected.
      res = CompressedFile(res, compression_type=compression_type)
    return res

  def create(self, url, mime_type='application/octet-stream',
             compression_type=CompressionTypes.AUTO):
    """Opens url for writing.

    Args:
      url: A URL in the form hdfs://path/...
      mime_type: Not supported; a warning is logged for non-default values.
      compression_type: A ``CompressionTypes`` value; ``AUTO`` detects the
        type from the path.

    Returns:
      *hdfs3.core.HDFile*: A Python file-like object, wrapped in a
      ``CompressedFile`` when compression applies.

    Raises:
      ValueError if url doesn't begin with hdfs://.
    """
    path = self._parse_url(url)
    return self._create(path, mime_type, compression_type)

  def _create(self, path, mime_type='application/octet-stream',
              compression_type=CompressionTypes.AUTO):
    """Same as create(), but takes a prefix-less path."""
    return self._open_hdfs(path, 'wb', mime_type, compression_type)

  def open(self, url, mime_type='application/octet-stream',
           compression_type=CompressionTypes.AUTO):
    """Opens url for reading.

    Args:
      url: A URL in the form hdfs://path/...
      mime_type: Not supported; a warning is logged for non-default values.
      compression_type: A ``CompressionTypes`` value; ``AUTO`` detects the
        type from the path.

    Returns:
      *hdfs3.core.HDFile*: A Python file-like object, wrapped in a
      ``CompressedFile`` when compression applies.

    Raises:
      ValueError if url doesn't begin with hdfs://.
    """
    path = self._parse_url(url)
    return self._open(path, mime_type, compression_type)

  def _open(self, path, mime_type='application/octet-stream',
            compression_type=CompressionTypes.AUTO):
    """Same as open(), but takes a prefix-less path."""
    return self._open_hdfs(path, 'rb', mime_type, compression_type)

  def copy(self, source_file_names, destination_file_names):
    """Copies files or directory trees, pairwise, from sources to destinations.

    Will overwrite files and directories in destination_file_names.

    Raises ``BeamIOError`` if any error occurred.

    Args:
      source_file_names: iterable of URLs.
      destination_file_names: iterable of URLs.
    """
    if len(source_file_names) != len(destination_file_names):
      raise BeamIOError(
          'source_file_names and destination_file_names should '
          'be equal in length: %d != %d' % (
              len(source_file_names), len(destination_file_names)))

    def _copy_file(source, destination):
      """Copies one file's contents in _COPY_BUFFER_SIZE chunks."""
      with self._open(source) as f1:
        with self._create(destination) as f2:
          while True:
            buf = f1.read(_COPY_BUFFER_SIZE)
            if not buf:
              break
            f2.write(buf)

    def _copy_path(source, destination):
      """Recursively copy the file tree from the source to the destination."""
      if not self._hdfs_client.isdir(source):
        _copy_file(source, destination)
        return

      # Make sure the destination root exists so that copying an empty
      # directory still produces a directory.
      if not self._exists(destination):
        self._mkdirs(destination)

      for path, dirs, files in self._hdfs_client.walk(source):
        # Position of the directory being visited relative to the source root;
        # everything created below must sit at the same relative position
        # under the destination.
        rel_path = posixpath.relpath(path, source)
        if rel_path == '.':
          rel_path = ''

        for dir_name in dirs:
          new_dir = self._join(destination, rel_path, dir_name)
          if not self._exists(new_dir):
            self._mkdirs(new_dir)

        for file_name in files:
          _copy_file(self._join(path, file_name),
                     self._join(destination, rel_path, file_name))

    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        rel_source = self._parse_url(source)
        rel_destination = self._parse_url(destination)
        _copy_path(rel_source, rel_destination)
      except Exception as e:  # pylint: disable=broad-except
        # Collect per-pair failures so that all of them get reported.
        exceptions[(source, destination)] = e

    if exceptions:
      raise BeamIOError('Copy operation failed', exceptions)

  def rename(self, source_file_names, destination_file_names):
    """Renames (moves) each source URL to the matching destination URL.

    Args:
      source_file_names: list of URLs.
      destination_file_names: list of URLs, parallel to source_file_names.

    Raises:
      ``BeamIOError`` if the two lists differ in length, or if any rename
      failed; in the latter case the exception carries a dict mapping each
      failed ``(source, destination)`` pair to the exception it raised.
    """
    # Same guard as copy(): without it, zip() below would silently drop the
    # unpaired entries.
    if len(source_file_names) != len(destination_file_names):
      raise BeamIOError(
          'source_file_names and destination_file_names should '
          'be equal in length: %d != %d' % (
              len(source_file_names), len(destination_file_names)))

    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        rel_source = self._parse_url(source)
        rel_destination = self._parse_url(destination)
        # A falsy return value from the client is treated as a failed rename.
        if not self._hdfs_client.mv(rel_source, rel_destination):
          raise BeamIOError(
              'libhdfs error in renaming %s to %s' % (source, destination))
      except Exception as e:  # pylint: disable=broad-except
        exceptions[(source, destination)] = e

    if exceptions:
      raise BeamIOError('Rename operation failed', exceptions)

  def exists(self, url):
    """Checks existence of url in HDFS.

    Args:
      url: String in the form hdfs://...

    Returns:
      True if url exists as a file or directory in HDFS.

    Raises:
      ValueError if url doesn't begin with hdfs://.
    """
    path = self._parse_url(url)
    return self._exists(path)

  def _exists(self, path):
    """Returns True if path exists as a file or directory in HDFS.

    Args:
      path: String in the form /...
    """
    return self._hdfs_client.exists(path)

  def delete(self, urls):
    """Recursively deletes each of the given URLs.

    Args:
      urls: iterable of URLs in the form hdfs://path/...

    Raises:
      ``BeamIOError`` if any deletion failed; the exception carries a dict
      mapping each failed URL to the exception it raised.
    """
    exceptions = {}
    for url in urls:
      try:
        path = self._parse_url(url)
        self._hdfs_client.rm(path, recursive=True)
      except Exception as e:  # pylint: disable=broad-except
        # Keep going so that one bad URL does not block the others.
        exceptions[url] = e

    if exceptions:
      raise BeamIOError("Delete operation failed", exceptions)