RunInference

Pydoc




Uses models to do local and remote inference. A RunInference transform performs inference on a PCollection of examples using a machine learning (ML) model. The transform outputs a PCollection that contains the input examples and output predictions.

You must have Apache Beam 2.40.0 or later installed to run these pipelines.

See more RunInference API pipeline examples.

Examples

In the following examples, we explore how to create pipelines that use the Beam RunInference API to make predictions based on models.

Example 1: PyTorch unkeyed model

In this example, we create a pipeline that uses a PyTorch RunInference transform on unkeyed data.

import apache_beam as beam
import numpy
import torch
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor


class LinearRegression(torch.nn.Module):
  """Single-layer linear model whose weights are loaded from the saved
  state dict below (the `five_times_table` checkpoint — presumably trained
  to approximate y = 5x; verify against the sample's training script)."""

  def __init__(self, input_dim=1, output_dim=1):
    super().__init__()
    self.linear = torch.nn.Linear(input_dim, output_dim)

  def forward(self, x):
    return self.linear(x)


# Pre-trained weights stored in a public GCS bucket.
model_state_dict_path = 'gs://apache-beam-samples/run_inference/five_times_table_torch.pt'  # pylint: disable=line-too-long
model_class = LinearRegression
model_params = {'input_dim': 1, 'output_dim': 1}
# The handler tells RunInference how to load the model and feed it tensors.
model_handler = PytorchModelHandlerTensor(
    model_class=model_class,
    model_params=model_params,
    state_dict_path=model_state_dict_path)

# Unkeyed input: four float32 examples shaped as a column vector, so each
# pipeline element is a length-1 numpy row.
unkeyed_data = numpy.array([10, 40, 60, 90],
                           dtype=numpy.float32).reshape(-1, 1)

with beam.Pipeline() as p:
  predictions = (
      p
      | 'InputData' >> beam.Create(unkeyed_data)
      # RunInference's PyTorch handler expects torch.Tensor elements.
      | 'ConvertNumpyToTensor' >> beam.Map(torch.Tensor)
      | 'PytorchRunInference' >> RunInference(model_handler=model_handler)
      | beam.Map(print))

Output:

PredictionResult(example=tensor([10.]), inference=tensor([52.2325], grad_fn=<UnbindBackward>))
PredictionResult(example=tensor([40.]), inference=tensor([201.1165], grad_fn=<UnbindBackward>))
PredictionResult(example=tensor([60.]), inference=tensor([300.3724], grad_fn=<UnbindBackward>))
PredictionResult(example=tensor([90.]), inference=tensor([449.2563], grad_fn=<UnbindBackward>))

Example 2: PyTorch keyed model

In this example, we create a pipeline that uses a PyTorch RunInference transform on keyed data.

import apache_beam as beam
import torch
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor


class LinearRegression(torch.nn.Module):
  """Single-layer linear model whose weights are loaded from the saved
  state dict below (the `five_times_table` checkpoint — presumably trained
  to approximate y = 5x; verify against the sample's training script)."""

  def __init__(self, input_dim=1, output_dim=1):
    super().__init__()
    self.linear = torch.nn.Linear(input_dim, output_dim)

  def forward(self, x):
    return self.linear(x)


# Pre-trained weights stored in a public GCS bucket.
model_state_dict_path = 'gs://apache-beam-samples/run_inference/five_times_table_torch.pt'  # pylint: disable=line-too-long
model_class = LinearRegression
model_params = {'input_dim': 1, 'output_dim': 1}
# KeyedModelHandler wraps the tensor handler so elements may arrive as
# (key, example) pairs; the key is passed through to the output unchanged.
keyed_model_handler = KeyedModelHandler(
    PytorchModelHandlerTensor(
        model_class=model_class,
        model_params=model_params,
        state_dict_path=model_state_dict_path))

# Each element is a (question_id, value) pair.
keyed_data = [("first_question", 105.00), ("second_question", 108.00),
              ("third_question", 1000.00), ("fourth_question", 1013.00)]

