As deep learning models continue to grow in size and complexity, training them efficiently becomes a significant challenge. DeepSpeed, a deep learning optimization library, addresses this challenge with its powerful pipeline parallelism feature. This article explores how pipeline parallelism, implemented in DeepSpeed, enhances both memory and computational efficiency, enabling the training of models with trillions of parameters.
Pipeline parallelism is a technique that partitions the layers of a deep learning model into sequential stages. Each stage can then be processed in parallel across different devices (typically GPUs). This approach contrasts with data parallelism, where the entire model is replicated on each device, and each device processes a different subset of the data.
By dividing the model, pipeline parallelism reduces the memory footprint on each device, allowing for the training of significantly larger models. It also improves computational efficiency by enabling concurrent processing of different stages of the model.
DeepSpeed's training engine offers a hybrid approach, combining data parallelism and pipeline parallelism. Furthermore, it can be integrated with other model parallelism techniques, such as Megatron-LM, to achieve even greater scalability. This combination is often referred to as 3D parallelism.
The core idea behind DeepSpeed's pipeline parallelism is to divide each batch of training data into smaller units called micro-batches. These micro-batches are then processed sequentially through the pipeline stages.
Here’s a breakdown of the process:
- Each input batch is split into several micro-batches.
- Each pipeline stage processes a micro-batch and passes its activations (or gradients, during the backward pass) to the neighboring stage.
- While one stage works on a given micro-batch, the other stages work on different micro-batches, keeping all devices busy.
- Gradients are accumulated across the micro-batches, and the optimizer step is applied once per full batch.
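The split is controlled by the batch-size settings in the DeepSpeed configuration. As a minimal sketch (the keys are standard DeepSpeed configuration fields; the values are illustrative and assume two data-parallel replicas):

# This dict can be passed to deepspeed.initialize via its config argument,
# or stored as a JSON file referenced on the command line.
ds_config = {
    "train_batch_size": 256,               # effective batch size per optimizer step
    "train_micro_batch_size_per_gpu": 16,  # size of each micro-batch
    "gradient_accumulation_steps": 8,      # micro-batches per optimizer step
    # 16 * 8 * 2 data-parallel replicas = 256
}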
DeepSpeed simplifies the implementation of pipeline parallelism. The examples below build up to a pipeline-parallel version of an AlexNet-style model.
Pipeline parallelism requires models to be structured as a sequence of layers. While a traditional forward() function isn't explicitly defined for the entire pipeline, the implicit flow is as follows:
def forward(self, inputs):
    x = inputs
    for layer in self.layers:
        x = layer(x)
    return x
PyTorch's torch.nn.Sequential is ideal for expressing such models and can be directly parallelized using DeepSpeed:
import torch.nn as nn
from deepspeed.pipe import PipelineModule

in_features, hidden_dim, out_features = 1024, 512, 10  # example dimensions

net = nn.Sequential(
    nn.Linear(in_features, hidden_dim),
    nn.ReLU(inplace=True),
    nn.Linear(hidden_dim, out_features)
)
net = PipelineModule(layers=net, num_stages=2)
The PipelineModule divides the model into the specified number of stages and distributes the layers across the corresponding GPUs. Remember that the total number of GPUs must be divisible by the number of pipeline stages; for example, with four GPUs and num_stages=2, DeepSpeed builds a two-stage pipeline replicated across two data-parallel groups.
To adapt an existing model like AlexNet for pipeline parallelism, you might need to flatten its submodules into a single sequence of layers:
import torch
import torch.nn as nn
from deepspeed.pipe import PipelineModule

class AlexNetPipe(nn.Module):
    def __init__(self, num_classes=1000):
        super().__init__()
        # Feature extractor (truncated here for brevity)
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(64 * 6 * 6, num_classes),  # 64 channels * 6 * 6 after avgpool
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def to_layers(self):
        layers = [
            *self.features,
            self.avgpool,
            lambda x: torch.flatten(x, 1),  # example of a non-nn.Module layer
            *self.classifier
        ]
        return layers

net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(),
                     loss_fn=nn.CrossEntropyLoss(),  # used by the pipeline engine in train_batch()
                     num_stages=2)
Ensure that the inputs and outputs of each layer are either a single torch.Tensor or a tuple of tensors. You may need to adjust the forward() pass of some layers to pack and unpack arguments; this is particularly relevant for models that take multiple inputs, such as Mixture-of-Experts (MoE) models.
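As a hypothetical sketch of this pattern (MaskedLinear is an illustrative module, not part of DeepSpeed or the original article), a stage layer can unpack a tuple of tensors at the start of forward() and repack it before returning:

import torch
import torch.nn as nn

class MaskedLinear(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.proj = nn.Linear(dim, dim)

    def forward(self, inputs):
        # Unpack the tuple of tensors received from the previous stage.
        hidden, mask = inputs
        hidden = self.proj(hidden) * mask
        # Repack into a tuple of tensors for the next stage.
        return (hidden, mask)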
Use DeepSpeed's train_batch() method to advance the pipeline engine. Each call consumes a full batch of training data as a sequence of micro-batches and performs one update of the model weights:
train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)
DeepSpeed provides utilities for distributed data loading. It expects data loaders to return a tuple of two items: the input batch data and the data for loss calculation. For convenience, DeepSpeed's pipeline can construct a data loader when a dataset is provided during initialization. Check out the Data Efficiency tutorial for more info.
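For example, the cifar_trainset() used below could be defined with torchvision (this definition is an assumption for illustration; the article does not show it), returning (image, label) pairs in exactly that format:

import torchvision
import torchvision.transforms as transforms

def cifar_trainset():
    transform = transforms.Compose([
        transforms.Resize(224),  # upscale CIFAR images for the AlexNet-style network
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])
    # Each sample is an (image, label) tuple: the input and the data for loss calculation.
    return torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)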
import deepspeed

engine, _, _, _ = deepspeed.initialize(
    args=args,
    model=net,
    model_parameters=[p for p in net.parameters() if p.requires_grad],
    training_data=cifar_trainset()
)

for step in range(args.steps):
    loss = engine.train_batch()
To prevent the data stream from emptying mid-training, wrap your data loader with deepspeed.utils.RepeatingLoader, which simply restarts the underlying loader whenever it is exhausted.
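A minimal usage sketch, assuming the train_loader and engine from the previous examples:

from deepspeed.utils import RepeatingLoader

# Restart the underlying loader automatically whenever it is exhausted.
train_loader = RepeatingLoader(train_loader)
train_iter = iter(train_loader)

for step in range(args.steps):
    loss = engine.train_batch(data_iter=train_iter)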
Achieving optimal pipeline parallel training requires careful load balancing across devices. DeepSpeed offers several partitioning methods via the partition_method argument of PipelineModule:
- "parameters": balances the number of trainable parameters on each stage (default).
- "type:[regex]": balances layers whose class names match the regular expression.
- "uniform": balances the number of layers per stage.
For example, PipelineModule(layers=net.to_layers(), num_stages=2, partition_method="uniform") places the same number of layers on each stage.
For very large models, replicating the entire model in CPU memory on each worker can become a bottleneck. DeepSpeed's LayerSpec class delays module construction until the layers have been partitioned, reducing the memory footprint:
from deepspeed.pipe import PipelineModule, LayerSpec
import torch.nn as nn

class AlexNetPipe(PipelineModule):
    def __init__(self, num_classes=10, **kwargs):
        self.num_classes = num_classes
        # Layers are described, not constructed; modules are built only on the
        # stage that owns them after partitioning.
        specs = [
            LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=2),
            LayerSpec(nn.ReLU, inplace=True),
            LayerSpec(nn.MaxPool2d, kernel_size=3, stride=2),
            LayerSpec(nn.AdaptiveAvgPool2d, (6, 6)),
            LayerSpec(nn.Flatten, start_dim=1),
            LayerSpec(nn.Linear, 64 * 6 * 6, self.num_classes),  # 64 channels * 6 * 6 after avgpool
        ]
        super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)
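Because AlexNetPipe now subclasses PipelineModule directly, pipeline arguments such as num_stages are simply passed through at construction time:

net = AlexNetPipe(num_classes=10, num_stages=2)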
For models with reused layers (e.g., shared embedding layers in transformers), DeepSpeed provides TiedLayerSpec. It takes a key argument that identifies where a layer is reused. Tied layers are replicated on every stage that uses them, with an all-reduce operation keeping their weights synchronized.
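As a hedged sketch of the idea, consider tying a vocabulary embedding to the output projection of a small model; the helper functions, layer choices, and dimensions below are illustrative assumptions rather than code from DeepSpeed or the original article:

import torch.nn as nn
from deepspeed.pipe import PipelineModule, TiedLayerSpec

hidden_dim, vocab_size = 512, 32000  # illustrative sizes

def embed_tokens(embed, token_ids):
    # First use of the tied module: ordinary embedding lookup.
    return embed(token_ids)

def project_to_vocab(embed, hidden):
    # Second use of the same tied module: reuse the embedding matrix
    # as the output projection (weight tying).
    return hidden @ embed.weight.t()

specs = [
    TiedLayerSpec('embed', nn.Embedding, vocab_size, hidden_dim,
                  forward_fn=embed_tokens),
    # ... transformer layers would normally sit between the tied uses ...
    TiedLayerSpec('embed', nn.Embedding, vocab_size, hidden_dim,
                  forward_fn=project_to_vocab),
]
net = PipelineModule(layers=specs, num_stages=2)

Both TiedLayerSpec entries share the key 'embed', so the embedding is instantiated once per stage that uses it, and the copies are kept in sync across stages.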
Pipeline parallelism in DeepSpeed offers a robust solution for training large deep learning models efficiently. By partitioning models into stages and processing micro-batches concurrently across them, it reduces memory requirements and improves computational performance. DeepSpeed's flexible implementation, combined with techniques like load balancing and memory-efficient model construction, empowers researchers and practitioners to tackle increasingly complex AI challenges. Finally, tools like the Flops Profiler can help verify that the resulting network runs as expected.