apache_beam.ml.inference.sklearn_inference module

class apache_beam.ml.inference.sklearn_inference.SklearnModelHandlerNumpy(model_uri: str, model_file_type: apache_beam.ml.inference.sklearn_inference.ModelFileType = <ModelFileType.PICKLE: 1>, *, inference_fn: Callable[[sklearn.base.BaseEstimator, Sequence[numpy.ndarray], Optional[Dict[str, Any]]], Any] = <function _default_numpy_inference_fn>, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for scikit-learn using numpy arrays as input.

Example Usage:

pcoll | RunInference(SklearnModelHandlerNumpy(model_uri="my_uri"))
Parameters:
  • model_uri – The URI to where the model is saved.
  • model_file_type – The method of serialization of the argument. default=pickle
  • inference_fn – The inference function to use. default=_default_numpy_inference_fn
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Numpy ndarrays.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Numpy ndarrays.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.
load_model() → sklearn.base.BaseEstimator[source]

Loads and initializes a model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[numpy.ndarray], model: sklearn.base.BaseEstimator, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of numpy arrays.

Parameters:
  • batch – A sequence of examples as numpy arrays. They should be single examples.
  • model – A numpy model or pipeline. Must implement predict(X). Where the parameter X is a numpy array.
  • inference_args – Any additional arguments for an inference.
Returns:

An Iterable of type PredictionResult.

get_num_bytes(batch: Sequence[numpy.ndarray]) → int[source]
Returns:The number of bytes of data for a batch.
get_metrics_namespace() → str[source]
Returns:A namespace for metrics collected by the RunInference transform.
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]
model_copies() → int[source]
class apache_beam.ml.inference.sklearn_inference.SklearnModelHandlerPandas(model_uri: str, model_file_type: apache_beam.ml.inference.sklearn_inference.ModelFileType = <ModelFileType.PICKLE: 1>, *, inference_fn: Callable[[sklearn.base.BaseEstimator, Sequence[pandas.core.frame.DataFrame], Optional[Dict[str, Any]]], Any] = <function _default_pandas_inference_fn>, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, large_model: bool = False, model_copies: Optional[int] = None, **kwargs)[source]

Bases: apache_beam.ml.inference.base.ModelHandler

Implementation of the ModelHandler interface for scikit-learn that supports pandas dataframes.

Example Usage:

pcoll | RunInference(SklearnModelHandlerPandas(model_uri="my_uri"))

NOTE: This API and its implementation are under development and do not provide backward compatibility guarantees.

Parameters:
  • model_uri – The URI to where the model is saved.
  • model_file_type – The method of serialization of the argument. default=pickle
  • inference_fn – The inference function to use. default=_default_pandas_inference_fn
  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Pandas Dataframes.
  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Pandas Dataframes.
  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.
  • large_model – set to true if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.
  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.
  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.
load_model() → sklearn.base.BaseEstimator[source]

Loads and initializes a model for processing.

update_model_path(model_path: Optional[str] = None)[source]
run_inference(batch: Sequence[pandas.core.frame.DataFrame], model: sklearn.base.BaseEstimator, inference_args: Optional[Dict[str, Any]] = None) → Iterable[apache_beam.ml.inference.base.PredictionResult][source]

Runs inferences on a batch of pandas dataframes.

Parameters:
  • batch – A sequence of examples as numpy arrays. They should be single examples.
  • model – A dataframe model or pipeline. Must implement predict(X). Where the parameter X is a pandas dataframe.
  • inference_args – Any additional arguments for an inference.
Returns:

An Iterable of type PredictionResult.

get_num_bytes(batch: Sequence[pandas.core.frame.DataFrame]) → int[source]
Returns:The number of bytes of data for a batch.
get_metrics_namespace() → str[source]
Returns:A namespace for metrics collected by the RunInference transform.
batch_elements_kwargs()[source]
share_model_across_processes() → bool[source]
model_copies() → int[source]