apache_beam.ml.inference.onnx_inference module

class apache_beam.ml.inference.onnx_inference.OnnxModelHandlerNumpy(model_uri: str, session_options=None, providers=['CUDAExecutionProvider', 'CPUExecutionProvider'], provider_options=None, *, inference_fn: Callable[[Sequence[numpy.ndarray], onnxruntime.InferenceSession, Optional[Dict[str, Any]]], Iterable[apache_beam.ml.inference.base.PredictionResult]] = <function default_numpy_inference_fn>, large_model: bool = False, **kwargs)

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for ONNX models, using numpy arrays as input. Note that all inputs to OnnxModelHandlerNumpy should be the same size.

Example Usage:

pcoll | RunInference(OnnxModelHandlerNumpy(model_uri="my_uri"))
Parameters:
  • model_uri – The URI where the model is saved.
  • inference_fn – The inference function to use on RunInference calls. Defaults to default_numpy_inference_fn.
  • large_model – Set to True if your model is large enough to cause memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, set this to True if N*W > M.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.
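The N*W > M rule for large_model can be checked with simple arithmetic. The helper below is a hypothetical illustration (needs_large_model and its parameter names are not part of Beam), assuming each of the W cores would otherwise hold its own copy of the model:

```python
def needs_large_model(model_bytes: int, num_cores: int, machine_memory_bytes: int) -> bool:
    """Return True when one model copy per core would exceed machine memory (N*W > M).

    Hypothetical helper for illustration only; not part of apache_beam.
    """
    return model_bytes * num_cores > machine_memory_bytes


GIB = 1024 ** 3

# A 6 GiB model on 8 cores needs 48 GiB, which exceeds 32 GiB of memory.
print(needs_large_model(6 * GIB, 8, 32 * GIB))  # True

# A 2 GiB model on 8 cores needs 16 GiB, which fits in 32 GiB of memory.
print(needs_large_model(2 * GIB, 8, 32 * GIB))  # False
```

The result could then be passed as the large_model argument when constructing the handler.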
load_model() → onnxruntime.InferenceSession

Loads and initializes an ONNX inference session for processing.

run_inference(batch: Sequence[numpy.ndarray], inference_session: onnxruntime.InferenceSession, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult]

Runs inferences on a batch of numpy arrays.

Parameters:
  • batch – A sequence of numpy arrays, each holding a single example.
  • inference_session – An ONNX inference session. It must be runnable with input x, where x is a sequence of numpy arrays.
  • inference_args – Any additional arguments for an inference.
Returns:

An Iterable of type PredictionResult.
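As a rough illustration of running a batch through a session, the sketch below stacks single examples into one array and feeds it to the session's first input. It is not necessarily how the Beam handler is implemented. FakeSession and run_batch are hypothetical names; FakeSession stands in for onnxruntime.InferenceSession (mirroring its get_inputs() and run(output_names, input_feed) methods) so the example runs without a model file. The sketch assumes a single-input model, treats inference_args as extra named inputs, and returns plain (example, prediction) tuples rather than PredictionResult objects:

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

import numpy as np


class _FakeInput:
    """Hypothetical stand-in for an ONNX Runtime input descriptor."""

    def __init__(self, name: str) -> None:
        self.name = name


class FakeSession:
    """Hypothetical stand-in for onnxruntime.InferenceSession.

    It doubles every input value so the example needs no model file.
    """

    def get_inputs(self) -> List[_FakeInput]:
        return [_FakeInput("x")]

    def run(self, output_names, input_feed: Dict[str, np.ndarray]) -> List[np.ndarray]:
        return [input_feed["x"] * 2]


def run_batch(
    batch: Sequence[np.ndarray],
    session: Any,
    inference_args: Optional[Dict[str, Any]] = None,
) -> List[Tuple[np.ndarray, np.ndarray]]:
    # Stack single examples into one array with a leading batch dimension.
    # np.stack raises ValueError if the examples differ in shape, which is
    # why all inputs need to be the same size.
    stacked = np.stack(batch, axis=0)
    input_name = session.get_inputs()[0].name
    # Assumption: extra inference arguments are passed as additional inputs.
    feed = {input_name: stacked, **(inference_args or {})}
    # Passing None as output_names requests all model outputs.
    outputs = session.run(None, feed)
    # Pair each example with its row of the first output.
    return list(zip(batch, outputs[0]))


examples = [np.array([1.0, 2.0]), np.array([3.0, 4.0])]
for example, prediction in run_batch(examples, FakeSession()):
    print(example.tolist(), prediction.tolist())
```

With a real model, an onnxruntime.InferenceSession would take the place of FakeSession.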

get_num_bytes(batch: Sequence[numpy.ndarray]) → int
Returns: The number of bytes of data for a batch.
get_metrics_namespace() → str
Returns: A namespace for metrics collected by the RunInference transform.
share_model_across_processes() → bool