apache_beam.ml.inference.pytorch_inference module

class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.modules.module.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[torch.Tensor], torch.nn.modules.module.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_tensor_inference_fn>, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for PyTorch.

Example Usage for torch model::
  pcoll | RunInference(PytorchModelHandlerTensor(
      state_dict_path="my_uri", model_class="my_class"))
Example Usage for torchscript model::
  pcoll | RunInference(PytorchModelHandlerTensor(
      torch_script_model_path="my_uri"))

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details
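
A more complete, runnable sketch of the torch-model usage above. The LinearRegression class, its model_params, and the gs://my-bucket/linear_regression.pt path are illustrative placeholders, not part of this module::

  import torch
  import apache_beam as beam
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

  # Toy model class; in practice this is whatever architecture produced the
  # saved state dict.
  class LinearRegression(torch.nn.Module):
    def __init__(self, input_dim=1, output_dim=1):
      super().__init__()
      self.linear = torch.nn.Linear(input_dim, output_dim)

    def forward(self, x):
      return self.linear(x)

  model_handler = PytorchModelHandlerTensor(
      state_dict_path='gs://my-bucket/linear_regression.pt',  # placeholder path
      model_class=LinearRegression,
      model_params={'input_dim': 1, 'output_dim': 1})

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'CreateExamples' >> beam.Create(
            [torch.Tensor([1.0]), torch.Tensor([2.0]), torch.Tensor([3.0])])
        | 'PyTorchRunInference' >> RunInference(model_handler)
        | 'Print' >> beam.Map(print))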

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.
  • model_class – class of the PyTorch model that defines the model structure.
  • model_params – A dictionary of arguments required to instantiate the model class.
  • device – the device on which you wish to run the model. If device = GPU, a GPU device will be used if one is available; otherwise, the CPU is used.
  • inference_fn – the inference function to use during RunInference. default=default_tensor_inference_fn
  • torch_script_model_path – Path to the TorchScript model. The model will be loaded using torch.jit.load(), and the state_dict_path, model_class and model_params arguments will be disregarded.
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.

Supported Versions: RunInference APIs in Apache Beam have been tested with PyTorch 1.9 and 1.10.
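
The inference_fn hook can be swapped for any callable matching the signature shown in the constructor above. The following is a minimal, illustrative sketch (not the module's default implementation) that post-processes the raw model output with a softmax before packaging per-example results; it assumes the model returns one logits tensor per input example::

  from typing import Any, Dict, Iterable, Optional, Sequence

  import torch
  from apache_beam.ml.inference.base import PredictionResult

  def softmax_inference_fn(
      batch: Sequence[torch.Tensor],
      model: torch.nn.Module,
      device: torch.device,
      inference_args: Optional[Dict[str, Any]] = None,
      model_id: Optional[str] = None,
  ) -> Iterable[PredictionResult]:
    # Stack the examples into one batched tensor, move it to the target
    # device, run the forward pass, then post-process the raw output with a
    # softmax before pairing each input with its prediction.
    inference_args = inference_args or {}
    batched = torch.stack(batch).to(device)
    with torch.no_grad():
      logits = model(batched, **inference_args)
      probabilities = torch.nn.functional.softmax(logits, dim=-1)
    return [
        PredictionResult(example, probs)
        for example, probs in zip(batch, probabilities)
    ]

  # handler = PytorchModelHandlerTensor(
  #     state_dict_path=..., model_class=..., model_params=...,
  #     inference_fn=softmax_inference_fn)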

load_model() → torch.nn.modules.module.Module[source]

Loads and initializes a PyTorch model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[torch.Tensor], model: torch.nn.modules.module.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of Tensors and returns an Iterable of Tensor Predictions.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.
  • model – A PyTorch model.
  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.
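
For intuition, the stacking step behaves like the plain torch.stack call below (the shapes are illustrative): every element of batch must have the same shape, and the model’s forward() receives a single tensor with a leading batch dimension::

  import torch

  examples = [torch.rand(3, 224, 224) for _ in range(4)]  # four image-like examples
  batched = torch.stack(examples)
  print(batched.shape)  # torch.Size([4, 3, 224, 224]) -> (batch_size, ...)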

get_num_bytes(batch: Sequence[torch.Tensor]) → int[source]
Returns: The number of bytes of data for a batch of Tensors.
get_metrics_namespace() → str[source]
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]
class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerKeyedTensor(state_dict_path: Optional[str] = None, model_class: Optional[Callable[[...], torch.nn.modules.module.Module]] = None, model_params: Optional[Dict[str, Any]] = None, device: str = 'CPU', *, inference_fn: Callable[[Sequence[Dict[str, torch.Tensor]], torch.nn.modules.module.Module, torch.device, Optional[Dict[str, Any]], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_keyed_tensor_inference_fn>, torch_script_model_path: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for PyTorch.

Example Usage for torch model::
  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      state_dict_path="my_uri", model_class="my_class"))
Example Usage for torchscript model::
  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      torch_script_model_path="my_uri"))

NOTE: This API and its implementation are under development and do not provide backward compatibility guarantees.

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details
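
A more complete, runnable sketch of the torch-model usage above. The TwoInputModel class, the key names, and the gs://my-bucket/two_input_model.pt path are illustrative placeholders, not part of this module; with the default inference function, the batched tensors are passed to the model's forward() method as keyword arguments named after the dictionary keys::

  import torch
  import apache_beam as beam
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor

  # Toy model whose forward() takes keyword tensor arguments matching the
  # keys of the input dictionaries.
  class TwoInputModel(torch.nn.Module):
    def __init__(self, input_dim=4, output_dim=1):
      super().__init__()
      self.linear = torch.nn.Linear(2 * input_dim, output_dim)

    def forward(self, value1, value2):
      return self.linear(torch.cat([value1, value2], dim=-1))

  model_handler = PytorchModelHandlerKeyedTensor(
      state_dict_path='gs://my-bucket/two_input_model.pt',  # placeholder path
      model_class=TwoInputModel,
      model_params={'input_dim': 4, 'output_dim': 1})

  examples = [
      {'value1': torch.rand(4), 'value2': torch.rand(4)},
      {'value1': torch.rand(4), 'value2': torch.rand(4)},
  ]

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'CreateKeyedExamples' >> beam.Create(examples)
        | 'PyTorchRunInference' >> RunInference(model_handler)
        | 'Print' >> beam.Map(print))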

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.
  • model_class – class of the PyTorch model that defines the model structure.
  • model_params – A dictionary of arguments required to instantiate the model class.
  • device – the device on which you wish to run the model. If device = GPU, a GPU device will be used if one is available; otherwise, the CPU is used.
  • inference_fn – the function to invoke on run_inference. default=default_keyed_tensor_inference_fn
  • torch_script_model_path – Path to the TorchScript model. The model will be loaded using torch.jit.load(), and the state_dict_path, model_class and model_params arguments will be disregarded.
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.

Supported Versions: RunInference APIs in Apache Beam have been tested on torch>=1.9.0,<1.14.0.

load_model() → torch.nn.modules.module.Module[source]

Loads and initializes a PyTorch model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[Dict[str, torch.Tensor]], model: torch.nn.modules.module.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of Keyed Tensors and returns an Iterable of Tensor Predictions.

For the same key across all examples, this method stacks all Tensor values in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of keyed Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.
  • model – A PyTorch model.
  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.
Returns: An Iterable of type PredictionResult.
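
For intuition, the per-key stacking is roughly equivalent to the plain PyTorch manipulation below (key names and shapes are illustrative)::

  from collections import defaultdict

  import torch

  batch = [
      {'input_ids': torch.tensor([1, 2, 3]), 'mask': torch.tensor([1, 1, 0])},
      {'input_ids': torch.tensor([4, 5, 6]), 'mask': torch.tensor([1, 1, 1])},
  ]
  # Group tensors by key, then stack each group into a single batched tensor.
  key_to_tensors = defaultdict(list)
  for example in batch:
    for key, tensor in example.items():
      key_to_tensors[key].append(tensor)
  key_to_batched = {key: torch.stack(tensors) for key, tensors in key_to_tensors.items()}
  print({key: value.shape for key, value in key_to_batched.items()})
  # {'input_ids': torch.Size([2, 3]), 'mask': torch.Size([2, 3])}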

get_num_bytes(batch: Sequence[torch.Tensor]) → int[source]
Returns: The number of bytes of data for a batch of Dict of Tensors.
get_metrics_namespace() → str[source]
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]