#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities for ``FileSystem`` implementations."""
# pytype: skip-file
from __future__ import absolute_import
import abc
import io
import os
from builtins import object
from future.utils import with_metaclass
# Public API of this module (the names exported by ``from ... import *``).
__all__ = [
    'Downloader',
    'Uploader',
    'DownloaderStream',
    'UploaderStream',
    'PipeStream',
]
class Downloader(with_metaclass(abc.ABCMeta, object)):  # type: ignore[misc]
  """Download interface for a single file.

  Implementations should support random access reads.
  """

  @property
  @abc.abstractmethod
  def size(self):
    """Size of the file to download, in bytes."""

  @abc.abstractmethod
  def get_range(self, start, end):
    """Retrieve a given byte range [start, end) from this download.

    Range must be in this form:
      0 <= start < end: Fetch the bytes from start to end.

    Args:
      start: (int) Initial byte offset.
      end: (int) Final byte offset, exclusive.

    Returns:
      (bytes) A bytes-like buffer containing the requested data. Stream
      wrappers copy it directly into a byte buffer, so it must not be text.
    """
class Uploader(with_metaclass(abc.ABCMeta, object)):  # type: ignore[misc]
  """Abstract sink that uploads the contents of a single file.

  Data is fed in order through put(); finish() completes the upload.
  """

  @abc.abstractmethod
  def put(self, data):
    """Append data to the file being uploaded.

    Calls are sequential: each one continues where the previous one ended.

    Args:
      data: (memoryview) Data to write.
    """

  @abc.abstractmethod
  def finish(self):
    """Upload whatever data remains and close the file.

    The file must be completely written by the time this method returns.

    Raises:
      Any error encountered during the upload.
    """
class DownloaderStream(io.RawIOBase):
  """Provides a stream interface for Downloader objects."""

  def __init__(
      self, downloader, read_buffer_size=io.DEFAULT_BUFFER_SIZE, mode='rb'):
    """Initializes the stream.

    Args:
      downloader: (Downloader) Filesystem dependent implementation.
      read_buffer_size: (int) Buffer size to use during read operations.
      mode: (string) Python mode attribute for this stream.
    """
    self._downloader = downloader
    self.mode = mode
    self._position = 0
    self._reader_buffer_size = read_buffer_size

  def readinto(self, b):
    """Read up to len(b) bytes into b.

    Returns number of bytes read (0 for EOF or for an empty buffer). Fewer
    than len(b) bytes are reported if the downloader returns a shorter range.

    Args:
      b: (bytearray/memoryview) Buffer to read into.

    Raises:
      ``ValueError``: When this stream is closed.
    """
    self._checkClosed()
    # A zero-length buffer (e.g. from read(0)) must not reach get_range():
    # the Downloader contract requires start < end.
    if not len(b) or self._position >= self._downloader.size:
      return 0
    start = self._position
    end = min(start + len(b), self._downloader.size)
    data = self._downloader.get_range(start, end)
    # Copy before advancing so a failed copy leaves the offset untouched.
    b[:len(data)] = data
    self._position += len(data)
    return len(data)

  def seek(self, offset, whence=os.SEEK_SET):
    """Set the stream's current offset.

    Note if the new offset is out of bound, it is adjusted to either 0 or EOF.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
        (seek relative to the end, offset should be negative).

    Returns:
      The new (clamped) offset.

    Raises:
      ``ValueError``: When this stream is closed or if whence is invalid.
    """
    self._checkClosed()
    if whence == os.SEEK_SET:
      self._position = offset
    elif whence == os.SEEK_CUR:
      self._position += offset
    elif whence == os.SEEK_END:
      self._position = self._downloader.size + offset
    else:
      raise ValueError('Whence mode %r is invalid.' % whence)
    # Clamp into [0, size].
    self._position = min(self._position, self._downloader.size)
    self._position = max(self._position, 0)
    return self._position

  def tell(self):
    """Tell the stream's current offset.

    Returns:
      current offset in reading this stream.

    Raises:
      ``ValueError``: When this stream is closed.
    """
    self._checkClosed()
    return self._position

  def seekable(self):
    """This stream always supports seek()."""
    return True

  def readable(self):
    """This stream always supports reading."""
    return True

  def readall(self):
    """Read until EOF, using multiple read() calls of the configured size."""
    res = []
    while True:
      data = self.read(self._reader_buffer_size)
      if not data:
        break
      res.append(data)
    return b''.join(res)
