LLM offline batch inference with Ray Data and vLLM

Try it out

Run this example in the Anyscale Console or view it on GitHub.

⏱️ Time to complete: 10 min

This template shows you how to:

  1. Read in data from in-memory samples or files on cloud storage.
  2. Use Ray Data and vLLM to run batch inference of an LLM.
  3. Write the inference outputs to cloud storage.

For a Python script version of the code in this workspace template, refer to main.py.

Note: This tutorial is run within a workspace. Review the Introduction to Workspaces template before this tutorial.

How to decide between online vs offline inference for LLM

Online LLM inference (e.g. Anyscale Endpoint) should be used when you want a real-time response to a prompt or to interact with the LLM. Use online inference when you want to optimize inference latency to be as low as possible.

On the other hand, offline LLM inference (also referred to as batch inference) should be used when you want to get responses for a large number of prompts within some time frame, but don't need them in real time (minutes-to-hours granularity). Use offline inference when you want to:

  1. Scale your workload to large datasets.
  2. Optimize inference throughput and resource usage (for example, maximizing GPU utilization).

In this tutorial, we will focus on the latter, using offline LLM inference for a sentence completion task.

Step 1: Set up model configs

First, import the dependencies used in this template.

import os
from typing import Dict

import numpy as np
import ray
from vllm import LLM, SamplingParams

from util.utils import (
    generate_output_path,
    get_a10g_or_equivalent_accelerator_type,
    prompt_for_hugging_face_token,
)

Set up values that will be used in the batch inference workflow:

# Set to the name of the Hugging Face model that you wish to use from the preceding list.
# Note that using the Llama models will prompt you to set your Hugging Face user token.
HF_MODEL = "mistralai/Mistral-7B-Instruct-v0.1"

# Create a sampling params object.
sampling_params = SamplingParams(n=1, temperature=0, max_tokens=2048, stop=["<|eot_id|>", "<|end_of_text|>"])

# Output path to write output result. You can also change this to any cloud storage path,
# e.g. a specific S3 bucket.
output_path = generate_output_path(
    # `ANYSCALE_ARTIFACT_STORAGE` is the URI to the pre-generated folder for storing
    # your artifacts while keeping them separate from Anyscale-generated ones.
    # See: https://docs.anyscale.com/workspaces/storage#object-storage-s3-or-gcs-buckets
    os.environ.get("ANYSCALE_ARTIFACT_STORAGE"),
    HF_MODEL,
)

Some models will require you to input your Hugging Face user access token. This token is used to authenticate and download the model, and is required for the official LLaMA, Mistral, and Gemma models. If you don't have access to the selected model, you can use one of the other models that doesn't require a token (for example, mlabonne/NeuralHermes-2.5-Mistral-7B).

Run the following cell to start the authentication flow. A VS Code overlay will appear and prompt you to enter your Hugging Face token if your selected model requires authentication. The token will be cached to a file in the workspace so it can be used to launch an Anyscale Job later without needing to re-authenticate.

# Prompts the user for Hugging Face token if required by the model.
HF_TOKEN = prompt_for_hugging_face_token(HF_MODEL)

Start up Ray, using the Hugging Face token as an environment variable so that it's made available to all nodes in the cluster.

if ray.is_initialized():
    ray.shutdown()
ray.init(
    runtime_env={
        "env_vars": {"HF_TOKEN": HF_TOKEN},
    }
)

Step 2: Read input data with Ray Data

Use Ray Data to read in your input data from some sample prompts.

# Create some sample sentences, and use Ray Data to create a dataset for it.
prompts = [
    "I always wanted to be a ...",
    "The best way to learn a new language is ...",
    "The biggest challenge facing our society today is ...",
    "One thing I would change about my past is ...",
    "The key to a happy life is ...",
]
ds = ray.data.from_items(prompts)

# View one row of the Dataset.
ds.take(1)

Construct the input prompts for your model using the format required by the specific model. Run the cell below to apply this prompt construction to each row in the Dataset with Ray Data's map method.

model_name_to_input_prompt_format = {
    "meta-llama/Llama-2-7b-chat-hf": "[INST] {} [/INST]",
    "mistralai/Mistral-7B-Instruct-v0.1": "[INST] {} [/INST]",
    "google/gemma-7b-it": "<start_of_turn>model\n{}<end_of_turn>\n",
    "mlabonne/NeuralHermes-2.5-Mistral-7B": "<|im_start|>system\nYou are a helpful assistant that will complete the sentence in the given input prompt.<|im_end|>\n<|im_start|>user{}<|im_end|>\n<|im_start|>assistant",
    "meta-llama/Meta-Llama-3-8B-Instruct": (
        "<|start_header_id|>system<|end_header_id|>\n\nYou are a helpful assistant. Complete the given prompt in several concise sentences.<|eot_id|>\n"
        "<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>\n"
        "<|start_header_id|>assistant<|end_header_id|>\n\n"
    ),
}

def construct_input_prompt(row, text_column):
    """Given the input row with raw text in `text_column` column,
    construct the input prompt for the model."""
    prompt_format = model_name_to_input_prompt_format.get(HF_MODEL)
    if prompt_format:
        row[text_column] = prompt_format.format(row[text_column])
    return row

ds = ds.map(construct_input_prompt, fn_kwargs={"text_column": "item"})

So far, we have defined two operations on the Dataset (from_items(), map()), but have not executed the Dataset yet and don't see any results. Why is that?

Ray Data uses lazy, streaming execution by default, which means that:

  • Datasets and any associated transformations are not executed until you call a consuming operation such as ds.take(), ds.take_all(), ds.iter_batches(), or Dataset.write_parquet().
  • The entire Dataset is not stored in memory, but rather, the Dataset is executed incrementally on parts of data while overlapping execution of various operations in the Dataset. This allows Ray Data to execute batch transformations without needing to load the entire dataset into memory and overlap data preprocessing and model training steps during ML training.

We will trigger Dataset execution after the next step, which is applying the vLLM model to the formatted input prompts.
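
As a quick standalone illustration of this behavior (using a throwaway ds_demo dataset so the main ds pipeline above stays untouched; the data here is purely illustrative), nothing runs until a consuming operation is called:

# Building the plan is instantaneous; no data is processed yet.
ds_demo = ray.data.from_items(["a", "b", "c"]).map(lambda row: {"item": row["item"].upper()})

# A consuming operation such as take() triggers streaming execution of the plan.
print(ds_demo.take(3))  # -> [{'item': 'A'}, {'item': 'B'}, {'item': 'C'}]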

Step 3: Run Batch Inference with vLLM

Create a class to define batch inference logic.

# Mapping of model name to max_model_len supported by model.
model_name_to_args = {
    "mistralai/Mistral-7B-Instruct-v0.1": {"max_model_len": 16832},
    "google/gemma-7b-it": {"max_model_len": 2432},
    "mlabonne/NeuralHermes-2.5-Mistral-7B": {"max_model_len": 16800},
}

class LLMPredictor:
    def __init__(self, text_column):
        # Name of column containing the input text.
        self.text_column = text_column

        # Create an LLM.
        self.llm = LLM(
            model=HF_MODEL,
            **model_name_to_args.get(HF_MODEL, {}),
            # Note: add additional args to LLM constructor below.
        )

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # Generate texts from the prompts.
        # The output is a list of RequestOutput objects that contain the prompt,
        # generated text, and other information.
        outputs = self.llm.generate(batch[self.text_column], sampling_params)
        prompt = []
        generated_text = []
        for output in outputs:
            prompt.append(output.prompt)
            generated_text.append(' '.join([o.text for o in output.outputs]))
        return {
            "prompt": prompt,
            "generated_text": generated_text,
        }

Scaling with GPUs

Next, apply batch inference for all input data with the Ray Data map_batches method. When using vLLM, LLM instances require GPUs; here, we will demonstrate how to configure Ray Data to scale the number of LLM instances and GPUs needed.

To use GPUs for inference in the Workspace, we can specify concurrency and num_gpus in the ds.map_batches() call below to indicate the number of LLM instances and the number of GPUs per LLM instance, respectively. For example, if we want to use 4 LLM instances, with each requiring 1 GPU, we would set concurrency=4 and num_gpus=1, requiring 4 total GPUs.

Smaller models, such as Meta-Llama-3-8B-Instruct and Mistral-7B-Instruct-v0.1, typically require 1 GPU per instance. Larger models, such as Mixtral-8x7B-Instruct-v0.1 and meta-llama/Meta-Llama-3-70B-Instruct, typically require multiple GPUs per instance. You should configure these parameters according to the compute needed by the model.
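
As a rough sketch of the multi-GPU case (the model name and GPU counts below are illustrative assumptions, not part of this template), you could pass vLLM's tensor_parallel_size argument through model_name_to_args and raise num_gpus in map_batches to match. Depending on your vLLM version, tensor-parallel workers may need additional Ray scheduling configuration, so treat this only as a starting point:

# Illustrative only: shard a hypothetical 70B model across 4 GPUs per LLM instance.
# `tensor_parallel_size` is a standard vLLM LLM() constructor argument; the value 4 is an assumption.
model_name_to_args["meta-llama/Meta-Llama-3-70B-Instruct"] = {"tensor_parallel_size": 4}

# Each LLMPredictor replica would then need 4 GPUs, so request them in map_batches:
# ds = ds.map_batches(
#     LLMPredictor,
#     concurrency=2,   # 2 LLM instances
#     num_gpus=4,      # 4 GPUs per instance -> 8 GPUs total
#     batch_size=5,
#     fn_constructor_kwargs={"text_column": "item"},
# )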

ds = ds.map_batches(
    LLMPredictor,
    # Set the concurrency to the number of LLM instances.
    concurrency=4,
    # Specify the number of GPUs required per LLM instance.
    num_gpus=1,
    # Specify the batch size for inference. Set the batch size as large as possible
    # without running out of memory. If you encounter out-of-memory errors,
    # decreasing batch_size may help.
    batch_size=5,
    # Pass keyword arguments for the LLMPredictor class.
    fn_constructor_kwargs={"text_column": "item"},
    # Select the accelerator type; A10G or L4.
    accelerator_type=get_a10g_or_equivalent_accelerator_type(),
)

Finally, make sure to either enable Auto-select worker nodes or configure your workspace cluster to have the appropriate GPU worker nodes (A10G or L4).

Run the following cell to start dataset execution and view the results!

ds.take_all()

Scaling to a larger dataset

In the example above, we performed batch inference on a Ray Dataset with 5 example prompts. Next, let's explore how to scale to a larger dataset based on files stored in cloud storage.

Run the following cell to create a Dataset from a text file stored on S3. This Dataset has 100 rows, with each row containing a single prompt in the text column.

ds = ray.data.read_text("s3://anonymous@air-example-data/prompts_100.txt")
ds.take(1)

Customizing your LLM instance

If you wish to further customize vLLM, you can modify the LLMPredictor class defined earlier in Step 3 as follows:

  • Add kwargs for initializing the LLM object in LLMPredictor.__init__(), as indicated by the comment in Step 3.
  • Modify the SamplingParams object defined earlier in the notebook, in Step 1.
  • For a more advanced use case, such as using a different SamplingParams for each LLM.generate() call, follow these steps (see the sketch after this list):
    1. Add a new argument to LLMPredictor.__call__(), which takes a function that returns a SamplingParams object to be used for the subsequent LLM.generate() call.
    2. This function should be passed to LLMPredictor in the fn_constructor_kwargs argument of the map_batches() call in the next section.
    3. Finally, in LLMPredictor.__call__(), call this function, and pass the generated SamplingParams object to LLM.generate().
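
Below is a minimal sketch of those steps. The factory function make_sampling_params and the subclass CustomSamplingLLMPredictor are hypothetical names (not part of the template), and the length-based policy is purely illustrative; the function is stored in __init__ (since fn_constructor_kwargs feeds the constructor) and called inside __call__:

def make_sampling_params(prompts) -> SamplingParams:
    # Illustrative policy (an assumption): allow more output tokens when prompts in the batch are long.
    longest_prompt = max(len(str(p)) for p in prompts)
    return SamplingParams(n=1, temperature=0, max_tokens=4096 if longest_prompt > 512 else 2048)

class CustomSamplingLLMPredictor(LLMPredictor):
    """Variant of LLMPredictor that computes SamplingParams per LLM.generate() call."""

    def __init__(self, text_column, sampling_params_fn=make_sampling_params):
        super().__init__(text_column)
        # Function mapping a batch of prompts to the SamplingParams for this generate() call.
        self.sampling_params_fn = sampling_params_fn

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        prompts = batch[self.text_column]
        outputs = self.llm.generate(prompts, self.sampling_params_fn(prompts))
        return {
            "prompt": [output.prompt for output in outputs],
            "generated_text": [
                " ".join(o.text for o in output.outputs) for output in outputs
            ],
        }

# Pass the factory through fn_constructor_kwargs in the map_batches() call, for example:
# ds.map_batches(
#     CustomSamplingLLMPredictor,
#     fn_constructor_kwargs={"text_column": "text", "sampling_params_fn": make_sampling_params},
#     concurrency=4, num_gpus=1, batch_size=5,
#     accelerator_type=get_a10g_or_equivalent_accelerator_type(),
# )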

Similar to before, we apply batch inference for all input data with the Ray Data map_batches method.

ds = ds.map(construct_input_prompt, fn_kwargs={"text_column": "text"})
ds = ds.map_batches(
    LLMPredictor,
    # Set the concurrency to the number of LLM instances.
    concurrency=4,
    # Specify the number of GPUs required per LLM instance.
    num_gpus=1,
    # Specify the batch size for inference. Set the batch size as large as possible
    # without running out of memory. If you encounter CUDA out-of-memory errors,
    # decreasing batch_size may help.
    batch_size=5,
    # Pass keyword arguments for the LLMPredictor class.
    fn_constructor_kwargs={"text_column": "text"},
    # Select the accelerator type; A10G or L4.
    accelerator_type=get_a10g_or_equivalent_accelerator_type(),
)

Output Results

Finally, write the inference output data out to Parquet files on S3.

Running the following cell will trigger execution for the full Dataset, which will execute all of the operations (read_text(), map(), map_batches(LLMPredictor), write_parquet()) at once:

ds.write_parquet(output_path, try_create_dir=False)
print(f"Batch inference result is written into {output_path}.")

Monitoring Dataset execution

We can use the Ray Dashboard to monitor the Dataset execution. In the Ray Dashboard tab, navigate to the Jobs page, click the link for the running job, and open the "Ray Data Overview" section to view the details of the batch inference execution.

Handling GPU out-of-memory failures

If you run into CUDA out-of-memory errors, your batch size is likely too large. Decrease the batch size as described above.

If your batch size is already set to 1, then use either a smaller model or GPU devices with more memory.
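
As a rough sketch of these mitigations (the specific values below are illustrative assumptions, not tuned settings), you can lower batch_size in map_batches and constrain vLLM's memory footprint through standard LLM() constructor arguments such as max_model_len and gpu_memory_utilization:

# Illustrative out-of-memory mitigations; adjust values for your model and GPU.

# 1. Reduce the Ray Data batch size:
# ds = ds.map_batches(LLMPredictor, batch_size=1, concurrency=4, num_gpus=1,
#                     fn_constructor_kwargs={"text_column": "text"})

# 2. Constrain vLLM's memory footprint inside LLMPredictor.__init__():
# self.llm = LLM(
#     model=HF_MODEL,
#     max_model_len=4096,            # shorter context -> smaller KV cache
#     gpu_memory_utilization=0.85,   # fraction of GPU memory vLLM may reserve
# )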

Reading back results

We can also use Ray Data to read back the output files to ensure the results are as expected.

ds_output = ray.data.read_parquet(output_path)
ds_output.take(5)

Submitting to Anyscale Jobs

The script in main.py has the same code as this notebook; you can use anyscale job submit to submit the app in that file to Anyscale Jobs. Refer to Introduction to Jobs for more details.

After modifying the configurations at the top of main.py (model name, input/output path, input text column), run the following cell to submit a job:

!anyscale job submit -- python main.py

Summary

This notebook:

  • Read in data from in-memory samples or input files from cloud storage.
  • Used Ray Data and vLLM to run offline batch inference of an LLM.
  • Wrote the inference outputs to cloud storage and read back the results.