Source code for apache_beam.ml.inference.huggingface_inference

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

import logging
import sys
from collections import defaultdict
from enum import Enum
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Sequence
from typing import Union

import tensorflow as tf
import torch
from apache_beam.ml.inference import utils
from apache_beam.ml.inference.base import ModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.pytorch_inference import _convert_to_device
from transformers import AutoModel
from transformers import Pipeline
from transformers import TFAutoModel
from transformers import pipeline

_LOGGER = logging.getLogger(__name__)

__all__ = [
    "HuggingFaceModelHandlerTensor",
    "HuggingFaceModelHandlerKeyedTensor",
    "HuggingFacePipelineModelHandler",
]

TensorInferenceFn = Callable[[
    Sequence[Union[torch.Tensor, tf.Tensor]],
    Union[AutoModel, TFAutoModel],
    str,
    Optional[Dict[str, Any]],
    Optional[str],
],
                             Iterable[PredictionResult],
                             ]

KeyedTensorInferenceFn = Callable[[
    Sequence[Dict[str, Union[torch.Tensor, tf.Tensor]]],
    Union[AutoModel, TFAutoModel],
    str,
    Optional[Dict[str, Any]],
    Optional[str]
],
                                  Iterable[PredictionResult]]

PipelineInferenceFn = Callable[
    [Sequence[str], Pipeline, Optional[Dict[str, Any]]],
    Iterable[PredictionResult]]


class PipelineTask(str, Enum):
  """
  PipelineTask defines all the tasks supported by the Hugging Face Pipelines
  listed at https://huggingface.co/docs/transformers/main_classes/pipelines.
  Only these tasks can be passed to HuggingFacePipelineModelHandler.
  """
  AudioClassification = 'audio-classification'
  AutomaticSpeechRecognition = 'automatic-speech-recognition'
  Conversational = 'conversational'
  DepthEstimation = 'depth-estimation'
  DocumentQuestionAnswering = 'document-question-answering'
  FeatureExtraction = 'feature-extraction'
  FillMask = 'fill-mask'
  ImageClassification = 'image-classification'
  ImageSegmentation = 'image-segmentation'
  ImageToText = 'image-to-text'
  MaskGeneration = 'mask-generation'
  NER = 'ner'
  ObjectDetection = 'object-detection'
  QuestionAnswering = 'question-answering'
  SentimentAnalysis = 'sentiment-analysis'
  Summarization = 'summarization'
  TableQuestionAnswering = 'table-question-answering'
  TextClassification = 'text-classification'
  TextGeneration = 'text-generation'
  Text2TextGeneration = 'text2text-generation'
  TextToAudio = 'text-to-audio'
  TokenClassification = 'token-classification'
  Translation = 'translation'
  VideoClassification = 'video-classification'
  VisualQuestionAnswering = 'visual-question-answering'
  VQA = 'vqa'
  ZeroShotAudioClassification = 'zero-shot-audio-classification'
  ZeroShotClassification = 'zero-shot-classification'
  ZeroShotImageClassification = 'zero-shot-image-classification'
  ZeroShotObjectDetection = 'zero-shot-object-detection'
  Translation_XX_to_YY = 'translation_XX_to_YY'


def _validate_constructor_args(model_uri, model_class):
  message = (
      "Please provide both model class and model uri to load the model."
      "Got params as model_uri={model_uri} and "
      "model_class={model_class}.")
  if not model_uri and not model_class:
    raise RuntimeError(
        message.format(model_uri=model_uri, model_class=model_class))
  elif not model_uri:
    raise RuntimeError(
        message.format(model_uri=model_uri, model_class=model_class))
  elif not model_class:
    raise RuntimeError(
        message.format(model_uri=model_uri, model_class=model_class))


def no_gpu_available_warning():
  _LOGGER.warning(
      "HuggingFaceModelHandler specified a 'GPU' device, "
      "but GPUs are not available. Switching to CPU.")


def is_gpu_available_torch():
  if torch.cuda.is_available():
    return True
  else:
    no_gpu_available_warning()
    return False


def get_device_torch(device):
  if device == "GPU" and is_gpu_available_torch():
    return torch.device("cuda")
  return torch.device("cpu")


def is_gpu_available_tensorflow(device):
  gpu_devices = tf.config.list_physical_devices(device)
  if len(gpu_devices) == 0:
    no_gpu_available_warning()
    return False
  return True


def _validate_constructor_args_hf_pipeline(task, model):
  if not task and not model:
    raise RuntimeError(
        'Please provide either task or model to the '
        'HuggingFacePipelineModelHandler. If the model already defines the '
        'task, no need to specify the task.')


