Large Language Model Inference in Beam

In Apache Beam 2.40.0, Beam introduced the RunInference API, which lets you deploy a machine learning model in a Beam pipeline. A RunInference transform performs inference on a PCollection of examples using a machine learning (ML) model. The transform outputs a PCollection that contains the input examples and output predictions. For more information, see RunInference here. You can also find inference examples on GitHub.

Using RunInference with very large models

RunInference works well on arbitrarily large models as long as they can fit on your hardware.

Memory Management

RunInference has several mechanisms for reducing memory utilization. For example, by default RunInference load at most a single copy of each model per process (rather than one per thread).

Many Beam runners, however, run multiple Beam processes per machine at once. This can cause problems since the memory footprint of loading large models like LLMs multiple times can be too large to fit into a single machine. For memory-intensive models, RunInference provides a mechanism for more intelligently sharing memory across multiple processes to reduce the overall memory footprint. To enable this mode, users just have to set the parameter large_model to True in their model configuration (see below for an example), and Beam will take care of the memory management.

Running an Example Pipeline with T5

This example demonstrates running inference with a T5 language model using RunInference in a pipeline. T5 is an encoder-decoder model pre-trained on a multi-task mixture of unsupervised and supervised tasks. Each task is converted into a text-to-text format. The example uses T5-11B, which contains 11 billion parameters and is 45 GB in size. In order to work well on a variety of tasks, T5 prepends a different prefix to the input corresponding to each task. For example, for translation, the input would be: translate English to German: … and for summarization, it would be: summarize: …. For more information about T5 see the T5 overiew in the HuggingFace documentation.

To run inference with this model, first, install apache-beam 2.40 or greater:

pip install apache-beam -U

Next, install the required packages listed in requirements.txt and pass the required arguments. You can download the T5-11b model from Hugging Face Hub with the following steps:

import torch
from transformers import T5ForConditionalGeneration

model = T5ForConditionalGeneration.from_pretrained("path/to/cloned/t5-11b")
torch.save(model.state_dict(), "path/to/save/state_dict.pth")

You can view the code on GitHub

  1. Locally on your machine:
python main.py --runner DirectRunner \
               --model_state_dict_path <local or remote path to state_dict> \
               --model_name t5-11b

You need to have 45 GB of disk space available to run this example.

  1. On Google Cloud using Dataflow:
python main.py --runner DataflowRunner \
                --model_state_dict_path <gs://path/to/saved/state_dict.pth> \
                --model_name t5-11b \
                --project <PROJECT_ID> \
                --region <REGION> \
                --requirements_file requirements.txt \
                --staging_location <gs://path/to/staging/location>
                --temp_location <gs://path/to/temp/location> \
                --experiments "use_runner_v2,no_use_multiple_sdk_containers" \
                --machine_type=n1-highmem-16 \
                --disk_size_gb=200

You can also pass other configuration parameters as described here.

Pipeline Steps

The pipeline contains the following steps:

  1. Read the inputs.
  2. Encode the text into transformer-readable token ID integers using a tokenizer.
  3. Use RunInference to get the output.
  4. Decode the RunInference output and print it.

The following code snippet contains the four steps:

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | "CreateInputs" >> beam.Create(task_sentences)
            | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
            | "RunInference" >> RunInference(model_handler=model_handler)
            | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
        )

In the third step of pipeline we use RunInference. In order to use it, you must first define a ModelHandler. RunInference provides model handlers for PyTorch, TensorFlow and Scikit-Learn. Because the example uses a PyTorch model, it uses the PyTorchModelHandlerTensor model handler.

  gen_fn = make_tensor_model_fn('generate')

  model_handler = PytorchModelHandlerTensor(
      state_dict_path=args.model_state_dict_path,
      model_class=T5ForConditionalGeneration,
      model_params={"config": AutoConfig.from_pretrained(args.model_name)},
      device="cpu",
      inference_fn=gen_fn,
      large_model=True)

A ModelHandler requires parameters like: