apache_beam.ml.inference.huggingface_inference module

class apache_beam.ml.inference.huggingface_inference.HuggingFaceModelHandlerKeyedTensor(model_uri: str, model_class: Union[AutoModel, TFAutoModel], framework: str, device: str = 'CPU', *, inference_fn: Optional[Callable[[...], Iterable[apache_beam.ml.inference.base.PredictionResult]]] = None, load_model_args: Optional[Dict[str, Any]] = None, inference_args: Optional[Dict[str, Any]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, large_model: bool = False, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for Hugging Face models that take keyed Tensors as input, with a PyTorch or TensorFlow backend.

Example usage:
  pcoll | RunInference(HuggingFaceModelHandlerKeyedTensor(
      model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM, framework='pt'))

Parameters:
  • model_uri (str) – path to the pretrained model on the Hugging Face Models Hub.
  • model_class – model class used to load the model from the repository at model_uri.
  • framework (str) – framework to use for the model: 'tf' for TensorFlow or 'pt' for PyTorch.
  • device – for torch tensors, the device on which to run the model. Defaults to 'CPU'.
  • inference_fn – the inference function to use during RunInference. Default is _run_inference_torch_keyed_tensor or _run_inference_tensorflow_keyed_tensor depending on the input type.
  • load_model_args (Dict[str, Any]) – (Optional) Keyword arguments to provide load options while loading models from Hugging Face Hub. Defaults to None.
  • inference_args (Dict[str, Any]) – (Optional) Non-batchable arguments required as inputs to the model’s inference function. Unlike Tensors in batch, these parameters will not be dynamically batched. Defaults to None.
  • min_batch_size – the minimum batch size to use when batching inputs.
  • max_batch_size – the maximum batch size to use when batching inputs.
  • large_model – set to True if your model is large enough to cause memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, set this to True if N*W > M.
  • kwargs – 'env_vars' can be used to set environment variables before loading the model.

Supported Versions: HuggingFaceModelHandler supports transformers>=4.18.0.
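
The large_model rule of thumb in the parameter list (set it to True when N*W > M) comes down to one comparison. The sketch below is illustrative only: the helper name needs_large_model and the worker sizes are assumptions, not part of the Beam API, and it assumes one model copy would be loaded per core.

```python
def needs_large_model(model_bytes: int, cores: int, machine_bytes: int) -> bool:
    """Return True when one model copy per core would exceed memory (N*W > M)."""
    return model_bytes * cores > machine_bytes


GIB = 1024 ** 3

# Hypothetical worker with 8 cores and 32 GiB of memory.
small_model = needs_large_model(2 * GIB, 8, 32 * GIB)  # 16 GiB needed in total
big_model = needs_large_model(6 * GIB, 8, 32 * GIB)    # 48 GiB needed in total
print(small_model, big_model)
```

When the check comes out True, passing large_model=True to the handler is the documented way to avoid loading one copy per process.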

load_model()[source]

Loads and initializes the model for processing.

run_inference(batch: Sequence[Dict[str, Union[tf.Tensor, torch.Tensor]]], model: Union[AutoModel, TFAutoModel], inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inference on a batch of keyed Tensors and returns an Iterable of Tensor predictions.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Keyed Tensors. These Tensors should be batchable, as this method will call tf.stack()/torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s predict() function.
  • model – A TensorFlow or PyTorch model.
  • inference_args – Non-batchable arguments required as inputs to the model’s inference function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.
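
To make the stacking step concrete for keyed inputs, the sketch below regroups a batch of dictionaries by key, so that each key maps to a list with one entry per example. It is a simplified illustration, not the handler's code: the helper name group_by_key is hypothetical, and plain Python lists stand in for tensors. In a real pipeline, a call such as torch.stack() or tf.stack() would then turn each list into one batched tensor.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence


def group_by_key(batch: Sequence[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Collect the values for each key across all examples in the batch."""
    grouped: Dict[str, List[Any]] = defaultdict(list)
    for example in batch:
        for key, value in example.items():
            grouped[key].append(value)
    return dict(grouped)


# Two tokenized examples; nested lists stand in for 1-D tensors.
batch = [
    {"input_ids": [101, 7592, 102], "attention_mask": [1, 1, 1]},
    {"input_ids": [101, 2088, 102], "attention_mask": [1, 1, 1]},
]
stacked = group_by_key(batch)
# Each value now has one row per example: 2 rows of length 3 here.
print(stacked["input_ids"])
```

This also shows why the inputs must be batchable: every example needs the same keys and same-shaped values for the per-key stack to succeed.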

update_model_path(model_path: Optional[str] = None)[source]

get_num_bytes(batch: Sequence[Union[tf.Tensor, torch.Tensor]]) → int[source]

Returns: The number of bytes of data for the Tensors batch.

batch_elements_kwargs()[source]

share_model_across_processes() → bool[source]

get_metrics_namespace() → str[source]

Returns: A namespace for metrics collected by the RunInference transform.

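
batch_elements_kwargs() has no description here, but the min_batch_size and max_batch_size constructor parameters suggest what it carries. As a rough sketch of how such optional bounds might be collected into keyword arguments for batching (the helper name batching_kwargs is hypothetical, and this is an assumption about the behavior, not the handler's actual code), only the bounds that were set are passed along:

```python
from typing import Dict, Optional


def batching_kwargs(min_batch_size: Optional[int] = None,
                    max_batch_size: Optional[int] = None) -> Dict[str, int]:
    """Build a kwargs dict containing only the batch bounds that were given."""
    kwargs: Dict[str, int] = {}
    if min_batch_size is not None:
        kwargs["min_batch_size"] = min_batch_size
    if max_batch_size is not None:
        kwargs["max_batch_size"] = max_batch_size
    return kwargs


print(batching_kwargs(max_batch_size=8))
```
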
class apache_beam.ml.inference.huggingface_inference.HuggingFaceModelHandlerTensor(model_uri: str, model_class: Union[AutoModel, TFAutoModel], device: str = 'CPU', *, inference_fn: Optional[Callable[[...], Iterable[apache_beam.ml.inference.base.PredictionResult]]] = None, load_model_args: Optional[Dict[str, Any]] = None, inference_args: Optional[Dict[str, Any]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, large_model: bool = False, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for Hugging Face models that take Tensors as input, with a PyTorch or TensorFlow backend.

The model framework is determined automatically from the type of the input tensors.

Example usage:
  pcoll | RunInference(HuggingFaceModelHandlerTensor(
      model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM))

Parameters:
  • model_uri (str) – path to the pretrained model on the Hugging Face Models Hub.
  • model_class – model class used to load the model from the repository at model_uri.
  • device – for torch tensors, the device on which to run the model. Defaults to 'CPU'.
  • inference_fn – the inference function to use during RunInference. Defaults to a framework-specific function (PyTorch or TensorFlow) chosen from the input tensor type.
  • load_model_args (Dict[str, Any]) – (Optional) keyword arguments to provide load options while loading models from Hugging Face Hub. Defaults to None.
  • inference_args (Dict[str, Any]) – (Optional) Non-batchable arguments required as inputs to the model’s inference function. Unlike Tensors in batch, these parameters will not be dynamically batched. Defaults to None.
  • min_batch_size – the minimum batch size to use when batching inputs.
  • max_batch_size – the maximum batch size to use when batching inputs.
  • large_model – set to True if your model is large enough to cause memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, set this to True if N*W > M.
  • kwargs – 'env_vars' can be used to set environment variables before loading the model.

Supported Versions: HuggingFaceModelHandler supports transformers>=4.18.0.
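
The 'env_vars' entry in kwargs is described as setting environment variables before the model is loaded. A minimal sketch of that idea follows, assuming env_vars is a plain dictionary of names to values. The helper apply_env_vars and the variable EXAMPLE_MODEL_CACHE are invented for illustration and are not Beam's implementation.

```python
import os
from typing import Dict, Optional


def apply_env_vars(env_vars: Optional[Dict[str, str]]) -> None:
    """Export each entry so that code running afterwards can read it."""
    for name, value in (env_vars or {}).items():
        os.environ[name] = str(value)


# EXAMPLE_MODEL_CACHE is a made-up variable name used only for this demo.
apply_env_vars({"EXAMPLE_MODEL_CACHE": "/tmp/models"})
print(os.environ["EXAMPLE_MODEL_CACHE"])
```

Anything the model loading code reads from the environment would see these values, which is why they must be applied before loading rather than after.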

load_model()[source]

Loads and initializes the model for processing.

run_inference(batch: Sequence[Union[tf.Tensor, torch.Tensor]], model: Union[AutoModel, TFAutoModel], inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inference on a batch of Tensors and returns an Iterable of Tensor predictions.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call tf.stack()/torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s predict() function.
  • model – A TensorFlow or PyTorch model.
  • inference_args (Dict[str, Any]) – Non-batchable arguments required as inputs to the model’s inference function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.

update_model_path(model_path: Optional[str] = None)[source]

get_num_bytes(batch: Sequence[Union[tf.Tensor, torch.Tensor]]) → int[source]

Returns: The number of bytes of data for the Tensors batch.

batch_elements_kwargs()[source]

share_model_across_processes() → bool[source]

get_metrics_namespace() → str[source]

Returns: A namespace for metrics collected by the RunInference transform.

class apache_beam.ml.inference.huggingface_inference.HuggingFacePipelineModelHandler(task: Union[str, apache_beam.ml.inference.huggingface_inference.PipelineTask] = '', model: str = '', *, inference_fn: Callable[[Sequence[str], Pipeline, Optional[Dict[str, Any]]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function _default_pipeline_inference_fn>, load_pipeline_args: Optional[Dict[str, Any]] = None, inference_args: Optional[Dict[str, Any]] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, large_model: bool = False, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for Hugging Face Pipelines.

Note: To specify which device to use (CPU/GPU), pass it through load_pipeline_args as you would to a regular Hugging Face pipeline, for example load_pipeline_args={'device': 0}.

Example usage:
  pcoll | RunInference(HuggingFacePipelineModelHandler(
      task="fill-mask"))

Parameters:
  • task (str or enum.Enum) – task supported by Hugging Face Pipelines. Accepts a task string or an enum.Enum member from PipelineTask.
  • model (str) –

    path to the pretrained model ID on the Hugging Face Models Hub, used to select a custom model for the chosen task. If the model already defines the task, the task parameter can be omitted. Pass the model-ID string here, not a loaded model object. Model-specific keyword arguments for from_pretrained(..., **model_kwargs) can be supplied under the model_kwargs key of load_pipeline_args.

    Example usage:
      model_handler = HuggingFacePipelineModelHandler(
          task="text-generation", model="meta-llama/Llama-2-7b-hf",
          load_pipeline_args={'model_kwargs': {'quantization_map': config}})
  • inference_fn – the inference function to use during RunInference. Default is _default_pipeline_inference_fn.
  • load_pipeline_args (Dict[str, Any]) – keyword arguments to provide load options while loading pipelines from Hugging Face. Defaults to None.
  • inference_args (Dict[str, Any]) – Non-batchable arguments required as inputs to the model’s inference function. Defaults to None.
  • min_batch_size – the minimum batch size to use when batching inputs.
  • max_batch_size – the maximum batch size to use when batching inputs.
  • large_model – set to True if your model is large enough to cause memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, set this to True if N*W > M.
  • kwargs – 'env_vars' can be used to set environment variables before loading the model.

Supported Versions: HuggingFacePipelineModelHandler supports transformers>=4.18.0.

load_model()[source]

Loads and initializes the pipeline for processing.

run_inference(batch: Sequence[str], pipeline: Pipeline, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inference on a batch of examples passed as strings. Each string can be either a sentence or a path to an image or audio file.

Parameters:
  • batch – A sequence of string resources.
  • pipeline – A Hugging Face Pipeline.
  • inference_args – Non-batchable arguments required as inputs to the model’s inference function.
Returns: An Iterable of type PredictionResult.
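
Each returned PredictionResult ties one input string to the pipeline's output for it. The sketch below mimics that pairing with a stand-in named tuple and a fake pipeline function. Both are assumptions made for illustration: the real class lives in apache_beam.ml.inference.base, and a real Hugging Face pipeline would replace fake_pipeline.

```python
from typing import Any, Callable, Iterable, List, NamedTuple, Sequence


class PairedResult(NamedTuple):
    """Stand-in for PredictionResult: one input and its prediction."""
    example: str
    inference: Any


def pair_results(
    batch: Sequence[str],
    pipeline: Callable[[List[str]], List[Any]],
) -> Iterable[PairedResult]:
    predictions = pipeline(list(batch))  # one call for the whole batch
    return [PairedResult(x, y) for x, y in zip(batch, predictions)]


def fake_pipeline(texts: List[str]) -> List[Any]:
    # Placeholder "model": reports the word count of each input.
    return [{"words": len(t.split())} for t in texts]


results = pair_results(["hello world", "beam"], fake_pipeline)
for result in results:
    print(result.example, result.inference)
```
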

update_model_path(model_path: Optional[str] = None)[source]

Updates the pretrained model used by the Hugging Face Pipeline task. Make sure that the new model performs the same task as the initial model.

Parameters: model_path (str) – (Optional) Path to the new trained model from Hugging Face. Defaults to None.

get_num_bytes(batch: Sequence[str]) → int[source]

Returns: The number of bytes of input batch elements.
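
One plausible way to estimate the size of a batch of strings is to sum sys.getsizeof over its elements. Whether the handler computes its value exactly this way is an assumption here, and the helper name estimate_num_bytes is hypothetical. Note that sys.getsizeof reports the in-memory size of each Python object, which is larger than the encoded length of the text.

```python
import sys
from typing import Sequence


def estimate_num_bytes(batch: Sequence[str]) -> int:
    """Sum the in-memory sizes of all elements in the batch."""
    return sum(sys.getsizeof(element) for element in batch)


batch = ["The capital of France is [MASK].", "Hello"]
total = estimate_num_bytes(batch)
print(total)
```
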

batch_elements_kwargs()[source]

share_model_across_processes() → bool[source]

get_metrics_namespace() → str[source]

Returns: A namespace for metrics collected by the RunInference transform.