def _run_inference_torch_keyed_tensor(
    batch: Sequence[Dict[str, torch.Tensor]],
    model: AutoModel,
    device,
    inference_args: Dict[str, Any],
    model_id: Optional[str] = None) -> Iterable[PredictionResult]:
  device = get_device_torch(device)
  key_to_tensor_list = defaultdict(list)
  # torch.no_grad() mitigates GPU memory issues
  # https://github.com/apache/beam/issues/22811
  with torch.no_grad():
    for example in batch:
      for key, tensor in example.items():
        key_to_tensor_list[key].append(tensor)
    key_to_batched_tensors = {}
    for key in key_to_tensor_list:
      batched_tensors = torch.stack(key_to_tensor_list[key])
      batched_tensors = _convert_to_device(batched_tensors, device)
      key_to_batched_tensors[key] = batched_tensors
    predictions = model(**key_to_batched_tensors, **inference_args)
    return utils._convert_to_result(batch, predictions, model_id)


def _run_inference_tensorflow_keyed_tensor(
    batch: Sequence[Dict[str, tf.Tensor]],
    model: TFAutoModel,
    device,
    inference_args: Dict[str, Any],
    model_id: Optional[str] = None) -> Iterable[PredictionResult]:
  if device == "GPU":
    is_gpu_available_tensorflow(device)
  key_to_tensor_list = defaultdict(list)
  for example in batch:
    for key, tensor in example.items():
      key_to_tensor_list[key].append(tensor)
  key_to_batched_tensors = {}
  for key in key_to_tensor_list:
    batched_tensors = tf.stack(key_to_tensor_list[key], axis=0)
    key_to_batched_tensors[key] = batched_tensors
  predictions = model(**key_to_batched_tensors, **inference_args)
  return utils._convert_to_result(batch, predictions, model_id)