with beam.Pipeline() as p:
  predictions = (
      p
      | 'KeyedInputData' >> beam.Create(keyed_data)
      # Keep the key; convert the float value into a 1-element tensor.
      | "ConvertIntToTensor" >>
      beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | 'PytorchRunInference' >>
      RunInference(model_handler=keyed_model_handler)
      | beam.Map(print))

Output:

('first_question', PredictionResult(example=tensor([105.]), inference=tensor([523.6982], grad_fn=<UnbindBackward>)))
('second_question', PredictionResult(example=tensor([108.]), inference=tensor([538.5867], grad_fn=<UnbindBackward>)))
('third_question', PredictionResult(example=tensor([1000.]), inference=tensor([4965.4019], grad_fn=<UnbindBackward>)))
('fourth_question', PredictionResult(example=tensor([1013.]), inference=tensor([5029.9180], grad_fn=<UnbindBackward>)))

Example 3: Sklearn unkeyed model

In this example, we create a pipeline that uses a Sklearn RunInference transform on unkeyed data.

import apache_beam as beam
import numpy
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Location of the pickled scikit-learn model in a public GCS bucket.
sklearn_model_filename = 'gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'  # pylint: disable=line-too-long
# The numpy handler loads the pickle and feeds it numpy examples.
sklearn_model_handler = SklearnModelHandlerNumpy(
    model_uri=sklearn_model_filename, model_file_type=ModelFileType.PICKLE)

# Four unkeyed float32 examples, reshaped into a column vector so that
# every pipeline element is one length-1 row.
unkeyed_data = numpy.array(
    [20, 40, 60, 90], dtype=numpy.float32).reshape(-1, 1)

with beam.Pipeline() as p:
  predictions = (
      p
      | "ReadInputs" >> beam.Create(unkeyed_data)
      | "RunInferenceSklearn" >>
      RunInference(model_handler=sklearn_model_handler)
      | beam.Map(print))

Output:

PredictionResult(example=array([20.], dtype=float32), inference=array([100.], dtype=float32))
PredictionResult(example=array([40.], dtype=float32), inference=array([200.], dtype=float32))
PredictionResult(example=array([60.], dtype=float32), inference=array([300.], dtype=float32))
PredictionResult(example=array([90.], dtype=float32), inference=array([450.], dtype=float32))

Example 4: Sklearn keyed model

In this example, we create a pipeline that uses a Sklearn RunInference transform on keyed data.

import apache_beam as beam
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Location of the pickled scikit-learn model in a public GCS bucket.
sklearn_model_filename = 'gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl'  # pylint: disable=line-too-long
# KeyedModelHandler lets elements arrive as (key, example) pairs; keys are
# carried through to the matching predictions unchanged.
sklearn_model_handler = KeyedModelHandler(
    SklearnModelHandlerNumpy(
        model_uri=sklearn_model_filename,
        model_file_type=ModelFileType.PICKLE))

# Each element is a (question_id, value) pair.
keyed_data = [("first_question", 105.00), ("second_question", 108.00),
              ("third_question", 1000.00), ("fourth_question", 1013.00)]

with beam.Pipeline() as p:
  predictions = (
      p
      | "ReadInputs" >> beam.Create(keyed_data)
      # Keep the key; the handler expects each example as a list of features.
      | "ConvertDataToList" >> beam.Map(lambda pair: (pair[0], [pair[1]]))
      | "RunInferenceSklearn" >>
      RunInference(model_handler=sklearn_model_handler)
      | beam.Map(print))

Output:

('first_question', PredictionResult(example=[105.0], inference=array([525.])))
('second_question', PredictionResult(example=[108.0], inference=array([540.])))
('third_question', PredictionResult(example=[1000.0], inference=array([5000.])))
('fourth_question', PredictionResult(example=[1013.0], inference=array([5065.])))

Not applicable.

Pydoc