Source code for apache_beam.io.filesystemio

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities for ``FileSystem`` implementations."""

from __future__ import absolute_import

import abc
import io
import os
from builtins import object

from future.utils import with_metaclass

__all__ = ['Downloader', 'Uploader', 'DownloaderStream', 'UploaderStream',
           'PipeStream']


[docs]class Downloader(with_metaclass(abc.ABCMeta, object)): """Download interface for a single file. Implementations should support random access reads. """ @abc.abstractproperty def size(self): """Size of file to download."""
[docs] @abc.abstractmethod def get_range(self, start, end): """Retrieve a given byte range [start, end) from this download. Range must be in this form: 0 <= start < end: Fetch the bytes from start to end. Args: start: (int) Initial byte offset. end: (int) Final byte offset, exclusive. Returns: (string) A buffer containing the requested data. """
[docs]class Uploader(with_metaclass(abc.ABCMeta, object)): """Upload interface for a single file."""
[docs] @abc.abstractmethod def put(self, data): """Write data to file sequentially. Args: data: (memoryview) Data to write. """
[docs] @abc.abstractmethod def finish(self): """Signal to upload any remaining data and close the file. File should be fully written upon return from this method. Raises: Any error encountered during the upload. """
[docs]class DownloaderStream(io.RawIOBase): """Provides a stream interface for Downloader objects.""" def __init__(self, downloader, read_buffer_size=io.DEFAULT_BUFFER_SIZE, mode='rb'): """Initializes the stream. Args: downloader: (Downloader) Filesystem dependent implementation. read_buffer_size: (int) Buffer size to use during read operations. mode: (string) Python mode attribute for this stream. """ self._downloader = downloader self.mode = mode self._position = 0 self._reader_buffer_size = read_buffer_size
[docs] def readinto(self, b): """Read up to len(b) bytes into b. Returns number of bytes read (0 for EOF). Args: b: (bytearray/memoryview) Buffer to read into. """ self._checkClosed() if self._position >= self._downloader.size: return 0 start = self._position end = min(self._position + len(b), self._downloader.size) data = self._downloader.get_range(start, end) self._position += len(data) b[:len(data)] = data return len(data)
[docs] def seek(self, offset, whence=os.SEEK_SET): """Set the stream's current offset. Note if the new offset is out of bound, it is adjusted to either 0 or EOF. Args: offset: seek offset as number. whence: seek mode. Supported modes are os.SEEK_SET (absolute seek), os.SEEK_CUR (seek relative to the current position), and os.SEEK_END (seek relative to the end, offset should be negative). Raises: ``ValueError``: When this stream is closed or if whence is invalid. """ self._checkClosed() if whence == os.SEEK_SET: self._position = offset elif whence == os.SEEK_CUR: self._position += offset elif whence == os.SEEK_END: self._position = self._downloader.size + offset else: raise ValueError('Whence mode %r is invalid.' % whence) self._position = min(self._position, self._downloader.size) self._position = max(self._position, 0) return self._position
[docs] def tell(self): """Tell the stream's current offset. Returns: current offset in reading this stream. Raises: ``ValueError``: When this stream is closed. """ self._checkClosed() return self._position
[docs] def seekable(self): return True
[docs] def readable(self): return True
[docs] def readall(self): """Read until EOF, using multiple read() call.""" res = [] while True: data = self.read(self._reader_buffer_size) if not data: break res.append(data) return b''.join(res)
[docs]class UploaderStream(io.RawIOBase): """Provides a stream interface for Uploader objects.""" def __init__(self, uploader, mode='wb'): """Initializes the stream. Args: uploader: (Uploader) Filesystem dependent implementation. mode: (string) Python mode attribute for this stream. """ self._uploader = uploader self.mode = mode self._position = 0
[docs] def tell(self): return self._position
[docs] def write(self, b): """Write bytes from b. Returns number of bytes written (<= len(b)). Args: b: (memoryview) Buffer with data to write. """ self._checkClosed() self._uploader.put(b) bytes_written = len(b) self._position += bytes_written return bytes_written
[docs] def close(self): """Complete the upload and close this stream. This method has no effect if the stream is already closed. Raises: Any error encountered by the uploader. """ if not self.closed: self._uploader.finish() super(UploaderStream, self).close()
[docs] def writable(self): return True
[docs]class PipeStream(object): """A class that presents a pipe connection as a readable stream. Not thread-safe. Remembers the last ``size`` bytes read and allows rewinding the stream by that amount exactly. See BEAM-6380 for more. """ def __init__(self, recv_pipe): self.conn = recv_pipe self.closed = False self.position = 0 self.remaining = b'' # Data and position of last block streamed. Allows limited seeking backwards # of stream. self.last_block_position = None self.last_block = b''
[docs] def read(self, size): """Read data from the wrapped pipe connection. Args: size: Number of bytes to read. Actual number of bytes read is always equal to size unless EOF is reached. Returns: data read as str. """ data_list = [] bytes_read = 0 self.last_block_position = self.position while bytes_read < size: bytes_from_remaining = min(size - bytes_read, len(self.remaining)) data_list.append(self.remaining[0:bytes_from_remaining]) self.remaining = self.remaining[bytes_from_remaining:] self.position += bytes_from_remaining bytes_read += bytes_from_remaining if not self.remaining: try: self.remaining = self.conn.recv_bytes() except EOFError: break self.last_block = b''.join(data_list) return self.last_block
[docs] def tell(self): """Tell the file's current offset. Returns: current offset in reading this file. Raises: ``ValueError``: When this stream is closed. """ self._check_open() return self.position
[docs] def seek(self, offset, whence=os.SEEK_SET): # The apitools library used by the gcsio.Uploader class insists on seeking # to the end of a stream to do a check before completing an upload, so we # must have this no-op method here in that case. if whence == os.SEEK_END and offset == 0: return elif whence == os.SEEK_SET: if offset == self.position: return elif offset == self.last_block_position and self.last_block: self.position = offset self.remaining = b''.join([self.last_block, self.remaining]) self.last_block = b'' return raise NotImplementedError( 'offset: %s, whence: %s, position: %s, last: %s' % ( offset, whence, self.position, self.last_block_position))
def _check_open(self): if self.closed: raise IOError('Stream is closed.')