oneflow-documentation
Tutorial documentation: distributed training topics
Maintainer List
@doombeaker @clackhan @strint @leaves-zwx
These colleagues are responsible for the long-term correctness and professional quality of this tutorial.
Purpose
Publish a distributed training tutorial so that developers can follow it and build distributed models on a self-service basis.
Audience
All users and developers; to be completed within the 0.8 release cycle.
Stability
Stability follows the stability of the specific features involved.
Completeness
1. Quick start @doombeaker: start a test run in 5 minutes
- [ ] Run a mixed data/model parallel example;
- [ ] Explain how to launch it;
2. 1D parallelism @clackhan
- [ ] Data parallelism
- [ ] Model parallelism
- [ ] Model loading and saving
3. Pipeline parallelism @strint
- [ ] Pipeline parallelism example
4. 2D hybrid parallelism + pipeline parallelism @leaves-zwx
- [ ] Example + recommend using libai (see the sketch after this list)
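As a preview of item 4, here is an illustrative sketch only (it assumes a 4-GPU machine; the final tutorial may rely on libai instead). It creates a global tensor over a 2x2 device grid, combining data parallelism along one placement axis with replication along the other:

```python
import oneflow as flow

# 2x2 device grid: placement axis 0 separates the groups {rank 0, rank 1} and {rank 2, rank 3}
placement = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])

# split(0) along placement axis 0 shards the batch between the two groups;
# broadcast along placement axis 1 replicates it inside each group
x = flow.randn(16, 10, placement=placement, sbp=(flow.sbp.split(0), flow.sbp.broadcast))
print(x.shape)  # the global shape stays (16, 10); each rank holds a local (8, 10) shard
```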
Testing
All examples must be executable.
2. 1D parallelism @clackhan
Global tensors can easily support any form of parallelism, including data parallelism and model parallelism, and can run across multiple machines.
Note: the code in this tutorial runs on a 2-GPU server, but it generalizes easily to other environments.
- [ ] Data parallelism
- Model construction
In data parallel mode, every GPU holds a complete copy of the model parameters, the parameters on all devices are identical, and each rank receives different input data. We now train a data parallel network in global mode. The first step is to create the model: the code below defines a network with two fully connected layers and extends it to two devices.
Note: when the single-device model is extended to two devices via to_global, the parameters on rank 0 are broadcast to the other ranks, so there is no need to worry about different processes initializing the model parameters differently.
```python
import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)


placement = flow.placement(type="cuda", ranks=[0, 1])
sbp = flow.sbp.broadcast
model = ToyModel()
model.to_global(placement, sbp)  # extend the model to two devices
```
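As a quick sanity check (an addition beyond the outline, not required by it), the converted module's parameters can be inspected to confirm they are now global tensors replicated on both devices:

```python
# every parameter is now a global tensor placed on ranks [0, 1] with broadcast sbp
for name, param in model.named_parameters():
    print(name, param.placement, param.sbp)
```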
- Model training
The data parallel training loop is no different from single-machine, single-device training; the optimizer is created first, just as on a single device.
```python
optimizer = optim.SGD(model.parameters(), lr=0.001)

max_iter = 20
for i in range(max_iter):
    # sbp=split(0) shards dim 0 across the ranks, so each rank feeds its own slice of the batch
    data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.split(0))
    labels = flow.randn(20, 5, placement=placement, sbp=flow.sbp.split(0))
    loss = model(data, labels)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
```
- Model saving and loading
Compared with local mode, global mode requires an extra argument when saving and loading the model: global_dst_rank (for saving) and global_src_rank (for loading) specify on which rank the model is persisted or loaded.
```python
flow.save(model.state_dict(), 'model_weights.pth', global_dst_rank=0)

model2 = ToyModel()
model2.to_global(placement, sbp)
model2.load_state_dict(flow.load('model_weights.pth', global_src_rank=0))
```
:warning: In global mode, every rank must execute the flow.save(...) / flow.load(...) calls; that is, flow.save(...) / flow.load(...) must not be placed inside an `if rank == 0:` block.
- Complete code (can be collapsed by default on the web page) and how to launch it
OneFlow provides the oneflow.distributed.launch module (link to the distributed.launch tutorial/docs) to make launching distributed training easier.
Distributed training can be launched in the following form:

```shell
python3 -m oneflow.distributed.launch [launch options] training_script.py
```

Run the following command to start data parallel training on a single machine with two devices:

```shell
python3 -m oneflow.distributed.launch --nproc_per_node 2 data_parallel.py
```
```python
# data_parallel.py
import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 20)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(20, 16)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)


def demo_basic(max_iter, load_path=None, save_path=None):
    placement = flow.placement(type="cuda", ranks=[0, 1])
    sbp = flow.sbp.broadcast

    model = ToyModel()
    model.to_global(placement, sbp)  # extend the model to two devices
    if load_path is not None:
        model.load_state_dict(flow.load(load_path, global_src_rank=0))

    optimizer = optim.SGD(model.parameters(), lr=0.001)

    for i in range(max_iter):
        data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.split(0))
        labels = flow.randn(20, 16, placement=placement, sbp=flow.sbp.split(0))
        loss = model(data, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    if save_path is not None:
        flow.save(model.state_dict(), save_path, global_dst_rank=0)


if __name__ == "__main__":
    demo_basic(10)
```
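A small rank-awareness sketch (not part of the planned outline; it assumes OneFlow's flow.env.get_rank() / flow.env.get_world_size() helpers): each process started by oneflow.distributed.launch can identify itself, which is useful for rank-specific logging.

```python
import oneflow as flow

# each launched process reports which rank it is out of the world size
rank = flow.env.get_rank()
world_size = flow.env.get_world_size()
print(f"rank {rank} / world size {world_size} is up")
```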
- [ ] Model parallelism
Model parallelism is also easy to implement in global mode. The overall flow is much the same as for data parallelism; the difference lies in the sbp configuration used when defining the model. Unlike data parallelism, the model parameters are evenly sharded across the GPUs, while every rank holds the complete input data.
Note: when to_global(...) performs a local-to-global conversion and the sbp contains flow.sbp.split(x), the local tensors on the ranks are concatenated along dimension x. So that the constructor arguments (in_features, out_features) of a linear module keep the same meaning as in data parallelism, the code below defines a ModelParallelLinear wrapper: each rank builds an nn.Linear with only out_features // parallel_num output features, and converting its parameters with flow.sbp.split(0) concatenates those shards back into a layer with the full out_features.
In model parallel mode, the launch command and model loading/saving are the same as in data parallel mode.
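To make the local-to-global concatenation described in the note concrete, here is a small illustrative check (run on two ranks; the shapes are what the rule above implies):

```python
import oneflow as flow

placement = flow.placement(type="cuda", ranks=[0, 1])

# each rank creates its own (2, 4) local tensor
x_local = flow.randn(2, 4)

# local-to-global with split(0): the per-rank tensors are treated as slices
# along dim 0, so the resulting global tensor has shape (4, 4)
x_global = x_local.to_global(placement=placement, sbp=flow.sbp.split(0))
print(x_global.shape)  # oneflow.Size([4, 4])
```

The complete model parallel script: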
```python
import oneflow as flow
import oneflow.nn as nn
import oneflow.optim as optim


class ModelParallelLinear(nn.Module):
    def __init__(self, in_features: int, out_features: int, placement: flow.placement, bias: bool = True):
        super(ModelParallelLinear, self).__init__()
        assert out_features % placement.ranks.size == 0, "out_features must be divisible by parallel num"
        # each rank builds a slice of the layer; to_global with split(0) concatenates
        # the per-rank parameters along dim 0 (the out_features dimension)
        self.linear = nn.Linear(in_features, out_features // placement.ranks.size, bias)
        self.linear.to_global(placement, flow.sbp.split(0))

    def forward(self, x):
        return self.linear(x)


class ToyModel(nn.Module):
    def __init__(self, placement):
        super(ToyModel, self).__init__()
        self.net1 = ModelParallelLinear(10, 20, placement)
        self.relu = nn.ReLU()
        self.net2 = ModelParallelLinear(20, 16, placement)
        self.loss_fn = nn.MSELoss()

    def forward(self, x, labels):
        fc1_out = self.net1(x)
        relu1_out = self.relu(fc1_out)
        logits = self.net2(relu1_out)
        return self.loss_fn(logits, labels)


def demo_basic(max_iter, load_path=None, save_path=None):
    placement = flow.placement(type="cuda", ranks=[0, 1])

    model = ToyModel(placement)
    if load_path is not None:
        model.load_state_dict(flow.load(load_path, global_src_rank=0))

    optimizer = optim.SGD(model.parameters(), lr=0.001)

    for i in range(max_iter):
        # sbp=broadcast replicates the data, so every rank sees the full batch
        data = flow.randn(20, 10, placement=placement, sbp=flow.sbp.broadcast)
        labels = flow.randn(20, 16, placement=placement, sbp=flow.sbp.broadcast)
        loss = model(data, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    if save_path is not None:
        flow.save(model.state_dict(), save_path, global_dst_rank=0)


if __name__ == "__main__":
    demo_basic(10)
```
- [ ] ~~Model loading and saving~~
Article 2:
Multi-machine, multi-device programming with Global Tensor: data parallelism, model parallelism, pipeline parallelism
One runnable example plus an explanation for each (a pipeline sketch is included after the reference list below)
References:
- https://github.com/Oneflow-Inc/oneflow-documentation/issues/481#issuecomment-1109771017
- Data parallelism and model parallelism: https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html#_2
- Pipeline parallelism: https://docs.oneflow.org/master/parallelism/06_pipeline.html
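Since pipeline parallelism has no draft code above, here is a minimal eager-mode sketch assuming two GPUs (names such as PipelineToyModel are placeholders; real pipelining with micro-batches would normally use nn.Graph as described in the linked doc). Each stage lives on its own device and activations are handed over with to_global:

```python
import oneflow as flow
import oneflow.nn as nn

P0 = flow.placement(type="cuda", ranks=[0])   # stage 0 lives on rank 0
P1 = flow.placement(type="cuda", ranks=[1])   # stage 1 lives on rank 1
BROADCAST = flow.sbp.broadcast


class PipelineToyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.stage0 = nn.Linear(10, 20)
        self.stage0.to_global(P0, BROADCAST)
        self.stage1 = nn.Linear(20, 5)
        self.stage1.to_global(P1, BROADCAST)

    def forward(self, x):
        out = self.stage0(x)
        # hand the activations over from device 0 to device 1
        out = out.to_global(placement=P1, sbp=BROADCAST)
        return self.stage1(out)


model = PipelineToyModel()
x = flow.randn(8, 10, placement=P0, sbp=BROADCAST)
print(model(x).shape)  # (8, 5), produced on rank 1's device
```

Gradients should flow back across the to_global hand-off, so the usual loss.backward() / optimizer.step() loop applies unchanged.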
Process
- Title + outline
- Get the examples running and fill them in
- Then add the supplementary content
Article 3: global tensor backward