LLM Inference: Data Parallel, Model Parallel, and Pipeline Parallel
Author(s): Tushar Vatsa
Originally published on Towards AI.

In the previous post, we explored how KV cache optimization affects inference performance. Using the Phi-2 model as an example, we observed that increasing the sequence length led to a near-linear decline in tokens-per-second throughput.
In this post, we’ll explore how data parallelism, model parallelism, and pipeline parallelism operate within an inference engine, examining their effects on memory usage, throughput, and overall performance trade-offs.
Data Parallel
In data parallelism, we distribute the dataset across multiple devices while keeping a full copy of the model on each. This approach works well when the model comfortably fits into the memory of a single device. By processing different data batches in parallel, we can ideally speed up inference by the number of available devices. However, this strategy becomes impractical for very large models that cannot fit entirely on a single GPU.

When working with multiple devices, the dataset needs to be divided evenly across them. For example, if we have 100 datapoints and 2 GPUs, an ideal strategy is to split the data in a 50/50 ratio. To ensure randomness and avoid bias, we typically shuffle the indices first, and then partition them equally across the GPUs. Let’s see how to do it.
# Dataset file: dataset.py
from random import Random
from torch.utils.data import DataLoader


class Partition():
    """A standard PyTorch-style dataset (has __len__ and __getitem__) exposing one chunk of the data."""
    def __init__(self, data, index):
        self.data = data    # Entire dataset (list)
        self.index = index  # Indices (list): different for each device

    def __len__(self):
        return len(self.index)  # A partition is a chunk of the data, not the entire data

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner():
    def __init__(self, data, sizes=(0.5, 0.5), seed=1234):
        self.data = data
        self.partitions = []  # One list of indices per device
        rng = Random()
        rng.seed(seed)  # Every process uses the same seed, so all ranks compute the same shuffle
        # Get the length of the entire dataset
        data_len = len(data)
        indices = list(range(data_len))
        # Shuffle the indices
        rng.shuffle(indices)
        # Partition the indices for the devices
        start_idx = 0
        for size in sizes:
            part_len = int(size * data_len)
            self.partitions.append(indices[start_idx:start_idx + part_len])
            start_idx += part_len

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])  # Dataset, list of indices


def partition_dataset(rank, world_size, dataset, batch_size=128, collate_fn=None):
    # Split the global batch so that the per-device batches add up to batch_size
    partitioned_batch_size = batch_size // world_size
    sizes = [1 / world_size for _ in range(world_size)]  # world_size: the number of devices
    partitioner = DataPartitioner(dataset, sizes=sizes)
    partition = partitioner.use(rank)
    # Wrap this partition in a DataLoader
    dataloader = DataLoader(
        partition,
        batch_size=partitioned_batch_size,
        collate_fn=collate_fn
    )
    return dataloader
Pretty intuitive, right?
If you’re wondering about world_size and rank, here's a simple explanation:
world_size refers to the total number of devices (e.g., GPUs) participating in the training process, and rank identifies the specific device among them.
For example, if you have 4 GPUs, world_size is set to 4, and rank can take values from 0 to 3, representing each GPU.
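To make the partitioning concrete, here is a small standalone sketch that reproduces the index logic of DataPartitioner without PyTorch (the function name partition_indices is mine, chosen for illustration). Two details are worth noticing: the partitions only line up because every rank shuffles with the same seed, and int(size * data_len) rounds down, so with 100 samples and 3 ranks one sample is silently never used.

```python
from random import Random


def partition_indices(data_len, world_size, seed=1234):
    """Hypothetical helper mirroring DataPartitioner with sizes = [1/world_size] * world_size."""
    rng = Random(seed)  # Same seed on every rank -> same permutation on every rank
    indices = list(range(data_len))
    rng.shuffle(indices)
    sizes = [1 / world_size for _ in range(world_size)]
    parts, start = [], 0
    for size in sizes:
        part_len = int(size * data_len)  # Rounds down, so a remainder can be dropped
        parts.append(indices[start:start + part_len])
        start += part_len
    return parts


parts = partition_indices(100, 4)
print([len(p) for p in parts])             # [25, 25, 25, 25]
print(set(parts[0]).isdisjoint(parts[1]))  # True: ranks never see the same sample

parts3 = partition_indices(100, 3)
print([len(p) for p in parts3])            # [33, 33, 33]
print(sum(len(p) for p in parts3))         # 99: one sample is never used
```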
Training a model at scale requires two essential components: multiple concurrent processes (torch.multiprocessing) and efficient communication between devices (torch.distributed).
Question: Why do you think we need communication between devices? Think about it.
Now, the distributed data loader is ready. Let’s write the code for the training process.
# Imports
import os
import time
import argparse

import tqdm
import torch
import datasets
import numpy as np
import torch.distributed as dist
from functools import partial
from torch.utils.data import DataLoader
from torch.multiprocessing import Process
from transformers import AutoConfig, GPT2LMHeadModel

from dataset import partition_dataset  # The distributed data loader that we wrote
from utils import get_tokenizer, collate_batch
# You can write your own tokenizer, collate, train and generate functions


def average_gradients(model):
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            # Sum each gradient across all GPUs (communication overhead), then divide to get the mean
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data /= world_size


def train(model, optimizer, examples, batch_size, collate_fn, desc, rank=0, average_gradients_fn=None):
    model.train()
    tokens_per_sec = []
    tokens_num = []
    for i, batch in enumerate(prog_bar := tqdm.tqdm(examples, desc=f'Training ({desc})')):
        t0 = time.time()
        optimizer.zero_grad()
        logits = model(input_ids=batch['input_ids']).logits
        loss = torch.nn.functional.cross_entropy(
            input=logits.reshape((-1, logits.shape[-1])),
            target=batch['labels'].reshape(-1),
            reduction='none')
        # Weighted mean using the per-token weights produced by collate_batch
        loss = (torch.sum(loss * batch['label_token_weights'].reshape(-1)) /
                torch.sum(batch['label_token_weights']))
        loss.backward()  # Calculates the local gradients
        if average_gradients_fn is not None:
            average_gradients_fn(model)  # Synchronizes gradients across replicas
        optimizer.step()  # Updates the weights
        if torch.cuda.is_available():
            torch.cuda.synchronize()  # CUDA runs asynchronously; wait so batch_time covers the real work
        batch_time = time.time() - t0
        tokens = np.prod(batch['input_ids'].shape)
        tokens_per_sec.append(tokens / batch_time)
        tokens_num.append(tokens)
        prog_bar.set_postfix(
            tokens_per_sec=tokens / batch_time,
            loss=loss.item())
    return np.mean(tokens_per_sec), tokens_num


def setup(rank, world_size, backend):
    """Sets up the communication between multiple devices (GPUs)."""
    os.environ['MASTER_ADDR'] = 'localhost'  # torch.distributed reads MASTER_ADDR
    os.environ['MASTER_PORT'] = '33333'
    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)


# This function is run by each process concurrently; the id of the process is `rank`
def run_dp(rank, world_size, backend, dataset_name, model_max_length, n_epochs, batch_size, learning_rate):
    setup(rank, world_size, backend)
    device = f'cuda:{rank}' if backend == 'nccl' else 'cpu'
    if backend == 'nccl':
        torch.cuda.set_device(rank)  # Bind this process to GPU `rank`
    torch.manual_seed(1234)  # Same seed on every rank, so all replicas start with identical weights
    config = AutoConfig.from_pretrained('gpt2')
    model = GPT2LMHeadModel(config=config).to(device)  # A full copy of the model on each device
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)  # AdamW works well for LLMs
    # We will use a German (de) to English (en) translation dataset
    dataset = {
        split: datasets.load_dataset(dataset_name, split=split)['translation']
        for split in ['train', 'validation', 'test']
    }
    src_key, tgt_key = 'de', 'en'
    dataset['train'] = dataset['train'][:5000]
    dataset['validation'] = dataset['validation'][:1000]
    dataset['test'] = dataset['test'][:100]
    # Tokenization
    tokenizer = get_tokenizer(examples=dataset['train'], vocab_size=config.vocab_size, src_key=src_key, tgt_key=tgt_key)
    # Collate function: partial pre-fills the arguments shared by every batch
    collate_fn = partial(collate_batch, src_key=src_key, tgt_key=tgt_key, tokenizer=tokenizer,
                         model_max_length=model_max_length, device=device)
    train_loader = partition_dataset(rank, world_size, dataset['train'], batch_size=batch_size, collate_fn=collate_fn)
    val_loader = DataLoader(dataset['validation'], batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    test_loader = DataLoader(dataset['test'], batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    total_time = []
    total_tokens_per_sec = []
    for epoch_idx in range(n_epochs):
        start = time.time()
        avg_tokens_per_sec, _ = train(
            model=model,
            optimizer=optimizer,
            examples=train_loader,
            batch_size=batch_size,
            collate_fn=collate_fn,
            desc=f'rank {rank}, epoch {epoch_idx}',
            rank=rank,
            average_gradients_fn=average_gradients)
        end = time.time()
        total_time.append(end - start)
        total_tokens_per_sec.append(avg_tokens_per_sec)
        print(f'[rank {rank}] epoch {epoch_idx}: {end - start:.1f}s, {avg_tokens_per_sec:.0f} tokens/sec')
    dist.destroy_process_group()


if __name__ == '__main__':
    import torch.multiprocessing as mp
    mp.set_start_method('spawn', force=True)  # CUDA needs 'spawn' (not 'fork') in child processes
    parser = argparse.ArgumentParser()
    parser.add_argument('--pytest', type=bool, default=False)
    parser.add_argument('--dataset', type=str)
    parser.add_argument('--model_max_length', type=int, default=128)
    parser.add_argument('--n_epochs', type=int, default=10)
    parser.add_argument('--batch_size', type=int, default=128)
    parser.add_argument('--learning_rate', type=float, default=1e-4)
    parser.add_argument('--world_size', type=int, default=2)
    args = parser.parse_args()
    backend = 'nccl'  # For CPU, choose 'gloo'
    processes = []
    for rank in range(args.world_size):
        p = Process(
            target=run_dp,
            args=(rank, args.world_size, backend, args.dataset, args.model_max_length,
                  args.n_epochs, args.batch_size, args.learning_rate)
        )
        p.start()
        processes.append(p)
    # Wait for all processes to finish
    for p in processes:
        p.join()
Before updating the model weights, gradients from all devices are averaged to keep the replicas consistent. Because every replica starts from the same weights and applies the same averaged gradient, every GPU holds an identical copy of the model after each training step.
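Why is averaging the right operation? The following NumPy check is a toy example of my own (a linear model with mean-squared-error loss, not GPT-2). When each rank holds an equal-sized shard, averaging the per-rank mean gradients reproduces the full-batch gradient exactly. That average is what all_reduce(SUM) followed by division by world_size computes.

```python
import numpy as np


def mse_grad(w, X, y):
    # Gradient of the mean squared error: (2 / n) * X^T (Xw - y)
    residual = X @ w - y
    return 2.0 * X.T @ residual / len(y)


rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))
y = rng.normal(size=100)
w = np.zeros(3)

world_size = 2
shards = np.array_split(np.arange(100), world_size)  # Two equal shards of 50 samples
local_grads = [mse_grad(w, X[idx], y[idx]) for idx in shards]
averaged = sum(local_grads) / world_size              # Mimics all_reduce(SUM) / world_size
full = mse_grad(w, X, y)
print(np.allclose(averaged, full))  # True

# With unequal shards (30 vs 70 samples) the plain average no longer matches
uneven = [np.arange(30), np.arange(30, 100)]
uneven_avg = sum(mse_grad(w, X[idx], y[idx]) for idx in uneven) / 2
print(np.allclose(uneven_avg, full))  # False
```

One caveat for the training loop in this post: its loss is a mean weighted by label tokens, so if ranks see different numbers of label tokens in a step, the plain average of their gradients is close to, but not exactly, the full-batch gradient.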

Let’s run a quick experiment. I’ve got access to 2 H100 (80 GB) GPUs, definitely overkill for a small model like GPT-2. But hey, that’s what I have on hand. As the saying goes, you don’t swat a mosquito with a bazooka, but sometimes that’s the only weapon in your arsenal. So let’s see what training throughput and training time we can get out of this setup.

Voila! We’re seeing nearly 2× throughput (tokens processed per second during training), but that speed comes at the cost of doubling the hardware.

While we expected the average training time per epoch to be roughly halved compared to single-GPU training, repeated runs told a different story. Most epochs are faster, but one or two consistently take much longer than the rest. These spikes could be attributed to several overheads, such as data loader latency while fetching the next batch, Python’s garbage collection kicking in, or synchronization delays from NCCL during inter-device communication.
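One way to quantify this is to report speedup and scaling efficiency (speedup divided by the number of GPUs) using both the mean and the median epoch time. The median is less sensitive to a couple of slow epochs. The helper below is a hypothetical sketch, and the epoch times in the usage example are made-up numbers for illustration, not results from this experiment.

```python
from statistics import mean, median


def scaling_report(single_gpu_epoch_times, multi_gpu_epoch_times, num_gpus):
    """Speedup and scaling efficiency from per-epoch wall-clock times (hypothetical helper)."""
    report = {}
    for name, agg in (("mean", mean), ("median", median)):
        speedup = agg(single_gpu_epoch_times) / agg(multi_gpu_epoch_times)
        report[name] = {"speedup": speedup, "efficiency": speedup / num_gpus}
    return report


# Made-up epoch times in seconds: one slow outlier epoch on the 2-GPU run
single = [10.0, 10.0, 10.0, 10.0]
multi = [5.2, 5.1, 9.0, 5.2]
report = scaling_report(single, multi, num_gpus=2)
for name, stats in report.items():
    print(f"{name}: speedup {stats['speedup']:.2f}x, efficiency {stats['efficiency']:.0%}")
# mean: speedup 1.63x, efficiency 82%
# median: speedup 1.92x, efficiency 96%
```

A large gap between the two rows is a hint that a few outlier epochs, rather than steady-state throughput, are dragging the average down.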
Model Parallel
Model parallelism involves splitting the model itself across multiple devices. For example, if we have a model with 12 layers and 2 GPUs, we can place the first 6 layers on GPU 0 and the remaining 6 on GPU 1. Each device handles a portion of the forward and backward pass, allowing us to train models that wouldn’t otherwise fit on a single GPU.

In traditional model parallelism, execution is sequential, meaning that at any given moment only one GPU is actively processing. So even if you have 4 GPUs and split the model across them, only one GPU handles the computation at a time, while the others sit idle waiting for their turn. This leads to poor utilization and diminishing returns as the number of devices increases.
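A tiny timeline model makes the utilization argument concrete. It relies on two simplifications of mine: every stage takes the same time, and moving activations between GPUs is free. Under those assumptions each batch must clear all stages before the next one starts, so the fraction of GPU time spent doing work is 1 / num_stages no matter how many batches you run.

```python
def naive_model_parallel_timeline(num_batches: int, num_stages: int, stage_time: float = 1.0):
    """Busy intervals (stage, batch, start, end) when each batch passes through every
    stage before the next batch starts, so no two stages are ever busy together."""
    intervals = []
    t = 0.0
    for batch in range(num_batches):
        for stage in range(num_stages):
            intervals.append((stage, batch, t, t + stage_time))
            t += stage_time
    return intervals


def utilization(intervals, num_stages: int) -> float:
    """Busy GPU-time divided by (number of GPUs x wall-clock time)."""
    wall_clock = max(end for _, _, _, end in intervals)
    busy = sum(end - start for _, _, start, end in intervals)
    return busy / (num_stages * wall_clock)


for stages in (1, 2, 4):
    print(stages, utilization(naive_model_parallel_timeline(3, stages), stages))
# 1 1.0
# 2 0.5
# 4 0.25
```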
import math
import torch


def get_device_map(n_layers, devices):
    """Returns a dictionary of layers distributed evenly across all devices."""
    layers = list(range(n_layers))
    n_blocks = int(math.ceil(n_layers / len(devices)))  # Layers per device
    layers_list = [layers[i : i + n_blocks] for i in range(0, n_layers, n_blocks)]
    return dict(zip(devices, layers_list))


# A method of the GPT-2 transformer module (it uses self.wte, self.wpe, self.h, self.ln_f)
def parallelize(self, device_map=None):
    """Distribute the model layers across the devices based on the device_map."""
    self.device_map = (
        get_device_map(len(self.h), range(torch.cuda.device_count())) if device_map is None else device_map
    )
    self.model_parallel = True
    self.first_device = "cpu" if "cpu" in self.device_map.keys() else "cuda:" + str(min(self.device_map.keys()))
    self.last_device = "cuda:" + str(max(self.device_map.keys()))
    # Token and position embeddings go on the first device
    self.wte = self.wte.to(self.first_device)
    self.wpe = self.wpe.to(self.first_device)
    # Load each transformer block onto its assigned device
    for k, v in self.device_map.items():
        for block in v:
            cuda_device = "cuda:" + str(k)
            self.h[block] = self.h[block].to(cuda_device)
    # Final layer norm (ln_f) goes on the last device
    self.ln_f = self.ln_f.to(self.last_device)
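Because get_device_map hands out contiguous chunks of ceil(n_layers / n_devices) layers, it splits GPT-2 evenly across 2 GPUs but can behave unexpectedly when the division isn't exact. zip stops at the shorter list, so some devices may receive no layers at all. The sketch below copies the function to show this. It also adds balanced_device_map, a hypothetical alternative of mine (not part of any library) that spreads the remainder one layer at a time.

```python
import math


def get_device_map(n_layers, devices):
    # Same logic as the helper above
    layers = list(range(n_layers))
    n_blocks = int(math.ceil(n_layers / len(devices)))
    layers_list = [layers[i : i + n_blocks] for i in range(0, n_layers, n_blocks)]
    return dict(zip(devices, layers_list))


def balanced_device_map(n_layers, devices):
    # Hypothetical alternative: the first (n_layers % n_devices) devices get one extra layer
    base, extra = divmod(n_layers, len(devices))
    result, start = {}, 0
    for k, device in enumerate(devices):
        count = base + (1 if k < extra else 0)
        result[device] = list(range(start, start + count))
        start += count
    return result


print(get_device_map(12, [0, 1]))
# {0: [0, 1, 2, 3, 4, 5], 1: [6, 7, 8, 9, 10, 11]}
print(get_device_map(5, [0, 1, 2, 3]))
# {0: [0, 1], 1: [2, 3], 2: [4]}   (device 3 gets nothing)
print(balanced_device_map(5, [0, 1, 2, 3]))
# {0: [0, 1], 1: [2], 2: [3], 3: [4]}
```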
For simplicity, let’s consider the case of GPT-2, which has 12 layers, distributed across 2 GPUs:

wte, wpe → “cuda:0”
Iteration 1: k=0, v=[0,1,2,3,4,5]
└── self.h[0].to(“cuda:0”)
└── self.h[1].to(“cuda:0”)
└── self.h[2].to(“cuda:0”)
└── self.h[3].to(“cuda:0”)
└── self.h[4].to(“cuda:0”)
└── self.h[5].to(“cuda:0”)
Iteration 2: k=1, v=[6,7,8,9,10,11]
└── self.h[6].to(“cuda:1”)
└── self.h[7].to(“cuda:1”)
└── self.h[8].to(“cuda:1”)
└── self.h[9].to(“cuda:1”)
└── self.h[10].to(“cuda:1”)
└── self.h[11].to(“cuda:1”)
ln_f, lm_head → “cuda:1”
To take on a more challenging experiment beyond GPT-2, we tested LLaMA-2-7B, a 7-billion-parameter model, using model parallelism. We ran it on both 2 and 4 V100 (16 GB) GPUs under identical configurations to evaluate performance and scalability.
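Some back-of-the-envelope arithmetic shows why a 7B model needs splitting on 16 GB cards. This counts the weights alone, treating the model as exactly 7 × 10^9 parameters at 2 bytes each in fp16 or 4 bytes in fp32, and assumes an even split across GPUs. It ignores activations, gradients, and optimizer state, so real usage is higher, especially for training, where AdamW keeps two extra buffers per parameter.

```python
def weight_memory_gb(num_params: float, bytes_per_param: int, num_gpus: int = 1) -> float:
    """Memory for the weights alone, split evenly across num_gpus, in decimal GB (1e9 bytes)."""
    return num_params * bytes_per_param / num_gpus / 1e9


for gpus in (1, 2, 4):
    fp16 = weight_memory_gb(7e9, 2, gpus)
    fp32 = weight_memory_gb(7e9, 4, gpus)
    print(f"{gpus} GPU(s): fp16 {fp16:.1f} GB, fp32 {fp32:.1f} GB per GPU")
# 1 GPU(s): fp16 14.0 GB, fp32 28.0 GB per GPU
# 2 GPU(s): fp16 7.0 GB, fp32 14.0 GB per GPU
# 4 GPU(s): fp16 3.5 GB, fp32 7.0 GB per GPU
```

In fp16 the weights alone (14 GB) would nearly fill a single 16 GB V100. Splitting them across 2 or 4 GPUs leaves headroom for everything else.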

Only one GPU is active at a time! The data flows through each GPU sequentially, like water through a pipe. Adding more GPUs doesn’t make the water flow faster; it just splits the pipe into more sections. But it does enable training models that are too large for a single GPU.
Pipeline Parallel
Pipeline parallelism is a technique for distributing a deep learning model across multiple GPUs by dividing the model into sequential stages and processing different parts of the input batch in parallel. Instead of having only one GPU work at a time, as in traditional model parallelism, pipeline parallelism lets each GPU handle a specific section of the model and operate concurrently on different micro-batches of the input.
Here’s how it works in practice: suppose you have a model with 32 layers and 4 GPUs available. You could split the model so that each GPU handles 8 layers. When a batch of input data arrives, it is further split into smaller chunks known as micro-batches. The first micro-batch begins processing on GPU 0 (which handles layers 0–7). As soon as it finishes that stage, it passes its intermediate activations to GPU 1 (handling layers 8–15), while GPU 0 immediately starts processing the second micro-batch. This pattern continues: GPU 2 starts working on the first micro-batch as soon as it receives it from GPU 1, and GPU 3 picks up the output from GPU 2, forming a continuous flow of data through the pipeline.
The result is that, after a brief warm-up phase in which the pipeline fills, all GPUs remain actively engaged, each working on a different micro-batch at any given time. The idle GPU slots during this fill phase, and during the matching drain phase at the end, are known as the pipeline bubble. This overlapping of execution leads to far better GPU utilization than model parallelism, where only one GPU is typically active at a time. It’s similar to how an assembly line works: each stage (or GPU) focuses on a specific task, and the system becomes efficient once the line is full.

A scheduler decides which GPU processes which micro-batch at each time step (clock cycle):
from typing import Iterable, List, Tuple


def _clock_cycles(num_batches: int, num_partitions: int) -> Iterable[List[Tuple[int, int]]]:
    """
    Yield the (batch_idx, partition_idx) pairs that run at each clock tick.
    Key insight: micro-batch i is at partition j when clock = i + j.
    """
    total_clocks = num_batches + num_partitions - 1  # Fill + run + drain
    for clock in range(total_clocks):
        schedule = []
        for i in range(num_batches):
            j = clock - i  # Which partition is micro-batch i at?
            if 0 <= j < num_partitions:  # Is it a valid partition?
                schedule.append((i, j))  # (batch_idx, partition_idx)
        yield schedule
For example (3 micro-batches, 3 partitions):
Clock 0: [(0,0)] → GPU 0 processes batch 0
Clock 1: [(0,1), (1,0)] → GPU 1 processes batch 0, GPU 0 processes batch 1
Clock 2: [(0,2), (1,1), (2,0)] → All 3 GPUs busy!
Clock 3: [(1,2), (2,1)] → GPU 2 processes batch 1, GPU 1 processes batch 2
Clock 4: [(2,2)] → GPU 2 processes batch 2
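# Methods of the pipeline module. They assume that self.partitions (one nn.Module per stage),
# self.devices, self.split_size, per-stage self.in_queues / self.out_queues served by worker
# threads, and a Task wrapper are set up elsewhere (not shown in this post).
def forward(self, x):
    # 1. SPLIT: divide the batch into micro-batches along the batch dimension
    batches = list(x.split(self.split_size, dim=0))
    num_batches = len(batches)
    num_partitions = len(self.partitions)
    # 2. SCHEDULE: generate the clock-cycle schedule
    schedules = _clock_cycles(num_batches, num_partitions)
    # 3. EXECUTE: process each clock cycle; stages within a cycle run in parallel
    for schedule in schedules:
        self.compute(batches, schedule)
    # 4. COMBINE: concatenate the micro-batch outputs
    output = torch.cat(batches, dim=0)
    return output.to(self.devices[-1])


def compute(self, batches, schedule):
    # PHASE 1: submit ALL tasks of this clock cycle (non-blocking)
    for batch_idx, partition_idx in schedule:
        batch = batches[batch_idx].to(self.devices[partition_idx])
        partition = self.partitions[partition_idx]

        # Bind batch and partition as default arguments so each task keeps its own values
        # instead of whatever the loop variables hold when the worker thread runs it
        def compute_fn(batch=batch, partition=partition):
            return partition(batch)  # Run this stage's layers on its GPU

        task = Task(compute_fn)
        self.in_queues[partition_idx].put(task)  # Send to the stage's worker thread
    # PHASE 2: collect ALL results
    for batch_idx, partition_idx in schedule:
        success, result = self.out_queues[partition_idx].get()  # Wait for the result
        if not success:
            raise RuntimeError(f"Stage {partition_idx} failed on micro-batch {batch_idx}")
        batches[batch_idx] = result  # Stored output becomes the input to the next stage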
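The post doesn't show Task or the worker threads behind in_queues and out_queues. Here is a minimal, framework-free sketch of how that machinery could look, under my own assumptions: stages are plain Python callables instead of GPU modules, each stage owns one worker thread, and workers reply with a (success, result) pair. ToyPipe and _worker are hypothetical names, not an existing library API.

```python
import queue
import threading
from typing import Callable, List, Sequence


def _clock_cycles(num_batches: int, num_partitions: int):
    # Same schedule as above: micro-batch i is at partition j when clock == i + j
    for clock in range(num_batches + num_partitions - 1):
        yield [(i, clock - i) for i in range(num_batches) if 0 <= clock - i < num_partitions]


def _worker(in_q: queue.Queue, out_q: queue.Queue) -> None:
    # Hypothetical worker loop: run each task and report (success, result)
    while True:
        task = in_q.get()
        if task is None:  # Sentinel value: shut the worker down
            break
        try:
            out_q.put((True, task()))
        except Exception as exc:
            out_q.put((False, exc))


class ToyPipe:
    """Hypothetical thread-per-stage pipeline over plain Python callables."""

    def __init__(self, stages: Sequence[Callable]):
        self.stages = list(stages)
        self.in_queues = [queue.Queue() for _ in self.stages]
        self.out_queues = [queue.Queue() for _ in self.stages]
        self.threads = [
            threading.Thread(target=_worker, args=(in_q, out_q), daemon=True)
            for in_q, out_q in zip(self.in_queues, self.out_queues)
        ]
        for thread in self.threads:
            thread.start()

    def forward(self, micro_batches: List) -> List:
        batches = list(micro_batches)
        for schedule in _clock_cycles(len(batches), len(self.stages)):
            # Phase 1: submit every (micro-batch, stage) pair of this clock cycle
            for i, j in schedule:
                stage, batch = self.stages[j], batches[i]
                self.in_queues[j].put(lambda stage=stage, batch=batch: stage(batch))
            # Phase 2: wait for each stage's result and store it for the next stage
            for i, j in schedule:
                ok, result = self.out_queues[j].get()
                if not ok:
                    raise result
                batches[i] = result
        return batches

    def close(self) -> None:
        for in_q in self.in_queues:
            in_q.put(None)
        for thread in self.threads:
            thread.join()


pipe = ToyPipe([lambda x: x + 1, lambda x: x * 2, lambda x: x - 3])
try:
    out = pipe.forward([1, 2, 3, 4])
finally:
    pipe.close()
print(out)  # [1, 3, 5, 7]
```

Because each partition appears at most once per clock cycle, every out_queue receives exactly one result per cycle, so collecting results in schedule order cannot mix up micro-batches.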
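In case you’re wondering why we can’t just use processes as in data parallelism: pipeline parallelism needs tight, frequent coordination between GPUs at every clock cycle. Threads in a single process can hand tensors to each other through shared-memory queues with minimal overhead, and PyTorch releases Python’s GIL inside its tensor operations, so the threads don’t serialize each other’s GPU work. Separate processes, by contrast, would add too much communication latency.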
Data parallelism, model parallelism, and pipeline parallelism each offer distinct ways to scale deep learning workloads across multiple GPUs, but their trade-offs make them suitable for different scenarios.

Ultimately, there’s no one-size-fits-all strategy. If your model fits in memory, data parallelism is often the most straightforward. If you’re working with large models, model or pipeline parallelism becomes essential and combining strategies (like pipeline + data parallelism) can unlock even greater scalability. The key is understanding your model’s size, your hardware constraints, and the trade-offs each approach introduces in terms of memory, communication, and performance.
Published via Towards AI
Note: Content contains the views of the contributing authors and not Towards AI.