apache_beam.ml.inference.tensorflow_inference module

class apache_beam.ml.inference.tensorflow_inference.TFModelHandlerNumpy(model_uri: str, model_type: apache_beam.ml.inference.tensorflow_inference.ModelType = <ModelType.SAVED_MODEL: 1>, create_model_fn: Optional[Callable] = None, *, load_model_args: Optional[Dict[str, Any]] = None, custom_weights: str = '', inference_fn: Callable[[tf.Module, Sequence[Union[numpy.ndarray, tf.Tensor]], Dict[str, Any], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_numpy_inference_fn>, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for Tensorflow.

Example Usage:

pcoll | RunInference(TFModelHandlerNumpy(model_uri="my_uri"))

See https://www.tensorflow.org/tutorials/keras/save_and_load for details.
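
A slightly fuller sketch of the usage above; the model path "gs://my-bucket/my_model" and the toy inputs are illustrative placeholders rather than part of the API:

import numpy as np

import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy

# Placeholder path to a SavedModel; substitute your own trained model.
model_handler = TFModelHandlerNumpy(model_uri="gs://my-bucket/my_model")

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | "CreateExamples" >> beam.Create(
          [np.array([1.0, 2.0]), np.array([3.0, 4.0])])
      | "RunInference" >> RunInference(model_handler)
      # Each output element is a PredictionResult(example, inference, model_id).
      | "PrintResults" >> beam.Map(print))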

Parameters:
  • model_uri (str) – path to the trained model.
  • model_type – type of model to be loaded. Defaults to SAVED_MODEL.
  • create_model_fn – a function that creates and returns a new tensorflow model into which the saved weights are loaded. Use it with ModelType.SAVED_WEIGHTS (see the sketch after the Supported Versions note below).
  • load_model_args – a dictionary of parameters to pass to the load_model function of TensorFlow to specify custom config.
  • custom_weights (str) – path to the custom weights to be applied once the model is loaded.
  • inference_fn – inference function to use during RunInference. Defaults to default_numpy_inference_fn.
  • min_batch_size – the minimum batch size to use when batching inputs.
  • max_batch_size – the maximum batch size to use when batching inputs.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested with Tensorflow 2.9, 2.10, 2.11.
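
A minimal sketch of the ModelType.SAVED_WEIGHTS path mentioned above; the network built in build_model and the weights path are illustrative assumptions and must match however the weights were originally saved:

import tensorflow as tf

from apache_beam.ml.inference.tensorflow_inference import (
    ModelType, TFModelHandlerNumpy)

def build_model():
  # Must recreate the same architecture the weights were saved from
  # (this toy network is only a placeholder).
  return tf.keras.Sequential([
      tf.keras.layers.Dense(16, activation="relu", input_shape=(4,)),
      tf.keras.layers.Dense(1),
  ])

model_handler = TFModelHandlerNumpy(
    model_uri="gs://my-bucket/checkpoints/my_weights",  # placeholder weights path
    model_type=ModelType.SAVED_WEIGHTS,
    create_model_fn=build_model)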

load_model() → tf.Module[source]

Loads and initializes a Tensorflow model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[numpy.ndarray], model: tf.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inference on a batch of numpy arrays and returns an Iterable of numpy array Predictions.

This method stacks the n-dimensional numpy arrays into a vectorized batch to optimize the inference call.

Parameters:
  • batch – A sequence of numpy nd-arrays. These should be batchable, as this method will call numpy.stack() and pass the batched numpy nd-array with dimensions (batch_size, n_features, etc.) into the model’s predict() function.
  • model – A Tensorflow model.
  • inference_args – any additional arguments for an inference.
Returns: An Iterable of type PredictionResult.
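
Where the default behavior is not enough, the inference_fn hook accepts any callable with the signature shown in the constructor above. The sketch below is one hedged way to write such a function; the direct model(...) call and the placeholder model path are assumptions about the particular SavedModel, not requirements of the handler:

from typing import Any, Dict, Iterable, Optional, Sequence

import numpy as np
import tensorflow as tf

from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy

def my_numpy_inference_fn(
    model: tf.Module,
    batch: Sequence[np.ndarray],
    inference_args: Dict[str, Any],
    model_id: Optional[str] = None) -> Iterable[PredictionResult]:
  # Stack the per-element arrays into one (batch_size, ...) array, then
  # invoke the model once per batch (assumes the loaded model is callable).
  stacked = np.stack(batch, axis=0)
  predictions = model(stacked, **(inference_args or {}))
  return [
      PredictionResult(example=example, inference=inference, model_id=model_id)
      for example, inference in zip(batch, predictions)
  ]

model_handler = TFModelHandlerNumpy(
    model_uri="gs://my-bucket/my_model",  # placeholder path
    inference_fn=my_numpy_inference_fn)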

get_num_bytes(batch: Sequence[numpy.ndarray]) → int[source]
Returns: The number of bytes of data for a batch of numpy arrays.
get_metrics_namespace() → str[source]
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]
class apache_beam.ml.inference.tensorflow_inference.TFModelHandlerTensor(model_uri: str, model_type: apache_beam.ml.inference.tensorflow_inference.ModelType = <ModelType.SAVED_MODEL: 1>, create_model_fn: Optional[Callable] = None, *, load_model_args: Optional[Dict[str, Any]] = None, custom_weights: str = '', inference_fn: Callable[[tf.Module, Sequence[Union[numpy.ndarray, tf.Tensor]], Dict[str, Any], Optional[str]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_tensor_inference_fn>, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for Tensorflow.

Example Usage:

pcoll | RunInference(TFModelHandlerTensor(model_uri="my_uri"))

See https://www.tensorflow.org/tutorials/keras/save_and_load for details.
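
A slightly fuller sketch of the usage above; the model path and the toy inputs are illustrative placeholders, and the elements are converted to tf.Tensor inside the pipeline before RunInference:

import tensorflow as tf

import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor

model_handler = TFModelHandlerTensor(model_uri="gs://my-bucket/my_model")  # placeholder path

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | "CreateExamples" >> beam.Create([[1.0, 2.0], [3.0, 4.0]])
      # Convert each element to a tf.Tensor to match the handler's input type.
      | "ToTensor" >> beam.Map(tf.convert_to_tensor)
      | "RunInference" >> RunInference(model_handler)
      | "PrintResults" >> beam.Map(print))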

Parameters:
  • model_uri (str) – path to the trained model.
  • model_type – type of model to be loaded. Defaults to SAVED_MODEL.
  • create_model_fn – a function that creates and returns a new tensorflow model into which the saved weights are loaded. Use it with ModelType.SAVED_WEIGHTS.
  • load_model_args – a dictionary of parameters to pass to the load_model function of TensorFlow to specify custom config.
  • custom_weights (str) – path to the custom weights to be applied once the model is loaded.
  • inference_fn – inference function to use during RunInference. Defaults to default_tensor_inference_fn.
  • min_batch_size – the minimum batch size to use when batching inputs.
  • max_batch_size – the maximum batch size to use when batching inputs.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested with Tensorflow 2.11.

load_model() → tf.Module[source]

Loads and initializes a tensorflow model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[tf.Tensor], model: tf.Module, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inference on a batch of tf.Tensor elements and returns an Iterable of Tensor Predictions.

This method stacks the list of Tensors into a vectorized batch to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call tf.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s predict() function.
  • model – A Tensorflow model.
  • inference_args – Non-batchable arguments required as inputs to the model’s inference call. Unlike the Tensors in batch, these parameters are not dynamically batched.
Returns: An Iterable of type PredictionResult.
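
A hedged sketch of passing non-batchable keyword arguments through RunInference as described above; whether the loaded model actually accepts a training keyword is an assumption about that particular model, not something the handler guarantees:

import tensorflow as tf

import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor

model_handler = TFModelHandlerTensor(model_uri="gs://my-bucket/my_model")  # placeholder path

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create([[1.0, 2.0], [3.0, 4.0]])
      | beam.Map(tf.convert_to_tensor)
      # inference_args is forwarded, unbatched, to every model invocation.
      | RunInference(model_handler, inference_args={"training": False}))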

get_num_bytes(batch: Sequence[tf.Tensor]) → int[source]
Returns: The number of bytes of data for a batch of Tensors.
get_metrics_namespace() → str[source]
Returns: A namespace for metrics collected by the RunInference transform.
validate_inference_args(inference_args: Optional[Dict[str, Any]])[source]
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]