class UploaderStream(io.RawIOBase):
  """Provides a stream interface for Uploader objects."""

  def __init__(self, uploader, mode='wb'):
    """Initializes the stream.

    Args:
      uploader: (Uploader) Filesystem dependent implementation.
      mode: (string) Python mode attribute for this stream.
    """
    self._uploader = uploader
    self.mode = mode
    self._position = 0

  def tell(self):
    """Return the number of bytes accepted by write() so far."""
    return self._position

  def write(self, b):
    """Write bytes from b.

    Returns number of bytes written (<= len(b)).

    Args:
      b: (memoryview) Buffer with data to write.

    Raises:
      ``ValueError``: When this stream is closed.
    """
    self._checkClosed()
    self._uploader.put(b)
    bytes_written = len(b)
    self._position += bytes_written
    return bytes_written

  def close(self):
    """Complete the upload and close this stream.

    This method has no effect if the stream is already closed. The stream is
    marked closed even if finishing the upload fails, so the uploader's
    finish() is attempted at most once (also when the stream is later
    garbage-collected, which calls close() again).

    Raises:
      Any error encountered by the uploader.
    """
    if self.closed:
      return
    try:
      self._uploader.finish()
    finally:
      super(UploaderStream, self).close()

  def writable(self):
    """This stream always supports writing."""
    return True
class PipeStream(object):
  """Exposes the receiving end of a pipe connection as a readable stream.

  Not thread-safe.

  The block returned by the most recent non-empty read() is retained, so the
  stream can be rewound to that block's starting offset (once per block).
  See BEAM-6380 for more.
  """

  def __init__(self, recv_pipe):
    self.conn = recv_pipe
    self.closed = False
    self.position = 0
    # Data already pulled from the pipe that read() has not yet returned.
    self.remaining = b''
    # Offset and contents of the last block handed out by read(); seek() uses
    # them to support one limited backwards jump.
    self.last_block_position = None
    self.last_block = b''

  def read(self, size):
    """Read up to ``size`` bytes from the wrapped pipe connection.

    Messages are pulled with recv_bytes() until ``size`` bytes have been
    collected or the connection raises EOFError.

    Args:
      size: Number of bytes to read. Exactly this many are returned unless
        EOF is reached first.

    Returns:
      The data read, as bytes (may be empty at EOF).
    """
    pieces = []
    block_start = self.position
    still_needed = size
    while still_needed > 0:
      n = min(still_needed, len(self.remaining))
      pieces.append(self.remaining[:n])
      self.remaining = self.remaining[n:]
      self.position += n
      still_needed -= n
      if self.remaining:
        # Leftover buffered data means the request was fully satisfied.
        continue
      try:
        self.remaining = self.conn.recv_bytes()
      except EOFError:
        break
    block = b''.join(pieces)
    if block:
      # Only non-empty reads replace the rewind point.
      self.last_block_position = block_start
      self.last_block = block
    return block

  def tell(self):
    """Return the current read offset.

    Raises:
      IOError: When this stream is closed.
    """
    self._check_open()
    return self.position

  def seek(self, offset, whence=os.SEEK_SET):
    """Support only the limited seeks needed by upload clients.

    Allowed: seeking to the end with offset 0 (no-op), seeking to the current
    position (no-op), and rewinding to the start of the last block returned
    by read(). Anything else raises NotImplementedError.
    """
    # The apitools library used by the gcsio.Uploader class insists on seeking
    # to the end of a stream to do a check before completing an upload, so we
    # must have this no-op method here in that case.
    if whence == os.SEEK_END and offset == 0:
      return
    if whence == os.SEEK_SET:
      if offset == self.position:
        return
      if self.last_block and offset == self.last_block_position:
        # Push the last block back in front of the unread data.
        self.position = offset
        self.remaining = self.last_block + self.remaining
        self.last_block = b''
        return
    raise NotImplementedError(
        'offset: %s, whence: %s, position: %s, last: %s' %
        (offset, whence, self.position, self.last_block_position))

  def _check_open(self):
    # Reject operations once the stream has been flagged as closed.
    if self.closed:
      raise IOError('Stream is closed.')