
Tutorial documentation: Distributed Training Special Topic

strint opened this issue 2 years ago · 3 comments

Maintainer List

@doombeaker @clackhan @strint @leaves-zwx

These colleagues are responsible for the long-term correctness and professional quality of this tutorial.

Purpose

Publish a distributed-training tutorial so that developers can refer to it and build distributed models in a self-service way.

Audience

All users and developers; to be completed within the 0.8 release cycle.

Stability

Stability follows the stability of the specific features involved.

Completeness

1. Quick start @doombeaker: launch a test in 5 minutes

  • [ ] Run a mixed data/model-parallel example
  • [ ] Introduce the launch method

2. 1D parallelism @clackhan

  • [ ] Data parallelism
  • [ ] Model parallelism
  • [ ] Model loading and saving

3. Pipeline parallelism @strint

  • [ ] Pipeline parallelism example

4. 2D hybrid parallelism + pipeline parallelism @leaves-zwx

  • [ ] Example + recommend using libai (a minimal 2D hybrid-parallel sketch follows this list)
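For reference, a minimal sketch of what a mixed (2D) data/model-parallel global tensor looks like. The 2x2 rank layout, tensor shapes, and 4-GPU assumption are illustrative; the snippet would be launched with oneflow.distributed.launch --nproc_per_node 4.

import oneflow as flow

# A 2x2 device array (4 GPUs assumed; the rank layout is illustrative).
# The first placement dimension acts as the data-parallel group,
# the second as the model-parallel group.
placement = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])

# Input: split the batch dim over the data-parallel dim, broadcast over the
# model-parallel dim.
x = flow.randn(8, 10, placement=placement, sbp=[flow.sbp.split(0), flow.sbp.broadcast])

# Weight: broadcast over the data-parallel dim, split the output-feature dim
# over the model-parallel dim.
w = flow.randn(10, 6, placement=placement, sbp=[flow.sbp.broadcast, flow.sbp.split(1)])

y = flow.matmul(x, w)
print(y.shape)  # global shape: oneflow.Size([8, 6])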

Testing

All examples are required to be runnable.

strint · Apr 25 '22 06:04

2. 1D parallelism @clackhan

Global tensor easily supports any form of parallelism, including data parallelism and model parallelism, and can run across multiple machines.

Note: the code in this tutorial runs on a 2-GPU server, but it can easily be generalized to other environments.

  • [ ] Data parallelism

  • Model construction

In data-parallel mode, every GPU holds a complete copy of the model parameters, the parameters on all devices are identical, and each rank is fed different data. Next we train a data-parallel network in Global mode. The first step is to create the model: the code below defines a network with two fully connected layers and extends it to two devices.

Note: when the single-device model is extended to two devices via to_global, the model parameters on rank 0 are broadcast to the other ranks, so there is no need to worry about the initial parameter values differing across processes.

import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)

      
placement = flow.placement(type="cuda", ranks=[0, 1])
sbp = flow.sbp.broadcast

model = ToyModel()
model.to_global(placement, sbp) # extend the model to two devices
  • Model training

The data-parallel training script is no different from a single-machine, single-device one.

max_iter = 20
optimizer = optim.SGD(model.parameters(), lr=0.001)

for i in range(max_iter):
    data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.split(0))
    labels = flow.randn(20, 5, placement=placement, sbp=flow.sbp.split(0))
    loss = model(data, labels)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
  • Model saving and loading

Compared with local mode, global mode requires the global_dst_rank parameter when saving the model and global_src_rank when loading it; these parameters configure on which rank the model is persisted or loaded.

flow.save(model.state_dict(), 'model_weights.pth', global_dst_rank=0)

model2 = ToyModel()
model2.to_global(placement, sbp)
model2.load_state_dict(flow.load('model_weights.pth', global_src_rank=0))

:warning: In global mode, every rank must execute the flow.save(...)/flow.load(...) calls; that is, flow.save(...)/flow.load(...) must not be executed only inside an if rank == 0: block.
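To make the warning concrete, here is a minimal self-contained sketch of the correct pattern versus the rank-guarded anti-pattern; the model, file name, and the exact failure mode described in the comment are illustrative assumptions.

import oneflow as flow
import oneflow.nn as nn

placement = flow.placement(type="cuda", ranks=[0, 1])
model = nn.Linear(10, 5)
model.to_global(placement, flow.sbp.broadcast)

# Correct: every rank executes flow.save; only global_dst_rank 0 actually
# writes the checkpoint file.
flow.save(model.state_dict(), "linear.pth", global_dst_rank=0)

# Incorrect (do not do this): only rank 0 enters the branch, the other ranks
# never join the collective communication behind flow.save, and the program
# will typically hang.
# if flow.env.get_rank() == 0:
#     flow.save(model.state_dict(), "linear.pth", global_dst_rank=0)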

  • Complete code (can be collapsed by default on the web page) and launch method

OneFlow provides the oneflow.distributed.launch module (link to the distributed.launch tutorial/docs) to help users launch distributed training more conveniently.

Users can launch distributed training in the following form:

python3 -m oneflow.distributed.launch [launch options] training_script.py

Run the following command to launch data-parallel training on a single machine with two devices:

python3 -m oneflow.distributed.launch --nproc_per_node 2 data_parallel.py
# data_parallel.py

import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 20)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(20, 16)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)

def demo_basic(max_iter, load_path=None, save_path=None):
    placement = flow.placement(type="cuda", ranks=[0, 1])
    sbp = flow.sbp.broadcast

    model = ToyModel()
    model.to_global(placement, sbp) # extend the model to two devices
    
    if load_path is not None:
        model.load_state_dict(flow.load(load_path, global_src_rank=0))
    
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    
    for i in range(max_iter):
        data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.split(0))
        labels = flow.randn(20, 16, placement=placement, sbp=flow.sbp.split(0))
        loss = model(data, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    if save_path is not None:
        flow.save(model.state_dict(), save_path, global_dst_rank=0)

if __name__ == "__main__":
    demo_basic(10)
  • [ ] Model parallelism

Model parallelism can be implemented very easily with Global mode. The overall process is largely the same as for data parallelism; the difference lies in the sbp configuration when defining the model. Unlike data parallelism, in model parallelism the parameters are distributed evenly across the GPUs, while each rank instead holds the complete data.

Note: when to_global(...) performs the local_to_global conversion and the sbp contains flow.sbp.split(x), the local tensors on each rank are concatenated along dimension x. So that the parameters (in_features, out_features) of the linear module keep the same semantics as in data parallelism, the ModelParallelLinear defined below builds each rank's local linear layer with out_features // parallel_num output features and then converts it to global with flow.sbp.split(0), which concatenates the per-rank weights back into the full out_features.
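As a minimal illustration of this concatenation semantics (run with 2 processes, e.g. via oneflow.distributed.launch --nproc_per_node 2; the shapes are illustrative):

import oneflow as flow

placement = flow.placement(type="cuda", ranks=[0, 1])

# Each of the 2 ranks holds a local tensor of shape (2, 5).
local_w = flow.randn(2, 5, device="cuda")

# local_to_global with flow.sbp.split(0) treats the local tensors as slices
# along dim 0, so the resulting global tensor has shape (4, 5).
global_w = local_w.to_global(placement=placement, sbp=flow.sbp.split(0))
print(global_w.shape)  # oneflow.Size([4, 5])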

In model-parallel mode, the script launch method and model loading/saving are the same as for data parallelism.

import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim

class ModelParallelLinear(nn.Module):
    def __init__(self, in_features: int, out_features: int, placement: flow.placement, bias: bool = True):
        super(ModelParallelLinear, self).__init__()
        assert out_features % placement.ranks.size == 0, "out_features must be divisible by parallel num"
        self.linear = nn.Linear(in_features, out_features // placement.ranks.size, bias) # each rank builds a local layer with out_features // parallel_num output features; to_global with split(0) below concatenates them into the full out_features
        self.linear.to_global(placement, flow.sbp.split(0))

    def forward(self, x):
        return self.linear(x)

class ToyModel(nn.Module):
    def __init__(self, placement):
        super(ToyModel, self).__init__()
        self.net1 = ModelParallelLinear(10, 20, placement)
        self.relu = nn.ReLU()
        self.net2 = ModelParallelLinear(20, 16, placement)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)

def demo_basic(max_iter, load_path=None, save_path=None):
    placement = flow.placement(type="cuda", ranks=[0, 1])

    model = ToyModel(placement)
    if load_path is not None:
        model.load_state_dict(flow.load(load_path, global_src_rank=0))
    
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    
    for i in range(max_iter):
        data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.broadcast)
        labels = flow.randn(20, 16, placement=placement, sbp=flow.sbp.broadcast)
        loss = model(data, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    if save_path is not None:
        flow.save(model.state_dict(), save_path, global_dst_rank=0)

if __name__ == "__main__":
    demo_basic(10)
  • [ ] ~~Model loading and saving~~

clackhan · Apr 26 '22 13:04

Part 2:

Programming across multiple machines and devices with Global Tensor: data parallelism, model parallelism, pipeline parallelism

One runnable example plus an explanation for each (a minimal pipeline-parallel sketch is included after the references below).

References:

  • https://github.com/Oneflow-Inc/oneflow-documentation/issues/481#issuecomment-1109771017
  • Data parallelism and model parallelism, https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html#_2
  • Pipeline parallelism, https://docs.oneflow.org/master/parallelism/06_pipeline.html
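For the pipeline-parallel example called for above, a minimal eager-mode forward-pass sketch that places two stages on two devices and moves the activation between them with to_global. The stage split, layer sizes, and eager-mode approach are assumptions here; the nn.Graph-based recipe in the linked pipeline doc remains the recommended path.

import oneflow as flow
import oneflow.nn as nn

# Two pipeline stages on two devices; run with 2 processes via
# oneflow.distributed.launch --nproc_per_node 2.
P0 = flow.placement(type="cuda", ranks=[0])
P1 = flow.placement(type="cuda", ranks=[1])
BROADCAST = flow.sbp.broadcast

stage1 = nn.Linear(10, 8)
stage1.to_global(P0, BROADCAST)  # stage 1 lives on rank 0
stage2 = nn.Linear(8, 5)
stage2.to_global(P1, BROADCAST)  # stage 2 lives on rank 1

x = flow.randn(4, 10, placement=P0, sbp=BROADCAST)
h = stage1(x)
# Move the activation from stage 1's placement to stage 2's placement.
h = h.to_global(placement=P1, sbp=BROADCAST)
y = stage2(h)
print(y.shape)  # oneflow.Size([4, 5])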

Execution process

  • Topic + outline
  • Get the examples running and fill them in
  • Then supplement the remaining content

strint · Jul 27 '22 02:07

Part 3: global tensor backward
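As a placeholder for the scope of that article, a minimal sketch of running backward on a global tensor; the shapes and sbp here are illustrative, and the snippet assumes 2 processes launched via oneflow.distributed.launch.

import oneflow as flow

placement = flow.placement(type="cuda", ranks=[0, 1])

# A data-parallel (split) global tensor that requires grad.
x = flow.randn(4, 4, placement=placement, sbp=flow.sbp.split(0), requires_grad=True)

loss = (x * x).sum()
loss.backward()

# The gradient is itself a global tensor; its placement and sbp follow x.
print(x.grad.shape, x.grad.sbp)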

strint · Jul 27 '22 02:07