参考
pytorch官方文档
https://zhuanlan.zhihu.com/p/102697821
https://zhuanlan.zhihu.com/p/105755472
https://pytorch.org/tutorials/beginner/dist_overview.html#torch-nn-parallel-distributeddataparallel
https://pytorch.org/tutorials/intermediate/dist_tuto.html
https://github.com/pytorch/examples/tree/master/imagenet
单卡训练
pytorch使用GPU训练,这个比较基础了,贴一些官方的代码。
- 指定device
- 模型放入device
- 数据放入device ```python device = torch.device(“cuda:0”)
model.to(device)
这里执行的复制操作,会返回一个GPU的复制数据,而非重写my_tensor,所以需要给它命名
mytensor = my_tensor.to(device)
<a name="BJZhf"></a>## 多卡训练<a name="2aimA"></a>### torch.nn.DataParallel[torch.nn.DataParallel](https://pytorch.org/tutorials/beginner/blitz/data_parallel_tutorial.html)是我之前常常使用的方式,它的优点是使用起来简单,缺点是GPU显存会非常不均衡。<br />之前训练的时候就有发现一个现象,0卡的显存特别特别大,其它卡的显存占用则很少。<br />这是因为torch.nn.DataParallel会并行input,loss却只会在第一块GPU上相加回传,这就造成了第一块GPU的负载会过高,且有网络通信问题。核心代码是`model = nn.DataParallel(model)`示例代码如下:```pythondevice = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")model = Model(input_size, output_size)if torch.cuda.device_count() > 1:print("Let's use", torch.cuda.device_count(), "GPUs!")# dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUsmodel = nn.DataParallel(model)model.to(device)input = data.to(device)output = model(input)
探究一些细节
假设batch_size = 30
使用双卡
Let's use 2 GPUs!In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2])Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
使用三卡
Let's use 3 GPUs!In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])In Model: input size torch.Size([4, 5]) output size torch.Size([4, 2])In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
发现了什么?
是嘞,这种方法相当于将一个batch_size里的数据划分成多个部分分到各个卡上去。因此这种做法batch size应该大于gpu个数。概括来说,即DataParallel 会自动帮我们将数据切分load到相应GPU,将模型复制到相应GPU,进行正向传播计算计算loss,在第一张卡汇总loss并backward,权重算完后再分发到其它GPU上。
torch.nn.parallel.DistributedDataParallel
真正的分布式,一般叫DDP。官方推荐用这种方法替代multiprocessing或nn.DataParallel,更快且显存更均衡
讲DistributedDataParallel的官方文档比较短,因此先学习一下。
import torchimport torch.distributed as distimport torch.multiprocessing as mpimport torch.nn as nnimport torch.optim as optimfrom torch.nn.parallel import DistributedDataParallel as DDPdef example(rank, world_size):# create default process groupdist.init_process_group("gloo", rank=rank, world_size=world_size)# create local modelmodel = nn.Linear(10, 10).to(rank)# construct DDP modelddp_model = DDP(model, device_ids=[rank])# define loss function and optimizerloss_fn = nn.MSELoss()optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)# forward passoutputs = ddp_model(torch.randn(20, 10).to(rank))labels = torch.randn(20, 10).to(rank)# backward passloss_fn(outputs, labels).backward()# update parametersoptimizer.step()def main():world_size = 2mp.spawn(example,args=(world_size,),nprocs=world_size,join=True)if __name__=="__main__":main()
对代码的细节做一些解释。
一共有world_size个进程,通常每个进程控制一张卡来训练模型。每个进程需要有的操作包括(和正常训练流程不一样的标了红):
- 通信信息的初始化
- 在第rank张卡(或者说第rank进程中)创建模型
- 定义损失(如果损失函数用了模型,也需要放到第rank张卡上去)
- 定义优化函数
- 前向传播获得输出,放到第rank张卡上
- 正常的反向传播和梯度更新
进程创建:import torch.multiprocessing as mp
在DDP基础上用Multiprocessing可以在多个gpu之间复制模型,每个GPU由一个进程控制。每个进程执行相同的任务,与其它所有进程通信。
通信:dist.init_process_group
DDP依赖ProcessGroup实现通信,因此在使用DDP前需要创建ProcessGroup实例
构造过程
初始时,DDP会构造本地模型,将state_dict()从rank0的进程开始分发到各个进程中,保证各个进程从同一状态开始。
接着,DDP会创建本地进程Reducer,用来在反向传播中梯度同步。Reducer为了保证通信效率,会将所有梯度按buckets的形式组织,比如4个梯度,分成2个buckets,以bucket为单位同步信息,确定梯度。bucket size可以用参数bucket_cap_mb控制。梯度在backet中是按model.parameters()的倒序排列的,这样可以和反向传播的顺序保持一致。下图能帮助理解,2个进程,4个参数,2个buckets,param0和param1反而在bucket1中。等所有的bucket0都计算完毕,就调用allreduce去计算平均梯度。(这里有反向传播的知识点,后面会再具体讲)
DDP除了会创建Reducer,还会创建自动去计算梯度的hook,每个参数对应一个hook。当梯度的状态被标记为ready时就会唤起这些hook。![[pytorch]多卡分布式训练 - 图1](/uploads/projects/grace-gu2@cbfzvt/69894d9295b04701fd62a35e6740712d.jpeg)
前向传播过程
DDP获取输入并将它交给local model获取输出。
DDP的find_unused_parameters被设置为True时,会分析local model的输出,通过遍历模型输出中的autograd图将所有未使用的参数标记为Ready。反向传播过程中,Reducer需要等所有参数被标记为Ready后执行allreduce,因此有些等不到的参数应该直接将其标记为Ready(allreduce前面也提到哈,还是会在下一节反向传播的章节里讲)。 由于这种遍历会带来额外的开销,因此我们只应该在必要的时候设置为true。
反向传播过程
loss tensor会调用backward()函数, DDP控制不了tensor,因此上文中DDP才会创建自动去计算梯度的hook。当一个梯度被计算后,它对应的DDP hook就会被触发,DDP会将这个参数的梯度状态标记为Ready。当一个bucket中的所有梯度状态都是Ready之后,进程Reducer会启动异步进程(?可能是进程)allreduce去计算所有DDP进程中这个bucket中参数的平均梯度。当所有buckets的状态都是Ready后,Reducer会等待所有allreduce的计算结果。等allreduce全部计算完后,平均梯度将会写入所有参数的param.grad空间。等所有进程反向传播结束后,所有参数的grad空间应该保持一致。
优化过程
由于所有参数的grad空间保持一致,优化方法一致,所以优化结果也应该是一样的。
源码的简单解读
ProcessGroup
ProcessGroup.cpp实现DDP进程的通信,c10d的库提供了三种实现方法,分别是ProcessGroupGloo, ProcessGroupNCCL, and ProcessGroupMPI
其中的broadcast()在初始化时从rand0的进程开始分发模型的stateallreduce用来计算所有进程的梯度
接口解读
接口文档的笔记整理。
首先当然是最核心的DistributedDataParallel了
CLASStorch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False)
- batch_size应该比使用的GPU数量大
- 调用DistributedDataParallel前应先调用
torch.distributed.init_process_group() - 对于有N个GPU的显卡,应该创建N个进程,每个进程控制一张卡。
- 能使用的backends有
gloo和nccl。ncclbackend是目前最推荐也是最快的backend,单节点和多节点分布式训练中均可使用 - 这个模块也可以用来混合精度的分布式训练,模型可以有不同的参数精度。
- 如果用
torch.save保存了某个进程的模型,在另一个进程中用torch.load加载,需要使用map_location。 - 如果用nccl或gloo,需要使用一个多workers的DataLoader,需要将multiprocessing start method改为forkserver或spawn。
- 将模型用DDP接口调用以后,不应该再改变模型参数
接着介绍创建进程的multiprocessing.spawn, nprocs=1, join=True, daemon=False, start_method=’spawn’))接口
torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')
每个主机创建nprocs个进程去执行fn函数,fn函数的输入是args。
这里稍微注意一下fn函数,它的第一个参数应该是rank(int)类型,表示第几个进程。
举例说明:
mp.spawn(main_func,args=(n_gpus, ), # 只需要传入rank之后的其它参数nprocs=n_gpus,join=True)# 第一个参数rank表示是第几个进程def main_func(rank, n_gpus):dist.init_process_group("gloo", rank=rank, world_size=world_size)if rank = 0:# 可以执行一些只有第一个进程执行的操作
join=True表示执行的是阻塞连接,即操作函数会一直等候结果而不会立即返回。
daemon表示是否创建守护进程
如果join=True,这个接口会返回None
如果join是False,这个接口会返回ProcessContext
然后介绍通信的初始化接口torch.distributed.init_process_group
torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')
- backend: nccl或gloo,gloo基本只支持cpu,选nccl即可
- init_method:指定通信的URL,如果未指定它和store,默认是”env://“
- world_size:所有参与训练的进程数
- rank:当前进程在所有进程中的编号
代码实战
通过前面内容的讲解,应该对torch.nn.parallel.DistributedDataParallel有了大致的了解,但还是不够。接下来会通过阅读一份优秀代码进一步加深理解。代码来自pytorch官方github库的示例,用来训练ImageNet的,本身要素比较多,我会只对保留其中和分布式训练有关的部分。
导入
没啥说的,导入需要的库
import argparseimport osimport randomimport shutilimport timeimport warningsimport torchimport torch.nn as nnimport torch.nn.parallelimport torch.backends.cudnn as cudnnimport torch.distributed as distimport torch.optimimport torch.multiprocessing as mpimport torch.utils.dataimport torch.utils.data.distributedimport torchvision.transforms as transformsimport torchvision.datasets as datasetsimport torchvision.models as models
创建进程
通常代码中的world_size指训练的进程数,是nproc_per_nodenode(每个节点的进程数节点数)。这里的节点数可理解为主机数,由于我的使用场景通常是单机多卡,因此node=1。
PS:对于多机多卡的操作场景我暂时无法理解,因此会标出来
这里确认了需不需要使用分布式训练
如果需要,确认进程数量
用torch.multiprocessing.spawn创建对应的进程,使用的参数包括:需要执行的函数,每个主机的进程数量,传进函数的参数
parser.add_argument('--world-size', default=-1, type=int,help='number of nodes for distributed training')args = parser.parse_args()if args.dist_url == "env://" and args.world_size == -1:args.world_size = int(os.environ["WORLD_SIZE"])args.distributed = args.world_size > 1 or args.multiprocessing_distributedif args.multiprocessing_distributed:# Since we have ngpus_per_node processes per node, the total world_size# needs to be adjusted accordingly# 更新后的world_size: 所有进程的数量args.world_size = ngpus_per_node * args.world_size# Use torch.multiprocessing.spawn to launch distributed processes: the# main_worker process function# nprocs:每个主机的进程数量mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
进程内添加训练代码
每个进程需要执行的程序,即上文中main_worker的具体内容。
main_worker的第一个参数是固定要保留的,表示当前主机使用的gpu号(或者当前主机的进程号),程序会自动填入,一般0会对应用的0卡,1对应用的1卡,诸如此类。
rank一开始表示节点数(主机数),后来表示当前进程在所有主机所有进程中的一个编号。
多进程训练需要使用dist.init_process_group实现通信。初始化时需指定一个用来通信的地址。如果是单机多卡,可以直接指定为localhost+无人使用的端口号。示例如下:
port_name = str(random.randint(10000, 99999))os.environ['MASTER_ADDR'] = 'localhost'os.environ['MASTER_PORT'] = port_name # '12355'# 之后就init_process_group即可,不需要指定init_method了
有个细节可以关注一下,作者设置的batch_size是所有GPU一起训练的时候一个批次的数量。在实际训练时,整体的batch_size被划分到各个显卡上了。workers同理。(如果使用distributedsampler,batchsize设置与单卡相同即可。)
还有一个细节,如果分布式训练,单机多卡时保存模型只保存0卡的模型。多机多卡时保留每个主机上0卡的模型
这里注意一下:每个进程都要加载一下数据集、确定criterion和优化函数。
parser.add_argument('--dist-url', default='tcp://224.66.41.62:23456', type=str,help='url used to set up distributed training')parser.add_argument('--dist-backend', default='nccl', type=str,help='distributed backend')parser.add_argument('--rank', default=-1, type=int,help='node rank for distributed training')parser.add_argument('-b', '--batch-size', default=256, type=int,metavar='N',help='mini-batch size (default: 256), this is the total ''batch size of all GPUs on the current node when ''using Data Parallel or Distributed Data Parallel')parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',help='number of data loading workers (default: 4)')def main_worker(gpu, ngpus_per_node, args):# gpu表示当前主机中,本进程的编号,会和使用的卡对应,也可称之为本地序号if args.distributed:if args.multiprocessing_distributed:# rank表示所有进程中,本进程的编号args.rank = args.rank * ngpus_per_node + gpu# 用dist.init_process_group实现通信# 通信框架用的nccl,确认通信地址,所有需要通信的进程数量,本进信在所有进程中的编号dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,world_size=args.world_size, rank=args.rank)# For multiprocessing distributed, DistributedDataParallel constructor# should always set the single device scope, otherwise,# DistributedDataParallel will use all available devices.if args.gpu is not None:# 根据当前主机的进程号指定当前主机的显卡torch.cuda.set_device(args.gpu)model.cuda(args.gpu)# 如果一个进程控制一个GPU,可以把整体batch_size划分到各个GPU中。workers同理。# When using a single GPU per process and per# DistributedDataParallel, we need to divide the batch size# ourselves based on the total number of GPUs we haveargs.batch_size = int(args.batch_size / ngpus_per_node)args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)# 把model放到对应的卡上model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])else:model.cuda()# DistributedDataParallel will divide and allocate batch_size to all# available GPUs if device_ids are not setmodel = torch.nn.parallel.DistributedDataParallel(model)# 接着是一些常规的网络训练设置# define loss function (criterion) and optimizercriterion = nn.CrossEntropyLoss().cuda(args.gpu)optimizer = torch.optim.SGD(model.parameters(), args.lr,momentum=args.momentum,weight_decay=args.weight_decay)# 确定数据集train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),num_workers=args.workers, pin_memory=True, sampler=train_sampler)# 训练for epoch in range(args.start_epoch, args.epochs):train(train_loader, model, criterion, optimizer, epoch, args)# 单机多卡时只保存0卡的模型# 多机多卡时,保留每个主机上0卡的模型if not args.multiprocessing_distributed or (args.multiprocessing_distributedand args.rank % ngpus_per_node == 0):save_checkpoint({'epoch': epoch + 1,'arch': args.arch,'state_dict': model.state_dict(),'best_acc1': best_acc1,'optimizer' : optimizer.state_dict(),}, is_best)
train函数的具体内容如下。
def train(train_loader, model, criterion, optimizer, epoch, args):for i, (images, target) in enumerate(train_loader):output = model(images)loss = criterion(output, target)optimizer.zero_grad()loss.backward()optimizer.step()
和nn.DataParallel的区别
DistributedDataParallel为每个GPU创建一个进程,而DataParallel使用多线程。每个GPU有专用的进程可以避免python解释器中GIL导致的性能开销
