RunInference
In Apache Beam 2.40.0, Beam introduced the RunInference API, which lets you deploy a machine learning model in a Beam pipeline. A RunInference transform performs inference on a PCollection of examples using a machine learning (ML) model. The transform outputs a PCollection that contains the input examples and the output predictions. For more information, see the RunInference documentation. You can also find inference examples on GitHub.
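As a quick illustration of the pattern (separate from the T5 example below), here is a minimal sketch that runs a pickled scikit-learn model through RunInference. The model path is a placeholder, and the handler choice is just one of several that Beam provides.

import apache_beam as beam
import numpy as np
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Placeholder path: point this at your own pickled scikit-learn model.
model_handler = SklearnModelHandlerNumpy(model_uri="gs://my-bucket/model.pkl")

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | "CreateExamples" >> beam.Create([np.array([1.0, 2.0]), np.array([3.0, 4.0])])
        | "RunInference" >> RunInference(model_handler=model_handler)
        # Each output element is a PredictionResult holding the input example
        # and the model's prediction.
        | "Print" >> beam.Map(print)
    )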
Using RunInference with very large models
RunInference works well on arbitrarily large models as long as they can fit on your hardware.
This example demonstrates running inference with a T5 language model using RunInference in a pipeline. T5 is an encoder-decoder model pre-trained on a multi-task mixture of unsupervised and supervised tasks, with each task converted into a text-to-text format. The example uses T5-11B, which contains 11 billion parameters and is 45 GB in size. To perform well on a variety of tasks, T5 prepends a different prefix to the input corresponding to each task. For example, for translation the input would be translate English to German: …, and for summarization it would be summarize: …. For more information about T5, see the T5 overview in the Hugging Face documentation.
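For illustration only, the prefixed inputs that later feed beam.Create(task_sentences) in this example's pipeline might look like the following; the sentences themselves are made up.

task_sentences = [
    "translate English to German: The house is wonderful.",
    "summarize: Apache Beam is an open source, unified model for defining "
    "both batch and streaming data-parallel processing pipelines.",
]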
Run the Pipeline
First, install the required packages listed in requirements.txt and pass the required arguments. You can download the T5-11B model from the Hugging Face Hub using:
- git lfs install
- git clone https://huggingface.co/t5-11b
Note: These commands download the checkpoint; you then need to convert it to the model state dict, as mentioned in the example.
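The conversion itself is not shown on this page. A minimal sketch, assuming the Hugging Face transformers library and the local clone created by the commands above (the output filename is a placeholder), might look like this:

import torch
from transformers import T5ForConditionalGeneration

# Load the cloned T5-11B checkpoint and save only its state dict.
model = T5ForConditionalGeneration.from_pretrained("./t5-11b")
torch.save(model.state_dict(), "t5_11b_state_dict.pt")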
You can view the code on GitHub. You can run the pipeline in the following ways:
- Locally on your machine: python main.py --runner DirectRunner --model_state_dict_path <local or remote path to state_dict>. You need to have 45 GB of disk space available to run this example.
- On Google Cloud using Dataflow: python main.py --runner DataflowRunner --model_state_dict_path <local or remote path to state_dict> --project PROJECT_ID --region REGION --requirements_file requirements.txt --temp_location GCS_PATH. Make sure to also pass the other required arguments, as mentioned in the example.
Pipeline Steps
The pipeline contains the following steps:
- Read the inputs.
- Encode the text into transformer-readable token ID integers using a tokenizer.
- Use RunInference to get the output.
- Decode the RunInference output and print it.
The following code snippet contains the four steps:
with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | "CreateInputs" >> beam.Create(task_sentences)
        | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
        | "RunInference" >> RunInference(model_handler=model_handler)
        | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
    )
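The Preprocess and Postprocess DoFns are defined in the example code on GitHub; a rough sketch of what they do, with a fixed max_length of 512 assumed here, is:

import apache_beam as beam

class Preprocess(beam.DoFn):
    def __init__(self, tokenizer):
        self._tokenizer = tokenizer

    def process(self, element):
        # Encode the sentence into a fixed-length tensor of token IDs.
        input_ids = self._tokenizer(
            element, return_tensors="pt", padding="max_length", max_length=512
        ).input_ids
        # Yield a 1-D tensor per element; the model handler batches them.
        yield input_ids.squeeze(0)

class Postprocess(beam.DoFn):
    def __init__(self, tokenizer):
        self._tokenizer = tokenizer

    def process(self, element):
        # element is a PredictionResult; decode the generated token IDs to text.
        decoded = self._tokenizer.decode(element.inference, skip_special_tokens=True)
        print(decoded)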
In the third step of the pipeline, we use RunInference. In order to use it, you must first define a ModelHandler. RunInference provides model handlers for PyTorch, TensorFlow, and scikit-learn. Because the example uses a PyTorch model, it uses the PytorchModelHandlerTensor model handler.
A ModelHandler requires parameters like:
- state_dict_path – The path to the saved dictionary of the model state.
- model_class – The class of the PyTorch model that defines the model structure.
- model_params – A dictionary of arguments required to instantiate the model class.
- device – The device on which you wish to run the model. If device = GPU, then a GPU device will be used if it is available. Otherwise, it will be CPU.
- inference_fn – The inference function to use during RunInference.
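Putting these parameters together for this example, a configuration sketch might look like the following. It assumes the Hugging Face transformers classes, uses a placeholder state dict path, and relies on make_tensor_model_fn (available in recent Beam releases) to call the model's generate() method during inference:

from transformers import AutoConfig, T5ForConditionalGeneration

from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn

# Run inference with model.generate() instead of the default forward pass.
gen_fn = make_tensor_model_fn("generate")

model_handler = PytorchModelHandlerTensor(
    state_dict_path="<local or remote path to state_dict>",
    model_class=T5ForConditionalGeneration,
    model_params={"config": AutoConfig.from_pretrained("t5-11b")},
    device="cpu",
    inference_fn=gen_fn,
)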