apache_beam.ml.inference.pytorch_inference module

class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[torch.Tensor], torch.nn.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_tensor_inference_fn>, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, load_model_args: Optional[Dict[str, Any]] = None, **kwargs)

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for PyTorch.

Example usage for a torch model:

    pcoll | RunInference(PytorchModelHandlerTensor(
        state_dict_path="my_uri", model_class="my_class"))

Example usage for a TorchScript model:

    pcoll | RunInference(PytorchModelHandlerTensor(
        torch_script_model_path="my_uri"))

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.
  • model_class – class of the Pytorch model that defines the model structure.
  • model_params – A dictionary of arguments required to instantiate the model class.
  • device – the device on which to run the model. If device = GPU, a GPU device is used when one is available; otherwise the model runs on the CPU.
  • inference_fn – the inference function to use during RunInference. default = default_tensor_inference_fn
  • torch_script_model_path – path to the TorchScript model. If set, the model is loaded using torch.jit.load(), and the state_dict_path, model_class, and model_params arguments are disregarded.
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to True if your model is large enough to cause memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, set this to True if N*W > M.
  • model_copies – the exact number of model copies to load onto your machine. This can be useful if you know your CPU or GPU capacity exactly and want to maximize resource utilization.
  • load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.
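
The N*W > M rule given for large_model can be checked with simple arithmetic. The following is a minimal sketch: should_set_large_model is a hypothetical helper, not part of Apache Beam, and it assumes one model copy per core, as the rule above implies.

```python
def should_set_large_model(
    model_bytes: int, num_cores: int, machine_bytes: int
) -> bool:
    """Return True when one model copy per core would not fit in memory.

    Hypothetical helper that applies the N*W > M rule; it is not a Beam API.
    """
    return model_bytes * num_cores > machine_bytes


GIB = 1024 ** 3

# A 6 GiB model on an 8-core worker with 32 GiB of memory: 6 * 8 = 48 > 32.
needs_flag = should_set_large_model(6 * GIB, 8, 32 * GIB)

# A 1 GiB model on the same worker: 1 * 8 = 8, which fits in 32.
fits = should_set_large_model(1 * GIB, 8, 32 * GIB)

print(needs_flag, fits)
```

When the check returns True, passing large_model=True (or an explicit model_copies value) to the handler is the option the parameter descriptions above point to.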

Supported Versions: RunInference APIs in Apache Beam have been tested with PyTorch 1.9 and 1.10.

load_model() → torch.nn.Module

Loads and initializes a Pytorch model for processing.

update_model_path(model_path: Optional[str] = None)
run_inference(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult]

Runs inferences on a batch of Tensors and returns an Iterable of Tensor Predictions.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.
  • model – A PyTorch model.
  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.
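
The stacking step described for run_inference can be sketched with PyTorch alone. This is an illustration under stated assumptions, not the handler's actual code: the torch.nn.Linear model and the two example Tensors are hypothetical stand-ins, and pairing inputs with outputs via zip only approximates what a PredictionResult carries.

```python
import torch

# Hypothetical stand-in model: 3 input features, 1 output value.
model = torch.nn.Linear(3, 1)
model.eval()

# A batch in the form described above: a sequence of same-shaped Tensors.
batch = [torch.tensor([1.0, 2.0, 3.0]), torch.tensor([4.0, 5.0, 6.0])]

# Stack the examples into one Tensor of shape (batch_size, n_features).
batched = torch.stack(batch)

# Run a single vectorized forward pass without tracking gradients.
with torch.no_grad():
    predictions = model(batched)

# Pair each input example with its own row of the output.
results = list(zip(batch, predictions))
print(batched.shape, predictions.shape, len(results))
```

Tensors of different shapes cannot be stacked, which is why the batch elements need to be batchable.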

get_num_bytes(batch: Sequence[torch.Tensor]) → int
Returns: The number of bytes of data for a batch of Tensors.
get_metrics_namespace() → str
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])
batch_elements_kwargs()
share_model_across_processes() → bool
model_copies() → int
class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerKeyedTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[Dict[str, torch.Tensor]], torch.nn.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_keyed_tensor_inference_fn>, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, load_model_args: Optional[Dict[str, Any]] = None, **kwargs)

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for PyTorch.

Example usage for a torch model:

    pcoll | RunInference(PytorchModelHandlerKeyedTensor(
        state_dict_path="my_uri", model_class="my_class"))

Example usage for a TorchScript model:

    pcoll | RunInference(PytorchModelHandlerKeyedTensor(
        torch_script_model_path="my_uri"))

NOTE: This API and its implementation are under development and do not provide backward compatibility guarantees.

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.
  • model_class – class of the Pytorch model that defines the model structure.
  • model_params – A dictionary of arguments required to instantiate the model class.
  • device – the device on which to run the model. If device = GPU, a GPU device is used when one is available; otherwise the model runs on the CPU.
  • inference_fn – the function to invoke on run_inference. default = default_keyed_tensor_inference_fn
  • torch_script_model_path – path to the TorchScript model. If set, the model is loaded using torch.jit.load(), and the state_dict_path, model_class, and model_params arguments are disregarded.
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to True if your model is large enough to cause memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, set this to True if N*W > M.
  • model_copies – the exact number of model copies to load onto your machine. This can be useful if you know your CPU or GPU capacity exactly and want to maximize resource utilization.
  • load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested on torch>=1.9.0,<1.14.0.

load_model() → torch.nn.Module

Loads and initializes a Pytorch model for processing.

update_model_path(model_path: Optional[str] = None)
run_inference(batch: Sequence[Dict[str, torch.Tensor]], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult]

Runs inferences on a batch of Keyed Tensors and returns an Iterable of Tensor Predictions.

For each key, this method stacks the Tensor values from all examples in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of keyed Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.
  • model – A PyTorch model.
  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.
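
The per-key grouping described for keyed Tensors can be sketched without PyTorch. This is a sketch only: plain Python lists stand in for Tensors, and the key names input_ids and attention_mask are hypothetical. It shows the grouping step, not the handler's actual code.

```python
from collections import defaultdict

# Hypothetical batch of two keyed examples; lists stand in for Tensors.
batch = [
    {"input_ids": [1, 2, 3], "attention_mask": [1, 1, 1]},
    {"input_ids": [4, 5, 0], "attention_mask": [1, 1, 0]},
]

# Collect the values for each key across all examples.
key_to_values = defaultdict(list)
for example in batch:
    for key, value in example.items():
        key_to_values[key].append(value)

# Each entry now holds one value per example. With real Tensors,
# torch.stack() would turn each list into a single batched Tensor.
grouped = dict(key_to_values)
print(grouped["input_ids"])
print(grouped["attention_mask"])
```

Because each key is batched separately, every example in a batch is expected to carry the same keys with same-shaped values.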

get_num_bytes(batch: Sequence[torch.Tensor]) → int
Returns: The number of bytes of data for a batch of Dict of Tensors.
get_metrics_namespace() → str
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])
batch_elements_kwargs()
share_model_across_processes() → bool
model_copies() → int