# Source code for apache_beam.io.fileio

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""``PTransforms`` for manipulating files in Apache Beam.

Provides reading ``PTransform``\\s: ``MatchFiles`` and
``MatchAll``, which produce a ``PCollection`` of records representing a file
and its metadata; and ``ReadMatches``, which takes in a ``PCollection`` of file
metadata records and produces a ``PCollection`` of ``ReadableFile`` objects.
These transforms currently do not support splitting by themselves.

No backward compatibility guarantees. Everything in this module is experimental.
"""

from __future__ import absolute_import

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import filesystem
from apache_beam.io import filesystems
from apache_beam.io.filesystem import BeamIOError
from apache_beam.utils.annotations import experimental

# Public API of this module: the names exported by a wildcard import.
__all__ = ['EmptyMatchTreatment',
           'MatchFiles',
           'MatchAll',
           'ReadableFile',
           'ReadMatches']


class EmptyMatchTreatment(object):
  """How to treat empty matches in ``MatchAll`` and ``MatchFiles`` transforms.

  If empty matches are disallowed, an error will be thrown if a pattern does not
  match any files."""

  # Empty matches are always accepted.
  ALLOW = 'ALLOW'
  # Empty matches are always rejected.
  DISALLOW = 'DISALLOW'
  # Empty matches are accepted only when the pattern contains a '*' wildcard.
  ALLOW_IF_WILDCARD = 'ALLOW_IF_WILDCARD'

  @staticmethod
  def allow_empty_match(pattern, setting):
    """Decides whether an empty match for ``pattern`` is acceptable.

    Args:
      pattern: the file pattern that produced no matches.
      setting: one of ``ALLOW``, ``DISALLOW`` or ``ALLOW_IF_WILDCARD``.

    Returns:
      ``True`` if the empty match is allowed under ``setting``, else ``False``.

    Raises:
      ValueError: if ``setting`` is not one of the recognized treatments.
    """
    if setting == EmptyMatchTreatment.ALLOW:
      return True
    elif setting == EmptyMatchTreatment.ALLOW_IF_WILDCARD:
      # A valid setting: a non-wildcard pattern means "disallowed", not an
      # unknown setting, so answer False instead of falling through to raise.
      return '*' in pattern
    elif setting == EmptyMatchTreatment.DISALLOW:
      return False
    else:
      raise ValueError(setting)
class _MatchAllFn(beam.DoFn):
  """Expands a single file pattern into the list of its ``FileMetadata``.

  Raises ``BeamIOError`` when the pattern matches nothing and the configured
  ``EmptyMatchTreatment`` does not permit an empty result.
  """

  def __init__(self, empty_match_treatment):
    self._empty_match_treatment = empty_match_treatment

  def process(self, file_pattern):
    # TODO: Should we batch the lookups?
    first_result = filesystems.FileSystems.match([file_pattern])[0]
    found = first_result.metadata_list

    # Non-empty matches never consult the treatment setting.
    if found:
      return found

    allowed = EmptyMatchTreatment.allow_empty_match(
        file_pattern, self._empty_match_treatment)
    if allowed:
      return found

    raise BeamIOError(
        'Empty match for pattern %s. Disallowed.' % file_pattern)
@experimental()
class MatchFiles(beam.PTransform):
  """Matches a file pattern using ``FileSystems.match``.

  This ``PTransform`` returns a ``PCollection`` of matching files in the form
  of ``FileMetadata`` objects.

  Args:
    file_pattern: the single pattern to match.
    empty_match_treatment: an ``EmptyMatchTreatment`` value controlling whether
      a pattern that matches no files is an error.
  """

  def __init__(self,
               file_pattern,
               empty_match_treatment=EmptyMatchTreatment.ALLOW_IF_WILDCARD):
    self._file_pattern = file_pattern
    self._empty_match_treatment = empty_match_treatment

  def expand(self, pcoll):
    # Forward the configured treatment explicitly; relying on MatchAll's own
    # default (ALLOW) would silently discard the setting given to this
    # transform.
    return (pcoll.pipeline
            | beam.Create([self._file_pattern])
            | MatchAll(self._empty_match_treatment))
@experimental()
class MatchAll(beam.PTransform):
  """Matches file patterns from the input PCollection via ``FileSystems.match``.

  Each input element is treated as a file pattern. The output is a
  ``PCollection`` of ``FileMetadata`` objects for every file matched.

  Args:
    empty_match_treatment: an ``EmptyMatchTreatment`` value controlling whether
      a pattern that matches no files is an error.
  """

  def __init__(self, empty_match_treatment=EmptyMatchTreatment.ALLOW):
    self._empty_match_treatment = empty_match_treatment

  def expand(self, pcoll):
    matcher = _MatchAllFn(self._empty_match_treatment)
    return pcoll | beam.ParDo(matcher)
class _ReadMatchesFn(beam.DoFn):
  """Converts file metadata (or a bare path string) into a ``ReadableFile``.

  Paths ending in a '/' or '\\' separator are treated as directories: they are
  dropped when ``skip_directories`` is true, otherwise they raise
  ``BeamIOError``.
  """

  def __init__(self, compression, skip_directories):
    # NOTE(review): ``compression`` is stored but not yet used by this DoFn.
    self._compression = compression
    self._skip_directories = skip_directories

  def process(self, file_metadata):
    # Accept plain path strings as well as FileMetadata records; a string is
    # wrapped with a size of 0 since its real size is unknown here.
    metadata = (filesystem.FileMetadata(file_metadata, 0)
                if isinstance(file_metadata, (str, unicode))
                else file_metadata)

    if metadata.path.endswith(('/', '\\')):
      if self._skip_directories:
        return
      # Note the trailing space: the two literals are concatenated.
      raise BeamIOError(
          'Directories are not allowed in ReadMatches transform. '
          'Found %s.' % metadata.path)

    # TODO: Mime type? Other arguments? Maybe arguments passed in to transform?
    yield ReadableFile(metadata)
class ReadableFile(object):
  """A utility class for accessing files.

  Wraps a file metadata record; only its ``path`` attribute is used here.
  """

  def __init__(self, metadata):
    self.metadata = metadata

  def open(self, mime_type='text/plain'):
    """Opens the file through ``FileSystems.open`` and returns the handle.

    Args:
      mime_type: MIME type forwarded to ``FileSystems.open``.

    The caller owns the returned handle and is responsible for closing it.
    """
    return filesystems.FileSystems.open(self.metadata.path,
                                        mime_type=mime_type)

  def read(self):
    """Returns the entire raw contents of the file.

    The handle is closed before returning, even if reading fails.
    """
    handle = self.open()
    try:
      return handle.read()
    finally:
      handle.close()

  def read_utf8(self):
    """Returns the file's contents decoded as UTF-8."""
    return self.read().decode('utf-8')
@experimental()
class ReadMatches(beam.PTransform):
  """Converts each result of MatchFiles() or MatchAll() to a ReadableFile.

  The resulting ``ReadableFile`` objects can be used to read a file's contents
  or to obtain a file descriptor.

  Args:
    compression: compression setting handed to the underlying DoFn.
    skip_directories: if true, directory entries are silently dropped;
      otherwise they cause an error.
  """

  def __init__(self, compression=None, skip_directories=True):
    self._compression = compression
    self._skip_directories = skip_directories

  def expand(self, pcoll):
    reader = _ReadMatchesFn(self._compression, self._skip_directories)
    return pcoll | beam.ParDo(reader)