Source code for apache_beam.io.hadoopfilesystem

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""":class:`~apache_beam.io.filesystem.FileSystem` implementation for accessing
Hadoop Distributed File System files."""

from __future__ import absolute_import

import logging
import posixpath
import re

from hdfs3 import HDFileSystem

from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.filesystem import MatchResult

__all__ = ['HadoopFileSystem']

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_COPY_BUFFER_SIZE = 2 ** 16


# TODO(udim): Add @retry.with_exponential_backoff to some functions, like in
# gcsio.py.


class HadoopFileSystem(FileSystem):
  """``FileSystem`` implementation that supports HDFS.

  URL arguments to methods expect strings starting with ``hdfs://``.
  Uses client library :class:`hdfs3.core.HDFileSystem`.
  """

  def __init__(self, pipeline_options):
    """Initializes a connection to HDFS.

    Connection configuration is done using :doc:`hdfs`.
    """
    super(HadoopFileSystem, self).__init__(pipeline_options)
    self._hdfs_client = HDFileSystem()

  @classmethod
  def scheme(cls):
    return 'hdfs'

  @staticmethod
  def _parse_url(url):
    """Verifies that url begins with hdfs:// prefix, strips it and adds a
    leading /.

    Raises:
      ValueError if url doesn't begin with hdfs://.

    Args:
      url: A URL in the form hdfs://path/...

    Returns:
      For an input of 'hdfs://path/...', will return '/path/...'.
    """
    m = _URL_RE.match(url)
    if m is None:
      raise ValueError('Could not parse url: %s' % url)
    return m.group(1)

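  # Illustrative note, not part of the original module: _parse_url strips only
  # the scheme and keeps the absolute path, so (hypothetical paths):
  #   HadoopFileSystem._parse_url('hdfs://tmp/data.txt')  returns '/tmp/data.txt'
  #   HadoopFileSystem._parse_url('/tmp/data.txt')        raises ValueError
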
  def join(self, base_url, *paths):
    """Join two or more pathname components.

    Args:
      base_url: string path of the first component of the path.
        Must start with hdfs://.
      paths: path components to be added

    Returns:
      Full url after combining all the passed components.
    """
    basepath = self._parse_url(base_url)
    return _HDFS_PREFIX + self._join(basepath, *paths)

  def _join(self, basepath, *paths):
    return posixpath.join(basepath, *paths)

  def split(self, url):
    rel_path = self._parse_url(url)
    head, tail = posixpath.split(rel_path)
    return _HDFS_PREFIX + head, tail

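  # Illustrative note, not part of the original module: join and split are
  # inverses over such URLs, e.g. for a hypothetical instance fs and paths:
  #   fs.join('hdfs://tmp', 'dir', 'file.txt')  returns 'hdfs://tmp/dir/file.txt'
  #   fs.split('hdfs://tmp/dir/file.txt')       returns ('hdfs://tmp/dir', 'file.txt')
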
  def mkdirs(self, url):
    path = self._parse_url(url)
    if self._exists(path):
      raise IOError('Path already exists: %s' % path)
    return self._mkdirs(path)

  def _mkdirs(self, path):
    self._hdfs_client.makedirs(path)

  def match(self, url_patterns, limits=None):
    if limits is None:
      limits = [None] * len(url_patterns)

    if len(url_patterns) != len(limits):
      raise BeamIOError(
          'Patterns and limits should be equal in length: %d != %d' % (
              len(url_patterns), len(limits)))

    # TODO(udim): Update client to allow batched results.
    def _match(path_pattern, limit):
      """Find all matching paths to the pattern provided."""
      file_infos = self._hdfs_client.ls(path_pattern, detail=True)[:limit]
      metadata_list = [FileMetadata(file_info['name'], file_info['size'])
                       for file_info in file_infos]
      return MatchResult(path_pattern, metadata_list)

    exceptions = {}
    result = []
    for url_pattern, limit in zip(url_patterns, limits):
      try:
        path_pattern = self._parse_url(url_pattern)
        result.append(_match(path_pattern, limit))
      except Exception as e:  # pylint: disable=broad-except
        exceptions[url_pattern] = e

    if exceptions:
      raise BeamIOError('Match operation failed', exceptions)
    return result

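  # Illustrative note, not part of the original module: match takes parallel
  # lists of URL patterns (handed to the hdfs3 client's ls) and optional
  # per-pattern limits, e.g. for a hypothetical instance fs and path:
  #   [result] = fs.match(['hdfs://logs/2018/'], limits=[10])
  #   for metadata in result.metadata_list:
  #     print(metadata.path, metadata.size_in_bytes)
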
  def _open_hdfs(self, path, mode, mime_type, compression_type):
    if mime_type != 'application/octet-stream':
      logging.warning('Mime types are not supported. Got non-default mime_type:'
                      ' %s', mime_type)
    if compression_type == CompressionTypes.AUTO:
      compression_type = CompressionTypes.detect_compression_type(path)
    res = self._hdfs_client.open(path, mode)
    if compression_type != CompressionTypes.UNCOMPRESSED:
      res = CompressedFile(res)
    return res

  def create(self, url, mime_type='application/octet-stream',
             compression_type=CompressionTypes.AUTO):
    """
    Returns:
      *hdfs3.core.HDFile*: A Python file-like object.
    """
    path = self._parse_url(url)
    return self._create(path, mime_type, compression_type)

  def _create(self, path, mime_type='application/octet-stream',
              compression_type=CompressionTypes.AUTO):
    return self._open_hdfs(path, 'wb', mime_type, compression_type)

  def open(self, url, mime_type='application/octet-stream',
           compression_type=CompressionTypes.AUTO):
    """
    Returns:
      *hdfs3.core.HDFile*: A Python file-like object.
    """
    path = self._parse_url(url)
    return self._open(path, mime_type, compression_type)

  def _open(self, path, mime_type='application/octet-stream',
            compression_type=CompressionTypes.AUTO):
    return self._open_hdfs(path, 'rb', mime_type, compression_type)

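  # Illustrative note, not part of the original module: with the default
  # CompressionTypes.AUTO, compression is inferred from the file extension,
  # e.g. for a hypothetical instance fs and paths:
  #   with fs.create('hdfs://tmp/out.gz') as f:   # wrapped in CompressedFile
  #     f.write(b'payload')
  #   with fs.open('hdfs://tmp/plain.txt') as f:  # plain HDFile
  #     data = f.read()
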
  def copy(self, source_file_names, destination_file_names):
    """
    Will overwrite files and directories in destination_file_names.

    Raises ``BeamIOError`` if any error occurs.

    Args:
      source_file_names: iterable of URLs.
      destination_file_names: iterable of URLs.
    """
    if len(source_file_names) != len(destination_file_names):
      raise BeamIOError(
          'source_file_names and destination_file_names should '
          'be equal in length: %d != %d' % (
              len(source_file_names), len(destination_file_names)))

    def _copy_file(source, destination):
      with self._open(source) as f1:
        with self._create(destination) as f2:
          while True:
            buf = f1.read(_COPY_BUFFER_SIZE)
            if not buf:
              break
            f2.write(buf)

    def _copy_path(source, destination):
      """Recursively copy the file tree from the source to the destination."""
      if not self._hdfs_client.isdir(source):
        _copy_file(source, destination)
        return

      for path, dirs, files in self._hdfs_client.walk(source):
        for dir in dirs:
          new_dir = self._join(destination, dir)
          if not self._exists(new_dir):
            self._mkdirs(new_dir)

        rel_path = posixpath.relpath(path, source)
        if rel_path == '.':
          rel_path = ''
        for file in files:
          _copy_file(self._join(path, file),
                     self._join(destination, rel_path, file))

    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        rel_source = self._parse_url(source)
        rel_destination = self._parse_url(destination)
        _copy_path(rel_source, rel_destination)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[(source, destination)] = e

    if exceptions:
      raise BeamIOError('Copy operation failed', exceptions)

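  # Illustrative note, not part of the original module: copy takes parallel
  # lists of source and destination URLs and recurses into directories, e.g.
  # for a hypothetical instance fs and paths:
  #   fs.copy(['hdfs://data/in'], ['hdfs://data/in_backup'])
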
  def rename(self, source_file_names, destination_file_names):
    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        rel_source = self._parse_url(source)
        rel_destination = self._parse_url(destination)
        if not self._hdfs_client.mv(rel_source, rel_destination):
          raise BeamIOError(
              'libhdfs error in renaming %s to %s' % (source, destination))
      except Exception as e:  # pylint: disable=broad-except
        exceptions[(source, destination)] = e

    if exceptions:
      raise BeamIOError('Rename operation failed', exceptions)

  def exists(self, url):
    """Checks existence of url in HDFS.

    Args:
      url: String in the form hdfs://...

    Returns:
      True if url exists as a file or directory in HDFS.
    """
    path = self._parse_url(url)
    return self._exists(path)

  def _exists(self, path):
    """Returns True if path exists as a file or directory in HDFS.

    Args:
      path: String in the form /...
    """
    return self._hdfs_client.exists(path)

  def delete(self, urls):
    exceptions = {}
    for url in urls:
      try:
        path = self._parse_url(url)
        self._hdfs_client.rm(path, recursive=True)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[url] = e

    if exceptions:
      raise BeamIOError("Delete operation failed", exceptions)
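
# Illustrative usage sketch, not part of the original module. Assumes hdfs3 is
# installed, a reachable HDFS namenode is configured for it, and hypothetical
# paths:
#   from apache_beam.options.pipeline_options import PipelineOptions
#
#   fs = HadoopFileSystem(PipelineOptions())
#   if not fs.exists('hdfs://tmp/demo'):
#     fs.mkdirs('hdfs://tmp/demo')
#   with fs.create('hdfs://tmp/demo/hello.txt') as f:
#     f.write(b'hello hdfs')
#   fs.rename(['hdfs://tmp/demo/hello.txt'], ['hdfs://tmp/demo/hello2.txt'])
#   fs.delete(['hdfs://tmp/demo'])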