apache_beam.ml.inference.pytorch_inference module

class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[torch.Tensor], torch.nn.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_tensor_inference_fn>, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, load_model_args: Optional[Dict[str, Any]] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for PyTorch.

Example Usage for torch model::
  pcoll | RunInference(PytorchModelHandlerTensor(state_dict_path="my_uri",
                                                 model_class="my_class"))
Example Usage for torchscript model::
  pcoll | RunInference(PytorchModelHandlerTensor(
      torch_script_model_path="my_uri"))

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.
  • model_class – class of the PyTorch model that defines the model structure.
  • model_params – A dictionary of arguments required to instantiate the model class.
  • device – the device on which you wish to run the model. If device = GPU, a GPU device will be used if one is available; otherwise, the model runs on the CPU.
  • inference_fn – the inference function to use during RunInference. default=default_tensor_inference_fn. See the custom inference_fn sketch after the method list below.
  • torch_script_model_path – path to the TorchScript model. The model will be loaded using torch.jit.load(); the state_dict_path, model_class, and model_params arguments will be disregarded.
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you know your CPU or GPU capacity exactly and want to maximize resource utilization.
  • load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.
  • kwargs – 'env_vars' can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested with PyTorch 1.9 and 1.10.
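
A minimal end-to-end sketch of a RunInference pipeline built around this handler. The LinearRegression model class, the state dict URI, and the input values below are hypothetical placeholders; substitute your own model and path::

  import apache_beam as beam
  import torch
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

  class LinearRegression(torch.nn.Module):
    """Hypothetical model whose state dict was saved to state_dict_path."""
    def __init__(self, input_dim=1, output_dim=1):
      super().__init__()
      self.linear = torch.nn.Linear(input_dim, output_dim)

    def forward(self, x):
      return self.linear(x)

  model_handler = PytorchModelHandlerTensor(
      state_dict_path='gs://my-bucket/linear_regression.pt',  # hypothetical URI
      model_class=LinearRegression,
      model_params={'input_dim': 1, 'output_dim': 1},
      device='CPU')

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([torch.Tensor([1.0]), torch.Tensor([2.0])])
        | RunInference(model_handler)
        | beam.Map(print))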

load_model() → torch.nn.Module[source]

Loads and initializes a PyTorch model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of Tensors and returns an Iterable of Tensor Predictions.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.
  • model – A PyTorch model.
  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns:

An Iterable of type PredictionResult.
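
The stacking behavior described above can be illustrated with plain torch, independently of the handler; this is an illustration of the documented behavior, not the handler's internal code::

  import torch

  # Each element is a Tensor of shape (n_features,); stacking yields a single
  # Tensor of shape (batch_size, n_features) for the forward() call.
  batch = [torch.Tensor([1.0, 2.0]), torch.Tensor([3.0, 4.0]), torch.Tensor([5.0, 6.0])]
  stacked = torch.stack(batch)
  print(stacked.shape)  # torch.Size([3, 2])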

get_num_bytes(batch: Sequence[torch.Tensor]) → int[source]
Returns: The number of bytes of data for a batch of Tensors.
get_metrics_namespace() → str[source]
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]
model_copies() → int[source]
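
A hedged sketch of a custom inference_fn, following the argument order shown in the Callable type of the constructor signature above (batch, model, device, inference_args, model_id). The softmax post-processing is purely illustrative; adapt the body to your model's output::

  from typing import Any, Dict, Iterable, Optional, Sequence

  import torch
  from apache_beam.ml.inference.base import PredictionResult

  def softmax_inference_fn(
      batch: Sequence[torch.Tensor],
      model: torch.nn.Module,
      device: torch.device,
      inference_args: Optional[Dict[str, Any]] = None,
      model_id: Optional[str] = None) -> Iterable[PredictionResult]:
    inference_args = inference_args or {}
    # Stack the per-element Tensors and move the batch to the model's device.
    batched = torch.stack(batch).to(device)
    with torch.no_grad():
      outputs = torch.nn.functional.softmax(model(batched, **inference_args), dim=-1)
    # Pair each input example with its prediction.
    return [PredictionResult(x, y) for x, y in zip(batch, outputs)]

  # Passed to the handler alongside the usual arguments, e.g.:
  # PytorchModelHandlerTensor(..., inference_fn=softmax_inference_fn)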
class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerKeyedTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[Dict[str, torch.Tensor]], torch.nn.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_keyed_tensor_inference_fn>, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, load_model_args: Optional[Dict[str, Any]] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for PyTorch.

Example Usage for torch model::
  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      state_dict_path="my_uri", model_class="my_class"))
Example Usage for torchscript model::
  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      torch_script_model_path="my_uri"))

NOTE: This API and its implementation are under development and do not provide backward compatibility guarantees.

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.
  • model_class – class of the PyTorch model that defines the model structure.
  • model_params – A dictionary of arguments required to instantiate the model class.
  • device – the device on which you wish to run the model. If device = GPU, a GPU device will be used if one is available; otherwise, the model runs on the CPU.
  • inference_fn – the inference function to use during RunInference. default=default_keyed_tensor_inference_fn
  • torch_script_model_path – path to the TorchScript model. The model will be loaded using torch.jit.load(); the state_dict_path, model_class, and model_params arguments will be disregarded.
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you know your CPU or GPU capacity exactly and want to maximize resource utilization.
  • load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.
  • kwargs – 'env_vars' can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested on torch>=1.9.0,<1.14.0.
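
A minimal sketch for keyed-tensor inputs, where each pipeline element is a Dict[str, torch.Tensor]. The ToyKeyedModel class, the state dict URI, and the key names ('input_ids', 'attention_mask') are hypothetical; the assumption here is that each key of the batched dictionary is forwarded to the model's forward() as a keyword argument, as with the default keyed inference function::

  import apache_beam as beam
  import torch
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor

  class ToyKeyedModel(torch.nn.Module):
    """Hypothetical model whose forward() takes keyword Tensor arguments."""
    def __init__(self, vocab_size=30522, dim=8):
      super().__init__()
      self.embedding = torch.nn.Embedding(vocab_size, dim)

    def forward(self, input_ids, attention_mask):
      # Embed the ids, zero out padded positions, and pool over the sequence.
      embedded = self.embedding(input_ids) * attention_mask.unsqueeze(-1)
      return embedded.sum(dim=1)

  model_handler = PytorchModelHandlerKeyedTensor(
      state_dict_path='gs://my-bucket/toy_keyed_model.pt',  # hypothetical URI
      model_class=ToyKeyedModel,
      model_params={'vocab_size': 30522, 'dim': 8},
      device='GPU')

  example = {
      'input_ids': torch.tensor([101, 2023, 2003, 102]),
      'attention_mask': torch.tensor([1, 1, 1, 1]),
  }

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([example])
        | RunInference(model_handler)
        | beam.Map(print))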

load_model() → torch.nn.Module[source]

Loads and initializes a PyTorch model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[Dict[str, torch.Tensor]], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of Keyed Tensors and returns an Iterable of Tensor Predictions.

For each key shared across the examples, this method stacks the Tensor values in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of keyed Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.
  • model – A PyTorch model.
  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns:

An Iterable of type PredictionResult.
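
The per-key stacking described above can be illustrated with plain torch: Tensors sharing a key across the batch are stacked into one Tensor per key before the model is invoked. This mirrors the documented behavior and is not the handler's internal code::

  import torch

  batch = [
      {'input_ids': torch.tensor([1, 2, 3]), 'mask': torch.tensor([1, 1, 0])},
      {'input_ids': torch.tensor([4, 5, 6]), 'mask': torch.tensor([1, 1, 1])},
  ]
  # Stack the values for each key across all elements of the batch.
  stacked = {key: torch.stack([element[key] for element in batch]) for key in batch[0]}
  print({key: value.shape for key, value in stacked.items()})
  # {'input_ids': torch.Size([2, 3]), 'mask': torch.Size([2, 3])}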

get_num_bytes(batch: Sequence[torch.Tensor]) → int[source]
Returns: The number of bytes of data for a batch of Dict of Tensors.
get_metrics_namespace() → str[source]
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]
model_copies() → int[source]