Source code for apache_beam.ml.transforms.utils

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ['ArtifactsFetcher']

import os
import tempfile
import typing

from google.cloud.storage import Client
from google.cloud.storage import transfer_manager

import tensorflow_transform as tft
from apache_beam.ml.transforms import base


def download_artifacts_from_gcs(bucket_name, prefix, local_path):
  """Copies the GCS objects listed under a prefix into a local directory.

  Args:
    bucket_name: The name of the GCS bucket to read from.
    prefix: Prefix passed to the bucket listing to select the objects.
    local_path: The local directory that receives the downloaded objects.
  """
  source_bucket = Client().get_bucket(bucket_name)

  object_names = []
  for remote_object in source_bucket.list_blobs(prefix=prefix):
    object_names.append(remote_object.name)

  # The value returned by the transfer manager is intentionally discarded.
  transfer_manager.download_many_to_path(
      source_bucket, object_names, destination_directory=local_path)

class ArtifactsFetcher:
  """Utility class used to fetch artifacts from the artifact_location passed
  to the TFTProcessHandlers in MLTransform.

  This is intended to be used for testing purposes only.
  """
  def __init__(self, artifact_location: str):
    """Locates the transform output stored under ``artifact_location``.

    Args:
      artifact_location: Directory holding the artifacts. A location starting
        with ``gs://`` is first downloaded into a new local temporary
        directory; any other value is listed directly as a local directory.

    Raises:
      AssertionError: If the download from a ``gs://`` location left the
        temporary directory empty.
      ValueError: If the attribute file is not present in the directory.
      NotImplementedError: If more than one entry remains in the directory
        after the attribute file has been set aside.
    """
    if artifact_location.startswith('gs://'):
      # 'gs://bucket/some/prefix' -> ('bucket', 'some/prefix').
      bucket_name, _, prefix = artifact_location[5:].partition('/')
      # Scratch space is only needed for remote artifacts; local paths are
      # read in place, so no temporary directory is created for them.
      tempdir = tempfile.mkdtemp()
      download_artifacts_from_gcs(bucket_name, prefix, tempdir)

      assert os.listdir(tempdir), f"No files found in {artifact_location}"
      artifact_location = os.path.join(tempdir, prefix)

    files = os.listdir(artifact_location)
    files.remove(base._ATTRIBUTE_FILE_NAME)
    # TODO: https://github.com/apache/beam/issues/29356
    #  Integrate ArtifactFetcher into MLTransform.
    if len(files) > 1:
      raise NotImplementedError(
          "MLTransform may have been utilized alongside transforms written "
          "in TensorFlow Transform, in conjunction with those from different "
          "frameworks. Currently, retrieving artifacts from this "
          "multi-framework setup is not supported.")
    self._artifact_location = os.path.join(artifact_location, files[0])
    self.transform_output = tft.TFTransformOutput(self._artifact_location)

  def get_vocab_list(self, vocab_filename: str) -> typing.List[str]:
    """Returns list of vocabulary terms created during MLTransform.

    Args:
      vocab_filename: Name of the vocabulary to look up.

    Returns:
      The vocabulary entries, each decoded from UTF-8 bytes into ``str``.

    Raises:
      ValueError: If the vocabulary lookup itself raises ``ValueError``.
    """
    try:
      vocab_list = self.transform_output.vocabulary_by_name(vocab_filename)
    except ValueError as e:
      raise ValueError(
          'Vocabulary file {} not found in artifact location'.format(
              vocab_filename)) from e
    return [x.decode('utf-8') for x in vocab_list]

  def get_vocab_filepath(self, vocab_filename: str) -> str:
    """Return the path to the vocabulary file created during MLTransform.

    Args:
      vocab_filename: Name of the vocabulary to look up.
    """
    return self.transform_output.vocabulary_file_by_name(vocab_filename)

  def get_vocab_size(self, vocab_filename: str) -> int:
    """Return the size reported by the transform output for a vocabulary.

    Args:
      vocab_filename: Name of the vocabulary to look up.
    """
    return self.transform_output.vocabulary_size_by_name(vocab_filename)