apache_beam.ml.inference.tensorflow_inference module

class apache_beam.ml.inference.tensorflow_inference.TFModelHandlerNumpy(model_uri: str, model_type: apache_beam.ml.inference.tensorflow_inference.ModelType = <ModelType.SAVED_MODEL: 1>, create_model_fn: Optional[Callable] = None, *, load_model_args: Optional[Dict[str, Any]] = None, custom_weights: str = '', inference_fn: Callable[[<sphinx.ext.autodoc.importer._MockObject object at 0x7f7380a26400>, Sequence[Union[numpy.ndarray, <sphinx.ext.autodoc.importer._MockObject object at 0x7f7380a26940>]], Dict[str, Any], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_numpy_inference_fn>, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for Tensorflow.

Example Usage:

pcoll | RunInference(TFModelHandlerNumpy(model_uri="my_uri"))

See https://www.tensorflow.org/tutorials/keras/save_and_load for details.

Parameters:
  • model_uri (str) – path to the trained model.
  • model_type – type of model to be loaded. Defaults to SAVED_MODEL.
  • create_model_fn – a function that creates and returns a new tensorflow model to load the saved weights. It should be used with ModelType.SAVED_WEIGHTS.
  • load_model_args – a dictionary of parameters to pass to the load_model function of TensorFlow to specify custom config.
  • custom_weights (str) – path to the custom weights to be applied once the model is loaded.
  • inference_fn – inference function to use during RunInference. Defaults to default_numpy_inference_fn.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested with Tensorflow 2.9, 2.10, 2.11.

load_model() → <sphinx.ext.autodoc.importer._MockObject object at 0x7f7380a32ca0>[source]

Loads and initializes a Tensorflow model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[numpy.ndarray], model: <sphinx.ext.autodoc.importer._MockObject object at 0x7f7380a32b80>, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of numpy array and returns an Iterable of numpy array Predictions.

This method stacks the n-dimensional numpy array in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of numpy nd-array. These should be batchable, as this method will call numpy.stack() and pass in batched numpy nd-array with dimensions (batch_size, n_features, etc.) into the model’s predict() function.
  • model – A Tensorflow model.
  • inference_args – any additional arguments for an inference.
Returns:

An Iterable of type PredictionResult.

get_num_bytes(batch: Sequence[numpy.ndarray]) → int[source]
Returns:The number of bytes of data for a batch of numpy arrays.
get_metrics_namespace() → str[source]
Returns:A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]
model_copies() → int[source]
class apache_beam.ml.inference.tensorflow_inference.TFModelHandlerTensor(model_uri: str, model_type: apache_beam.ml.inference.tensorflow_inference.ModelType = <ModelType.SAVED_MODEL: 1>, create_model_fn: Optional[Callable] = None, *, load_model_args: Optional[Dict[str, Any]] = None, custom_weights: str = '', inference_fn: Callable[[<sphinx.ext.autodoc.importer._MockObject object at 0x7f7380a26400>, Sequence[Union[numpy.ndarray, <sphinx.ext.autodoc.importer._MockObject object at 0x7f7380a26940>]], Dict[str, Any], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_tensor_inference_fn>, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for Tensorflow.

Example Usage:

pcoll | RunInference(TFModelHandlerTensor(model_uri="my_uri"))

See https://www.tensorflow.org/tutorials/keras/save_and_load for details.

Parameters:
  • model_uri (str) – path to the trained model.
  • model_type – type of model to be loaded. Defaults to SAVED_MODEL.
  • create_model_fn – a function that creates and returns a new tensorflow model to load the saved weights. It should be used with ModelType.SAVED_WEIGHTS.
  • load_model_args – a dictionary of parameters to pass to the load_model function of TensorFlow to specify custom config.
  • custom_weights (str) – path to the custom weights to be applied once the model is loaded.
  • inference_fn – inference function to use during RunInference. Defaults to default_numpy_inference_fn.
  • min_batch_size – the minimum batch size to use when batching inputs.
  • max_batch_size – the maximum batch size to use when batching inputs.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested with Tensorflow 2.11.

load_model() → <sphinx.ext.autodoc.importer._MockObject object at 0x7f738084eaf0>[source]

Loads and initializes a tensorflow model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[<sphinx.ext.autodoc.importer._MockObject object at 0x7f738084eb50>], model: <sphinx.ext.autodoc.importer._MockObject object at 0x7f738097ba60>, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of tf.Tensor and returns an Iterable of Tensor Predictions.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call tf.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s predict() function.
  • model – A Tensorflow model.
  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched
Returns:

An Iterable of type PredictionResult.

get_num_bytes(batch: Sequence[<sphinx.ext.autodoc.importer._MockObject object at 0x7f738097b280>]) → int[source]
Returns:The number of bytes of data for a batch of Tensors.
get_metrics_namespace() → str[source]
Returns:A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]
model_copies() → int[source]