PyTorch多卡分布式训练:DistributedDataParallel (DDP) 简要分析

2021 年 11 月 4 日 PaperWeekly


©作者 | 伟大是熬出来的

单位 | 同济大学

研究方向 | 机器阅读理解




前言

因为课题组发的卡还没有下来,先向导师问了实验室的两张卡借用。之前都是单卡训练模型,正好在这个机会实践以下单机多卡训练模型的方法。关于 DDP 网上有很多资料,但都比较零碎(有些博客的代码甚至没办法 run),Pytorch 给出的官方文档看起来也比较吃力。因此这篇文章的主要目的是梳理一下笔者学习过程中认为比较好的资料,用通俗的语言介绍一下 DDP 的原理,最后给出使用 DDP 的模板以及一份详细的运行案例。

当代研究生应当掌握的并行训练方法(单机多卡):
https://zhuanlan.zhihu.com/p/98535650

这篇文章中介绍了目前常用的并行训练方法。其中, nn.Dataparallel  的使用最简单,只需要使用 Dataparallel  包装模型,再设置一些参数就可以实现。参数中需要指定参与训练的 GPU,device_ids=gpus;汇总梯度的 GPU,output_device=gpus[0]。

model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

nn.Dataparallel  方法实际上是使用单进程将模型和数据加载到多个 GPU 上,控制数据在 GPU 之间流动,协同不同的 GPU 上的模型进行并行训练。这篇文章 [8] 中提到 nn.Dataparallel  方法的弊端:在训练的过程中,每个 batch 的模型权重是在一个进程上计算出来之后,再分发到每个 GPU 上。

这会导致负载不均衡的问题,可能第一个 GPU(12GB)占用了 10GB,剩余 GPU 却只使用了 4GB。因为在数据并行的时候,loss 会在第一个 GPU 上相加计算,更新好以后把权重分发到其余卡。这就造成了第一个 GPU 的负载远大于其他显卡。



nn.DistributedDataParallel 原理


nn.Dataparallel  使用单进程控制多个 GPU 不同, nn.DistributedDataParallel 为每个 GPU 都创建一个进程。这些 GPU 可以位于同一个结点上(单机多卡),也可以分布在多个节点上(多机多卡)。每个进程都执行相同的任务,每个进程都与其他进程进行通信。

另外一点不同是, 只有梯度会在进程(GPU)之间传播 。以单机多卡举例,假设我们有三张卡并行训练,那么在每个 epoch 中,数据集会被划分成三份给三个 GPU,每个 GPU 使用自己的 minibatch 数据做自己的前向计算,然后梯度在 GPU 之间全部约简。在反向传播结束的时候,每个 GPU 都有平均的梯度,确保模型权值保持同步(synchronized)。



运行模板


这一小节以官方文档给出的 demo 作为例子,介绍 DDP 的使用模板以及运行流程:
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

首先我们导入库文件,其中 torch.multiprocessing  用于创建进程,后面会详细介绍。

   
   
     
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

前文提到,DDP 模型对于每个 GPU 都会创建一个单独的进程管理。在程序并发执行的过程中,进程之间需要同步和通信。因此我们需要一个方法管理进程组,这个方法需要知道如何找到进程 0。也要知道进程组中同步了多少个进程。init_process_group 方法能够实现上述功能,其中参数的含义解释如下:

  • backend:使用的后端。包括 mpi , gloo , 和 nccl 。根据官方文档的介绍,nccl 是运行速度最快的,因此大多设置为这个
  • rank:当前进程的等级。在 DDP 管理的进程组中,每个独立的进程需要知道自己在所有进程中的阶序,我们称为 rank
  • world_size:在 DDP 管理的进程组中,每个独立的进程还需要知道进程组中管理进程的数量,我们称为 world_size


下面 setup  以及  cleanup  分别实现了进程组的设置以及销毁。

   
   
     
ef setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

本例中我们训练一个简单的网络结构  ToyModel

   
   
     
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(1010)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(105)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

下面是模型训练部分的模板。数据和模型加载到当前进程使用的 GPU 中,正常进行正反向传播,需要注意以下几点:


  • 每个进程都需要复制一份模型以及数据。我们需要根据前文提到的  rank  和  world_size  两个参数初始化进程组。这样进程之间才能相互通信。使用我们前文定义的  setup()  方法实现;
  • model = ToyModel().to(rank)  这条语句将我们的模型移动到对应的 GPU中,  rank  参数作为进程之间的阶序,可以理解为当前进程 index。由于每个进程都管理自己的 GPU,因此通过阶序可以索引到对应的 GPU;
  • ddp_model = DDP(model, device_ids=[rank])这条语句包装了我们的模型;

  • 其他与 pytorch 中训练模型的模板相同,最后一点需要注意的是,在我们将 tensor 移动到 GPU 的时候,同样需要使用 rank 索引,代码中体现在第 14 行。

   
   
     

