apache_beam.ml.inference.pytorch_inference module¶
class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[torch.Tensor], torch.nn.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = default_tensor_inference_fn, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, load_model_args: Optional[Dict[str, Any]] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler
Implementation of the ModelHandler interface for PyTorch.
Example Usage for torch model::

  pcoll | RunInference(PytorchModelHandlerTensor(state_dict_path="my_uri",
                                                 model_class="my_class"))

Example Usage for torchscript model::

  pcoll | RunInference(PytorchModelHandlerTensor(
      torch_script_model_path="my_uri"))
See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details
Parameters:

- state_dict_path – path to the saved dictionary of the model state.
- model_class – class of the Pytorch model that defines the model structure.
- model_params – A dictionary of arguments required to instantiate the model class.
- device – the device on which you wish to run the model. If device = GPU then a GPU device will be used if it is available. Otherwise, it will be CPU.
- inference_fn – the inference function to use during RunInference. default=default_tensor_inference_fn
- torch_script_model_path – Path to the torch script model. The model will be loaded using torch.jit.load(); the state_dict_path, model_class and model_params arguments will be disregarded.
- min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.
- max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.
- max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
- large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
- model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.
- load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.
- kwargs – ‘env_vars’ can be used to set environment variables before loading the model.
Supported Versions: RunInference APIs in Apache Beam have been tested with PyTorch 1.9 and 1.10.
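For illustration, a minimal end-to-end pipeline sketch using this handler. The LinearRegression model class, the state dict path, and the input values below are hypothetical placeholders rather than part of this API::

  import apache_beam as beam
  import torch
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

  class LinearRegression(torch.nn.Module):
    # Hypothetical model; any torch.nn.Module whose state dict was saved with
    # torch.save(model.state_dict(), path) can be used the same way.
    def __init__(self):
      super().__init__()
      self.linear = torch.nn.Linear(1, 1)

    def forward(self, x):
      return self.linear(x)

  model_handler = PytorchModelHandlerTensor(
      state_dict_path='gs://my-bucket/linear_regression.pt',  # placeholder URI
      model_class=LinearRegression,
      model_params={})

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([torch.Tensor([1.0]), torch.Tensor([2.0])])
        | RunInference(model_handler))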
load_model() → torch.nn.Module[source]

Loads and initializes a Pytorch model for processing.
run_inference(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of Tensors and returns an Iterable of Tensor Predictions.
This method stacks the list of Tensors in a vectorized format to optimize the inference call.
Parameters:

- batch – A sequence of Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model's forward() function.
- model – A PyTorch model.
- inference_args – Non-batchable arguments required as inputs to the model's forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.
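As a concrete illustration of the inference_fn contract described above, here is a minimal sketch of a custom tensor inference function. The function name and the torch.no_grad() wrapper are assumptions of this sketch; the built-in default_tensor_inference_fn already provides equivalent behavior::

  from typing import Any, Dict, Iterable, Optional, Sequence

  import torch
  from apache_beam.ml.inference.base import PredictionResult

  def my_tensor_inference_fn(
      batch: Sequence[torch.Tensor],
      model: torch.nn.Module,
      device: torch.device,
      inference_args: Optional[Dict[str, Any]] = None,
      model_id: Optional[str] = None) -> Iterable[PredictionResult]:
    # Stack the batchable Tensors into one (batch_size, ...) Tensor, move it
    # to the target device, and pair each input with its prediction.
    batched = torch.stack(batch).to(device)
    inference_args = inference_args or {}
    with torch.no_grad():
      predictions = model(batched, **inference_args)
    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

A function with this shape can be passed to the handler through the inference_fn argument in place of default_tensor_inference_fn.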
get_num_bytes(batch: Sequence[torch.Tensor]) → int[source]

Returns: The number of bytes of data for a batch of Tensors.
get_metrics_namespace() → str[source]

Returns: A namespace for metrics collected by the RunInference transform.
class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerKeyedTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[Dict[str, torch.Tensor]], torch.nn.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = default_keyed_tensor_inference_fn, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, load_model_args: Optional[Dict[str, Any]] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler
Implementation of the ModelHandler interface for PyTorch.
Example Usage for torch model::

  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      state_dict_path="my_uri", model_class="my_class"))

Example Usage for torchscript model::

  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      torch_script_model_path="my_uri"))
NOTE: This API and its implementation are under development and do not provide backward compatibility guarantees.
See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details
Parameters:

- state_dict_path – path to the saved dictionary of the model state.
- model_class – class of the Pytorch model that defines the model structure.
- model_params – A dictionary of arguments required to instantiate the model class.
- device – the device on which you wish to run the model. If device = GPU then a GPU device will be used if it is available. Otherwise, it will be CPU.
- inference_fn – the function to invoke on run_inference. default = default_keyed_tensor_inference_fn
- torch_script_model_path – Path to the torch script model. The model will be loaded using torch.jit.load(); the state_dict_path, model_class and model_params arguments will be disregarded.
- min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.
- max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.
- max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
- large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
- model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.
- load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.
- kwargs – ‘env_vars’ can be used to set environment variables before loading the model.
Supported Versions: RunInference APIs in Apache Beam have been tested on torch>=1.9.0,<1.14.0.
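For illustration, a minimal pipeline sketch using this handler with dictionary-valued examples. The MyKeyedModel class, the state dict path, and the example values below are hypothetical placeholders rather than part of this API::

  import apache_beam as beam
  import torch
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor

  class MyKeyedModel(torch.nn.Module):
    # Hypothetical model whose forward() takes keyword Tensor arguments that
    # match the keys of each input dictionary.
    def __init__(self):
      super().__init__()
      self.linear = torch.nn.Linear(3, 1)

    def forward(self, input_ids, attention_mask):
      return self.linear(input_ids.float() * attention_mask.float())

  model_handler = PytorchModelHandlerKeyedTensor(
      state_dict_path='gs://my-bucket/keyed_model.pt',  # placeholder URI
      model_class=MyKeyedModel,
      model_params={})

  examples = [
      {'input_ids': torch.tensor([1, 2, 3]),
       'attention_mask': torch.tensor([1, 1, 1])},
  ]

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(examples)
        | RunInference(model_handler))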
load_model() → torch.nn.Module[source]

Loads and initializes a Pytorch model for processing.
run_inference(batch: Sequence[Dict[str, torch.Tensor]], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of Keyed Tensors and returns an Iterable of Tensor Predictions.
For the same key across all examples, this method stacks all Tensor values in a vectorized format to optimize the inference call.
Parameters:

- batch – A sequence of keyed Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model's forward() function.
- model – A PyTorch model.
- inference_args – Non-batchable arguments required as inputs to the model's forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.
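To make the per-key stacking described above concrete, the following sketch shows what happens to a batch of two keyed examples; the key names and shapes are illustrative only::

  import torch

  batch = [
      {'input_ids': torch.tensor([1, 2, 3]), 'attention_mask': torch.tensor([1, 1, 0])},
      {'input_ids': torch.tensor([4, 5, 6]), 'attention_mask': torch.tensor([1, 1, 1])},
  ]

  # For each key, the values from every example are stacked into a single
  # batched Tensor of shape (batch_size, ...), which is then passed to the
  # model's forward() by keyword, e.g. model(input_ids=..., attention_mask=...).
  key_to_batched = {
      key: torch.stack([example[key] for example in batch])
      for key in batch[0]
  }
  print(key_to_batched['input_ids'].shape)  # torch.Size([2, 3])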
get_num_bytes(batch: Sequence[torch.Tensor]) → int[source]

Returns: The number of bytes of data for a batch of Dict of Tensors.
get_metrics_namespace() → str[source]

Returns: A namespace for metrics collected by the RunInference transform.