[docs]class HuggingFaceModelHandlerKeyedTensor(ModelHandler[Dict[str, Union[tf.Tensor, torch.Tensor]], PredictionResult, Union[AutoModel, TFAutoModel]]): def __init__( self, model_uri: str, model_class: Union[AutoModel, TFAutoModel], framework: str, device: str = "CPU", *, inference_fn: Optional[Callable[..., Iterable[PredictionResult]]] = None, load_model_args: Optional[Dict[str, Any]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, **kwargs): """ Implementation of the ModelHandler interface for HuggingFace with Keyed Tensors for PyTorch/Tensorflow backend. Example Usage model:: pcoll | RunInference(HuggingFaceModelHandlerKeyedTensor( model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM, framework='pt')) Args: model_uri (str): path to the pretrained model on the hugging face models hub. model_class: model class to load the repository from model_uri. framework (str): Framework to use for the model. 'tf' for TensorFlow and 'pt' for PyTorch. device: For torch tensors, specify device on which you wish to run the model. Defaults to CPU. inference_fn: the inference function to use during RunInference. Default is _run_inference_torch_keyed_tensor or _run_inference_tensorflow_keyed_tensor depending on the input type. load_model_args (Dict[str, Any]): (Optional) Keyword arguments to provide load options while loading models from Hugging Face Hub. Defaults to None. min_batch_size: the minimum batch size to use when batching inputs. max_batch_size: the maximum batch size to use when batching inputs. max_batch_duration_secs: the maximum amount of time to buffer a batch before emitting; used in streaming contexts. large_model: set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M. kwargs: 'env_vars' can be used to set environment variables before loading the model. **Supported Versions:** HuggingFaceModelHandler supports transformers>=4.18.0. """ self._model_uri = model_uri self._model_class = model_class self._device = device self._inference_fn = inference_fn self._model_config_args = load_model_args if load_model_args else {} self._batching_kwargs = {} self._env_vars = kwargs.get("env_vars", {}) if min_batch_size is not None: self._batching_kwargs["min_batch_size"] = min_batch_size if max_batch_size is not None: self._batching_kwargs["max_batch_size"] = max_batch_size if max_batch_duration_secs is not None: self._batching_kwargs["max_batch_duration_secs"] = max_batch_duration_secs self._large_model = large_model self._framework = framework _validate_constructor_args( model_uri=self._model_uri, model_class=self._model_class)
[docs] def load_model(self): """Loads and initializes the model for processing.""" model = self._model_class.from_pretrained( self._model_uri, **self._model_config_args) if self._framework == 'pt': if self._device == "GPU" and is_gpu_available_torch(): model.to(torch.device("cuda")) if callable(getattr(model, 'requires_grad_', None)): model.requires_grad_(False) return model
[docs] def run_inference( self, batch: Sequence[Dict[str, Union[tf.Tensor, torch.Tensor]]], model: Union[AutoModel, TFAutoModel], inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """ Runs inferences on a batch of Keyed Tensors and returns an Iterable of Tensors Predictions. This method stacks the list of Tensors in a vectorized format to optimize the inference call. Args: batch: A sequence of Keyed Tensors. These Tensors should be batchable, as this method will call `tf.stack()`/`torch.stack()` and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model's predict() function. model: A Tensorflow/PyTorch model. inference_args: Non-batchable arguments required as inputs to the model's inference function. Unlike Tensors in `batch`, these parameters will not be dynamically batched. Returns: An Iterable of type PredictionResult. """ inference_args = {} if not inference_args else inference_args if self._inference_fn: return self._inference_fn( batch, model, self._device, inference_args, self._model_uri) if self._framework == "tf": return _run_inference_tensorflow_keyed_tensor( batch, model, self._device, inference_args, self._model_uri) else: return _run_inference_torch_keyed_tensor( batch, model, self._device, inference_args, self._model_uri)
[docs] def update_model_path(self, model_path: Optional[str] = None): self._model_uri = model_path if model_path else self._model_uri
[docs] def get_num_bytes( self, batch: Sequence[Union[tf.Tensor, torch.Tensor]]) -> int: """ Returns: The number of bytes of data for the Tensors batch. """ if self._framework == "tf": return sum(sys.getsizeof(element) for element in batch) else: return sum( (el.element_size() for tensor in batch for el in tensor.values()))
[docs] def batch_elements_kwargs(self): return self._batching_kwargs
[docs] def share_model_across_processes(self) -> bool: return self._large_model
[docs] def get_metrics_namespace(self) -> str: """ Returns: A namespace for metrics collected by the RunInference transform. """ return "BeamML_HuggingFaceModelHandler_KeyedTensor"
def _default_inference_fn_torch( batch: Sequence[Union[tf.Tensor, torch.Tensor]], model: Union[AutoModel, TFAutoModel], device, inference_args: Dict[str, Any], model_id: Optional[str] = None) -> Iterable[PredictionResult]: device = get_device_torch(device) # torch.no_grad() mitigates GPU memory issues # https://github.com/apache/beam/issues/22811 with torch.no_grad(): batched_tensors = torch.stack(batch) batched_tensors = _convert_to_device(batched_tensors, device) predictions = model(batched_tensors, **inference_args) return utils._convert_to_result(batch, predictions, model_id) def _default_inference_fn_tensorflow( batch: Sequence[Union[tf.Tensor, torch.Tensor]], model: Union[AutoModel, TFAutoModel], device, inference_args: Dict[str, Any], model_id: Optional[str] = None) -> Iterable[PredictionResult]: if device == "GPU": is_gpu_available_tensorflow(device) batched_tensors = tf.stack(batch, axis=0) predictions = model(batched_tensors, **inference_args) return utils._convert_to_result(batch, predictions, model_id)
[docs]class HuggingFaceModelHandlerTensor(ModelHandler[Union[tf.Tensor, torch.Tensor], PredictionResult, Union[AutoModel, TFAutoModel]]): def __init__( self, model_uri: str, model_class: Union[AutoModel, TFAutoModel], device: str = "CPU", *, inference_fn: Optional[Callable[..., Iterable[PredictionResult]]] = None, load_model_args: Optional[Dict[str, Any]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, **kwargs): """ Implementation of the ModelHandler interface for HuggingFace with Tensors for PyTorch/Tensorflow backend. Depending on the type of tensors, the model framework is determined automatically. Example Usage model: pcoll | RunInference(HuggingFaceModelHandlerTensor( model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM)) Args: model_uri (str): path to the pretrained model on the hugging face models hub. model_class: model class to load the repository from model_uri. device: For torch tensors, specify device on which you wish to run the model. Defaults to CPU. inference_fn: the inference function to use during RunInference. Default is _run_inference_torch_keyed_tensor or _run_inference_tensorflow_keyed_tensor depending on the input type. load_model_args (Dict[str, Any]): (Optional) keyword arguments to provide load options while loading models from Hugging Face Hub. Defaults to None. min_batch_size: the minimum batch size to use when batching inputs. max_batch_size: the maximum batch size to use when batching inputs. max_batch_duration_secs: the maximum amount of time to buffer a batch before emitting; used in streaming contexts. large_model: set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M. kwargs: 'env_vars' can be used to set environment variables before loading the model. **Supported Versions:** HuggingFaceModelHandler supports transformers>=4.18.0. """ self._model_uri = model_uri self._model_class = model_class self._device = device self._inference_fn = inference_fn self._model_config_args = load_model_args if load_model_args else {} self._batching_kwargs = {} self._env_vars = kwargs.get("env_vars", {}) if min_batch_size is not None: self._batching_kwargs["min_batch_size"] = min_batch_size if max_batch_size is not None: self._batching_kwargs["max_batch_size"] = max_batch_size if max_batch_duration_secs is not None: self._batching_kwargs["max_batch_duration_secs"] = max_batch_duration_secs self._large_model = large_model self._framework = "" _validate_constructor_args( model_uri=self._model_uri, model_class=self._model_class)
[docs] def load_model(self): """Loads and initializes the model for processing.""" model = self._model_class.from_pretrained( self._model_uri, **self._model_config_args) if callable(getattr(model, 'requires_grad_', None)): model.requires_grad_(False) return model
[docs] def run_inference( self, batch: Sequence[Union[tf.Tensor, torch.Tensor]], model: Union[AutoModel, TFAutoModel], inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """ Runs inferences on a batch of Tensors and returns an Iterable of Tensors Predictions. This method stacks the list of Tensors in a vectorized format to optimize the inference call. Args: batch: A sequence of Tensors. These Tensors should be batchable, as this method will call `tf.stack()`/`torch.stack()` and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model's predict() function. model: A Tensorflow/PyTorch model. inference_args (Dict[str, Any]): Non-batchable arguments required as inputs to the model's inference function. Unlike Tensors in `batch`, these parameters will not be dynamically batched. Returns: An Iterable of type PredictionResult. """ inference_args = {} if not inference_args else inference_args if not self._framework: if isinstance(batch[0], tf.Tensor): self._framework = "tf" else: self._framework = "pt" if (self._framework == "pt" and self._device == "GPU" and is_gpu_available_torch()): model.to(torch.device("cuda")) if self._inference_fn: return self._inference_fn( batch, model, inference_args, inference_args, self._model_uri) if self._framework == "tf": return _default_inference_fn_tensorflow( batch, model, self._device, inference_args, self._model_uri) else: return _default_inference_fn_torch( batch, model, self._device, inference_args, self._model_uri)
[docs] def update_model_path(self, model_path: Optional[str] = None): self._model_uri = model_path if model_path else self._model_uri
[docs] def get_num_bytes( self, batch: Sequence[Union[tf.Tensor, torch.Tensor]]) -> int: """ Returns: The number of bytes of data for the Tensors batch. """ if self._framework == "tf": return sum(sys.getsizeof(element) for element in batch) else: return sum( (el.element_size() for tensor in batch for el in tensor.values()))
[docs] def batch_elements_kwargs(self): return self._batching_kwargs
[docs] def share_model_across_processes(self) -> bool: return self._large_model
[docs] def get_metrics_namespace(self) -> str: """ Returns: A namespace for metrics collected by the RunInference transform. """ return 'BeamML_HuggingFaceModelHandler_Tensor'
def _convert_to_result( batch: Iterable, predictions: Union[Iterable, Dict[Any, Iterable]], model_id: Optional[str] = None, ) -> Iterable[PredictionResult]: return [ PredictionResult(x, y, model_id) for x, y in zip(batch, [predictions]) ] def _default_pipeline_inference_fn( batch, pipeline, inference_args) -> Iterable[PredictionResult]: predicitons = pipeline(batch, **inference_args) return predicitons
[docs]class HuggingFacePipelineModelHandler(ModelHandler[str, PredictionResult, Pipeline]): def __init__( self, task: Union[str, PipelineTask] = "", model: str = "", *, device: Optional[str] = None, inference_fn: PipelineInferenceFn = _default_pipeline_inference_fn, load_pipeline_args: Optional[Dict[str, Any]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, **kwargs): """ Implementation of the ModelHandler interface for Hugging Face Pipelines. Example Usage model:: pcoll | RunInference(HuggingFacePipelineModelHandler( task="fill-mask")) Args: task (str or enum.Enum): task supported by HuggingFace Pipelines. Accepts a string task or an enum.Enum from PipelineTask. model (str): path to the pretrained *model-id* on Hugging Face Models Hub to use custom model for the chosen task. If the `model` already defines the task then no need to specify the `task` parameter. Use the *model-id* string instead of an actual model here. Model-specific kwargs for `from_pretrained(..., **model_kwargs)` can be specified with `model_kwargs` using `load_pipeline_args`. Example Usage:: model_handler = HuggingFacePipelineModelHandler( task="text-generation", model="meta-llama/Llama-2-7b-hf", load_pipeline_args={'model_kwargs':{'quantization_map':config}}) device (str): the device (`"CPU"` or `"GPU"`) on which you wish to run the pipeline. Defaults to GPU. If GPU is not available then it falls back to CPU. You can also use advanced option like `device_map` with key-value pair as you would do in the usual Hugging Face pipeline using `load_pipeline_args`. Ex: load_pipeline_args={'device_map':auto}). inference_fn: the inference function to use during RunInference. Default is _default_pipeline_inference_fn. load_pipeline_args (Dict[str, Any]): keyword arguments to provide load options while loading pipelines from Hugging Face. Defaults to None. min_batch_size: the minimum batch size to use when batching inputs. max_batch_size: the maximum batch size to use when batching inputs. max_batch_duration_secs: the maximum amount of time to buffer a batch before emitting; used in streaming contexts. large_model: set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M. kwargs: 'env_vars' can be used to set environment variables before loading the model. **Supported Versions:** HuggingFacePipelineModelHandler supports transformers>=4.18.0. """ self._task = task self._model = model self._inference_fn = inference_fn self._load_pipeline_args = load_pipeline_args if load_pipeline_args else {} self._batching_kwargs = {} self._framework = "torch" self._env_vars = kwargs.get('env_vars', {}) if min_batch_size is not None: self._batching_kwargs['min_batch_size'] = min_batch_size if max_batch_size is not None: self._batching_kwargs['max_batch_size'] = max_batch_size if max_batch_duration_secs is not None: self._batching_kwargs["max_batch_duration_secs"] = max_batch_duration_secs self._large_model = large_model # Check if the device is specified twice. If true then the device parameter # of model handler is overridden. self._deduplicate_device_value(device) _validate_constructor_args_hf_pipeline(self._task, self._model) def _deduplicate_device_value(self, device: Optional[str]): current_device = device.upper() if device else None if (current_device and current_device != 'CPU' and current_device != 'GPU'): raise ValueError( f"Invalid device value: {device}. Please specify " "either CPU or GPU. Defaults to GPU if no value " "is provided.") if 'device' not in self._load_pipeline_args: if current_device == 'CPU': self._load_pipeline_args['device'] = 'cpu' else: if is_gpu_available_torch(): self._load_pipeline_args['device'] = 'cuda:1' else: _LOGGER.warning( "HuggingFaceModelHandler specified a 'GPU' device, " "but GPUs are not available. Switching to CPU.") self._load_pipeline_args['device'] = 'cpu' else: if current_device: raise ValueError( '`device` specified in `load_pipeline_args`. `device` ' 'parameter for HuggingFacePipelineModelHandler will be ignored.')
[docs] def load_model(self): """Loads and initializes the pipeline for processing.""" return pipeline( task=self._task, model=self._model, **self._load_pipeline_args)
[docs] def run_inference( self, batch: Sequence[str], pipeline: Pipeline, inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """ Runs inferences on a batch of examples passed as a string resource. These can either be string sentences, or string path to images or audio files. Args: batch: A sequence of strings resources. pipeline: A Hugging Face Pipeline. inference_args: Non-batchable arguments required as inputs to the model's inference function. Returns: An Iterable of type PredictionResult. """ inference_args = {} if not inference_args else inference_args predictions = self._inference_fn(batch, pipeline, inference_args) return _convert_to_result(batch, predictions)
[docs] def update_model_path(self, model_path: Optional[str] = None): """ Updates the pretrained model used by the Hugging Face Pipeline task. Make sure that the new model does the same task as initial model. Args: model_path (str): (Optional) Path to the new trained model from Hugging Face. Defaults to None. """ self._model = model_path if model_path else self._model
[docs] def get_num_bytes(self, batch: Sequence[str]) -> int: """ Returns: The number of bytes of input batch elements. """ return sum(sys.getsizeof(element) for element in batch)
[docs] def batch_elements_kwargs(self): return self._batching_kwargs
[docs] def share_model_across_processes(self) -> bool: return self._large_model
[docs] def get_metrics_namespace(self) -> str: """ Returns: A namespace for metrics collected by the RunInference transform. """ return 'BeamML_HuggingFacePipelineModelHandler'