def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(2010))
    labels = torch.randn(205).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()

最后是启动器的介绍。DDP 的启动有两种方式,分别对应不同的代码。

  • torch.distributed.launch 启动器,用于在命令行分布式地执行 python 文件。在执行过程中,启动器会将当前进程的(其实就是 GPU 的)index 通过参数传递给 python。在使用的时候执行语句 CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py  调用启动器 torch.distributed.launch。
  • 本例中使用的是第二种方法 torch.multiprocessing.spawn。使用时,只需要调用 torch.multiprocessing.spawn,torch.multiprocessing 就会帮助我们自动创建进程。


如下面的代码所示,spawn 开启了 world_size 个进程,每个进程执行 demo_fn 并向其中传入 local_rank(当前进程 index)作为参数。这里需要结合前文 demo_basic 的定义来看。args 中的 world_size 对应 demo_basic  的 world_size 参数;mp.spawn 中 nprocs 则是创建进程的数量;至于 demo_basic  中的 rank 参数,应当是框架内部实现了索引机制因此不需要我们显示对应(笔者自己的理解)。

   
   
     
def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

完整代码如下:

   
   
     
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(1010)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(105)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(2010))
    labels = torch.randn(205).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)

这份代码可以直接运行,输入结果如下,笔者使用两张卡,因此对应的 rank 分别是 0 和 1:

Running basic DDP  example on Rank 1
Running basic DDP  example on Rank 0

在官网给的这份 demo 之外,其实还有一点需要注意。我们使用 pytorch 处理数据集创建 dataloader 的过程中,需要使用 DistributedSampler 采样器。我们已经知道,每个进程都会拷贝一份模型和数据的副本,但是在并行计算的过程中,单个进程只会处理自己的 minibatch 的数据。

假设我们使用五个 GPU 并行,其对应的五个进程都有模型和数据的副本,我们假设训练集有一万条数据,那么在单个 epoch 中每个进程实际上只需要使用两千条数据训练,之后进行梯度整合。那么进程如何知道自己需要处理哪些数据呢?这是 DistributedSampler 的功能。

关于 DistributedSampler 具体做了什么,可以参考文献 7。

在此基础上,笔者根据官网的 demo 总结了一份使用 DDP 进行多卡并行加速模型的模板,读者在使用过程中根据需要进行简单更改即可使用:

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP


def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def run(demo_fn, world_size):
    setup(rank, world_size)

    torch.manual_seed(18)
    torch.cuda.manual_seed_all(18)
    torch.backends.cudnn.deterministic = True
    torch.cuda.set_device(rank) # 这里设置 device ,后面可以直接使用 data.cuda(),否则需要指定 rank


    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

    model = ...
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    optimizer = optim.SGD(model.parameters())

    for epoch in range(100):
       train_sampler.set_epoch(epoch)
       for batch_idx, (data, target) in enumerate(train_loader):
          data = data.cuda()
          target = target.cuda()
          ...
          output = model(images)
          loss = criterion(output, target)
          ...
          optimizer.zero_grad()
          loss.backward()
          optimizer.step()



if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus

    mp.spawn(run,
             args=(world_size,),
             nprocs=world_size,
             join=True)

总结一下,使用 DDP 进行多卡并行加速模型的重点:

  • init_process_group 函数管理进程组
  • 在创建 Dataloader 的过程中,需要使用 DistributedSampler 采样器
  • 正反向传播之前需要将数据以及模型移动到对应 GPU,通过参数 rank 进行索引,还要将模型使用 DistributedDataParallel 进行包装
  • 在每个 epoch 开始之前,需要使用 train_sampler.set_epoch(epoch) 为 train_sampler 指定 epoch,这样做可以使每个 epoch 划分给不同进程的  minibatch 不同,从而在整个训练过程中,不同的进程有机会接触到更多的训练数据
  • 使用启动器进行启动。不同启动器对应不同的代码。torch.distributed.launch 通过命令行的方法执行,torch.multiprocessing.spawn 则可以直接运行程序。


最后,笔者使用上述模板实现了一份基于 Roberta 的文本分类任务,使用 DDP 进行单机双卡并行加速,运行的结果如下,有感兴趣的读者再放代码吧,这里展示一下运行结果:




参考文献

注:下述文献大多使用 torch.distributed.launch 启动器执行程序

[1] 当代研究生应当掌握的并行训练方法(单机多卡):

https://zhuanlan.zhihu.com/p/98535650

[2] PyTorch Parallel Training(单机多卡并行、混合精度、同步BN训练指南文档):https://zhuanlan.zhihu.com/p/145427849

[3] pytorch多卡分布式训练简要分析:https://zhuanlan.zhihu.com/p/159404316

[4] Pytorch中的Distributed Data Parallel与混合精度训练(Apex):https://zhuanlan.zhihu.com/p/105755472

[5] PyTorch分布式训练基础--DDP使用:https://zhuanlan.zhihu.com/p/358974461

[6] 使用PyTorch编写分布式应用程序:https://www.jianshu.com/p/be9f8b90a1b8?utm_campaign=hugo&utm_medium=reader_share&utm_content=note&utm_source=weixin-friends

[7] DistributedSampler 具体做了什么:https://blog.csdn.net/searobbers_duck/article/details/115299691?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link

[8] Pytorch的nn.DataParallel:https://zhuanlan.zhihu.com/p/102697821



更多阅读




#投 稿 通 道#

 让你的文字被更多人看到 



如何才能让更多的优质内容以更短路径到达读者群体,缩短读者寻找优质内容的成本呢?答案就是:你不认识的人。


总有一些你不认识的人,知道你想知道的东西。PaperWeekly 或许可以成为一座桥梁,促使不同背景、不同方向的学者和学术灵感相互碰撞,迸发出更多的可能性。 


PaperWeekly 鼓励高校实验室或个人,在我们的平台上分享各类优质内容,可以是最新论文解读,也可以是学术热点剖析科研心得竞赛经验讲解等。我们的目的只有一个,让知识真正流动起来。


📝 稿件基本要求:

• 文章确系个人原创作品,未曾在公开渠道发表,如为其他平台已发表或待发表的文章,请明确标注 

• 稿件建议以 markdown 格式撰写,文中配图以附件形式发送,要求图片清晰,无版权问题

• PaperWeekly 尊重原作者署名权,并将为每篇被采纳的原创首发稿件,提供业内具有竞争力稿酬,具体依据文章阅读量和文章质量阶梯制结算


📬 投稿通道:

• 投稿邮箱:hr@paperweekly.site 

• 来稿请备注即时联系方式(微信),以便我们在稿件选用的第一时间联系作者

• 您也可以直接添加小编微信(pwbot02)快速投稿,备注:姓名-投稿


△长按添加PaperWeekly小编




🔍


现在,在「知乎」也能找到我们了

进入知乎首页搜索「PaperWeekly」

点击「关注」订阅我们的专栏吧



·

登录查看更多
1

相关内容

30家国产存储器及主控芯片厂商调研分析报告
专知会员服务
20+阅读 · 2022年3月19日
专知会员服务
51+阅读 · 2021年6月17日
最新LightGBM进展介绍报告,39页ppt
专知会员服务
29+阅读 · 2021年1月15日
Python分布式计算,171页pdf,Distributed Computing with Python
专知会员服务
105+阅读 · 2020年5月3日
深度神经网络模型压缩与加速综述
专知会员服务
126+阅读 · 2019年10月12日
实践教程 | PyTorch分布式测试踩坑小结
极市平台
18+阅读 · 2022年4月1日
实践教程 | 浅谈 PyTorch 中的 tensor 及使用
极市平台
1+阅读 · 2021年12月14日
正则化技巧:标签平滑以及在 PyTorch 中的实现
极市平台
2+阅读 · 2021年12月10日
Pytorch 数据流中常见Trick总结
极市平台
0+阅读 · 2021年12月7日
实践教程|PyTorch训练加速技巧
极市平台
0+阅读 · 2021年11月15日
国家自然科学基金
0+阅读 · 2014年12月31日
国家自然科学基金
0+阅读 · 2014年12月31日
国家自然科学基金
15+阅读 · 2013年12月31日
国家自然科学基金
2+阅读 · 2013年12月31日
国家自然科学基金
2+阅读 · 2013年12月31日
国家自然科学基金
1+阅读 · 2013年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
1+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2008年12月31日
Arxiv
18+阅读 · 2020年7月13日
VIP会员
相关资讯
实践教程 | PyTorch分布式测试踩坑小结
极市平台
18+阅读 · 2022年4月1日
实践教程 | 浅谈 PyTorch 中的 tensor 及使用
极市平台
1+阅读 · 2021年12月14日
正则化技巧:标签平滑以及在 PyTorch 中的实现
极市平台
2+阅读 · 2021年12月10日
Pytorch 数据流中常见Trick总结
极市平台
0+阅读 · 2021年12月7日
实践教程|PyTorch训练加速技巧
极市平台
0+阅读 · 2021年11月15日
相关基金
国家自然科学基金
0+阅读 · 2014年12月31日
国家自然科学基金
0+阅读 · 2014年12月31日
国家自然科学基金
15+阅读 · 2013年12月31日
国家自然科学基金
2+阅读 · 2013年12月31日
国家自然科学基金
2+阅读 · 2013年12月31日
国家自然科学基金
1+阅读 · 2013年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
1+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2008年12月31日
Top
微信扫码咨询专知VIP会员