oneflow
oneflow copied to clipboard
flow.tmp_compute_stream
提供作用域flow.tmp_compute_stream,其下的所有op都在拥有单独线程的tmp_compute stream上工作。
和后江,建浩开会结论:
- python接口不叫flow.tmp_compute_stream_type(),而是直接对齐pytorch的torch.cuda.Stream和torch.cuda.stream。
- 一条Stream一个线程。
将flow.tmp_compute_stream重构为flow.stream(flow.Stream())。 与之前讨论的稍微有点不一样,flow.Stream并不独占某个线程,而是回环地使用固定数量的worker线程池中的一个。
我仍然觉得一个stream一个线程比较合适,回环地使用固定数量的worker线程池中的一个倒不如约定用户最多可以同时创建N个Stream
我仍然觉得一个stream一个线程比较合适,回环地使用固定数量的worker线程池中的一个倒不如约定用户最多可以同时创建N个Stream
首先,上层用户的Python代码可能是这样的:
class Foo():
def __init__(self):
self.stream = flow.Stream()
def forward(self):
with flow.stream(self.stream):
...
显然self.stream被持有了,某个线程就会一直存在。这样的代码并不是低频的,尤其是库文件里,于是就会大量创造worker线程,引发各种问题。
其次,cpu stream 所在的线程需要初始化一个thread_global_id,这个目前很受限,最多只能有8个,main线程,scheduler线程和default worker线程已经占了3个。最后只剩下5个了线程允许初始化不同的thread_global_id。如果允许任意多个worker 线程,运行global op的时候肯定会崩溃的。
我仍然觉得一个stream一个线程比较合适,回环地使用固定数量的worker线程池中的一个倒不如约定用户最多可以同时创建N个Stream
首先,上层用户的Python代码可能是这样的:
class Foo(): def __init__(self): self.stream = flow.Stream() def forward(self): with flow.stream(self.stream): ...
显然self.stream被持有了,某个线程就会一直存在。这样的代码并不是低频的,尤其是库文件里,于是就会大量创造worker线程,引发各种问题。
其次,cpu stream 所在的线程需要初始化一个thread_global_id,这个目前很受限,最多只能有8个,main线程,scheduler线程和default worker线程已经占了3个。最后只剩下5个了线程允许初始化不同的thread_global_id。如果允许任意多个worker 线程,运行global op的时候肯定会崩溃的。
其实我们的库文件里肯定不应该频繁的创建新的stream,而且还一直持有不释放,如果很多地方都有stream的需求,那是不是应该提供一个全局的类似于default stream供库文件里使用。 另外“最多只能有8个”这个应该是一个默认值,甚至默认值可以再小一些,但可以让用户指定。
与后江,建浩会议结论:
- Stream创建接口修改为:flow.Stream(worker_thread=None),若worker_thread为None,内部谁定为默认的临时线程。
- worker_thread不能随意创建,只能通过flow.get_worker_thread(worker_thread_id)获取,而且有限额,大概是4个。
- python生命周期不重叠的flow.Stream可以复用底层c++的Symbol<Stream>。
- 同一个线程内,不同flow.Stream对象都会共享同一个nccl stream。
这是为了解决如下几个经典断流问题:
print(loss)断流问题。
问题描述:
loss = model()
loss.backward()
foo(loss)
print(loss)
因为foo函数所产生的op的实际执行排在backward op之后。所有打印loss时需要等待backward和foo对应op的完成,造成断流。 可以使用flow.Stream将如下代码重构为:
loss = model()
loss.backward()
with flow.stream(flow.Stream()):
foo(loss)
print(loss)
由于loss已经在tmp stream上,而foo和loss.numpy都会安排在tmp stream上工作,所以print(loss)不需要等待.backward完成就可以返回,于是就能提前准备后续op。
if nan断流问题
在clip_grad函数里,原本的代码有可能造成断流:
...
if error_if_nonfinite and flow.logical_or(
total_norm.isnan(), total_norm.isinf()
):
raise RuntimeError(
f"The total norm of order {norm_type} for gradients from "
"`parameters` is non-finite, so it cannot be clipped. To disable "
"this error and scale the gradients by the non-finite norm anyway, "
"set `error_if_nonfinite=False`"
)
clip_coef = max_norm / (total_norm + 1e-6)
clip_coef_clamped = clip_coef.clamp(max=1.0)
for p in parameters:
p.grad.detach().mul_(clip_coef_clamped.to_global(placement=p.placement))
...
由于total_norm在if判断之后的代码都不会被修改,我们可以先把total_norm的值拷贝到tmp stream上,然后再执行后续op,最后在对tmp stream上的total_norm做if判断处理。
...
# 先拷贝到tmp stream
with flow.stream(flow.Stream()):
tmp_total_norm = total_norm.clone()
# 提前调度后续op
clip_coef = max_norm / (total_norm + 1e-6)
clip_coef_clamped = clip_coef.clamp(max=1.0)
for p in parameters:
p.grad.detach().mul_(clip_coef_clamped.to_global(placement=p.placement))
# 在tmp stream上同步worker线程的数据到main线程
with flow.stream(flow.Stream()):
if error_if_nonfinite and flow.logical_or(
tmp_total_norm.isnan(), tmp_total_norm.isinf()
):
raise RuntimeError(
f"The total norm of order {norm_type} for gradients from "
"`parameters` is non-finite, so it cannot be clipped. To disable "
"this error and scale the gradients by the non-finite norm anyway, "
"set `error_if_nonfinite=False`"
)
...
decoder和loss相关的nccl kernel顺序化导致的断流问题。
场景:
input = GetMicroBatch()
global_input = input.to_global(...)
partial_loss = nn_graph(global_input)
loss = partial_loss.to_gbloal(flow.sbp.broadcast)
这里的input和loss都在nccl上有操作,于是就会造成顺序化,使得中间的nn_graph没法流式的执行起来。可以用flow.Stream配合flow.get_worker_thread解决这里的断流问题:
input = GetMicroBatch()
with flow.stream(flow.Stream(flow.get_worker_thread(1))):
global_input = input.to_global(...)
partial_loss = nn_graph(global_input)
with flow.stream(flow.Stream(flow.get_worker_thread(2))):
loss = partial_loss.to_gbloal(flow.sbp.broadcast)
应该不需要再考虑stream内顺序化机制是否要重构。 具体场景:
with flow.stream(flow.Stream()):
Foo()
with flow.stream(flow.Stream()):
Bar()
由于flow.Stream的复用机制,实际上Foo和Bar op会顺序执行。这应该不需要再重构,如果不复用的话会有内存暴涨问题,比如说:
for i in range(999999):
with flow.stream(flow.Stream()):
Foo()
for循环内的Foo每次iter都不会和上一次iter顺序化,这就完全可能造成内存暴涨。
另外,我们不应该鼓励flow.stream(flow.Stream())这种写法,而应该是类似python一样持有flow.Stream的做法。
def MyModule(nn.Module):
def __init__(self):
...
self.stream = flow.Stream()
def forward(self):
with flow.stream(s):
Foo()
针对 https://github.com/Oneflow-Inc/oneflow/pull/8866#discussion_r955574611 这个问题。与后江开会讨论结果:
- python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
- stream这个词在用户看来会联想到他需要主动做相关的操作,比如synchronize,但其实oneflow内建支持了同步,不需要用户额外做工作。
- 如果oneflow后续对齐了oneflow.cuda.Stream,这会给用户带来更多的困惑:oneflow.Stream和oneflow.cuda.Stream有什么差别。
- 应该把flow.stream重命名成flow.pipeline,而把flow.Stream重命名为flow.Pipeline,去掉flow.worker_thread,让flow.Pipeline接收worker_thread_id做参数。
- 将c++层的StreamSet重命名成Pipeline,然后导出到python层。
- flow.Pipeline和flow.device是同一层级的概念。
- PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。
针对 #8866 (comment) 这个问题。与后江开会讨论结果:
- python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
- PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。
好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread?
Thread,Device,StreamType这三者唯一确定一个vm::Stream对象
现在我们Python这个Stream的作用其实和PyTorch的Pipe更贴近(https://pytorch.org/docs/stable/_modules/torch/distributed/pipeline/sync/pipe.html#Pipe ),oneflow C++ stream是更底层的东西,它就类似于流水线里的工人,可以有多个,高级的流水线还能让指令在这些stream上乱序执行。
如果我们现在导出flow.stream,那以后为了对齐pytorch,肯定也还要导出flow.cuda.stream,这两个stream就很难向用户解释清楚了。
肯定不能叫 Stream,因为其与 torch.cuda.Stream 不是同一个东西,但名字相近就会多出来向用户解释的成本。
读完 https://github.com/Oneflow-Inc/OneTeam/issues/1632 这个 issue,还无法完全领会这个功能所起的作用,解决什么问题,以及为什么要这么解决(这么解决的好处或者必要性)。
现在我们Python这个Stream的作用其实和PyTorch的Pipe更贴近(https://pytorch.org/docs/stable/_modules/torch/distributed/pipeline/sync/pipe.html#Pipe ),oneflow C++ stream是更底层的东西,它就类似于流水线里的工人,可以有多个,高级的流水线还能让指令在这些stream上乱序执行。
这里没有理解。流水线有一种“把任务按照固定方向往后传递“的感觉。
如果我们有和 PyTorch 对齐的 flow.cuda.Stream,并且在它构造函数里增加一个参数 auto_sync,是不是就可以用来表达我们 c++ 层的 stream 了,那时再把本 PR 的接口以 flow.StreamSet 的名字来导出是不是就可以
所以建议现在把本 PR 接口先放在 experimental 命名空间里,等我们有和 PyTorch 对齐的 flow.cuda.Stream 之后,再改成
with flow.stream_set(flow.StreamSet(auto_sync=True/False)):
pass
那时还可以支持这样的写法:
new_stream_set = flow.StreamSet(auto_sync=False)
with flow.stream_set(new_stream_set):
...
s = new_stream_set.compute_stream(device='cuda') # s 的类型是 flow.cuda.Stream
s.synchronize()
这里没有理解。流水线有一种“把任务按照固定方向往后传递“的感觉。
嗯嗯,是指令乱序,指令发射出去了在流水线内部就是固定方向传递的。
针对 #8866 (comment) 这个问题。与后江开会讨论结果:
- python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
- PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。
好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread?
Thread,Device,StreamType这三者唯一确定一个vm::Stream对象
_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。
针对 #8866 (comment) 这个问题。与后江开会讨论结果:
- python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
- PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。
好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread?
Thread,Device,StreamType这三者唯一确定一个vm::Stream对象
_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。
针对 #8866 (comment) 这个问题。与后江开会讨论结果:
- python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
- PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。
好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread? Thread,Device,StreamType这三者唯一确定一个vm::Stream对象
_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。
我是从 "2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。" 这里改过来的。
Thead 对应 worker thread,它负责和Device交互;
Device 是物理设备,它可以带有多个 Stream;
StreamType 含义比较接近任务类型;
- 一个指令(任务)
- 有StreamType(任务类型)
- 可以选择一个 Thead (工作线程,可以和多个 Stream交互)【本PR的新增功能】
- 从工作线程向某个设备的某个Stream下发任务
看是不是这样一个逻辑
读完 Oneflow-Inc/OneTeam#1632 这个 issue,还无法完全领会这个功能所起的作用,解决什么问题,以及为什么要这么解决(这么解决的好处或者必要性)。
把vm底层的vm::Stream导出到python层,直接提供给用户一些抓手优化性能。
现在我们Python这个Stream的作用其实和PyTorch的Pipe更贴近(https://pytorch.org/docs/stable/_modules/torch/distributed/pipeline/sync/pipe.html#Pipe ),oneflow C++ stream是更底层的东西,它就类似于流水线里的工人,可以有多个,高级的流水线还能让指令在这些stream上乱序执行。
如果我们现在导出flow.stream,那以后为了对齐pytorch,肯定也还要导出flow.cuda.stream,这两个stream就很难向用户解释清楚了。
与Pipe Module差得还是挺多的,它主要解决多个卡之间的流水。
If the module requires lots of memory and doesn't fit on a single GPU, pipeline parallelism is a useful technique to employ for training.
auto_sync=False
auto_sync的参数是不是可以去掉,其实这个不需要用户操心。
auto_sync=False
auto_sync的参数是不是可以去掉,其实这个不需要用户操心。
为什么呢,我理解有了这个参数我们才能让我们目前的 stream 和未来兼容 PyTorch 的 stream 在同一个概念下暴露给用户
针对 #8866 (comment) 这个问题。与后江开会讨论结果:
- python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
- PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。
好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread? Thread,Device,StreamType这三者唯一确定一个vm::Stream对象
_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。
我是从 "2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。" 这里改过来的。
Thead 对应 worker thread,它负责和Device交互;
Device 是物理设备,它可以带有多个 Stream;
StreamType 含义比较接近任务类型;
- 一个指令(任务)
- 有StreamType(任务类型)
- 可以选择一个 Thead (工作线程,可以和多个 Stream交互)【本PR的新增功能】
- 从工作线程向某个设备的某个Stream下发任务
看是不是这样一个逻辑
是的
auto_sync=False
auto_sync的参数是不是可以去掉,其实这个不需要用户操心。
为什么呢,我理解有了这个参数我们才能让我们目前的 stream 和未来兼容 PyTorch 的 stream 在同一个概念下暴露给用户
首先,torch.Stream并没有显式的auto_sync参数,而是一个笼统的kwargs。其次,如果将来真的有stream_set.compute_stream()的需求,到时再扩充flow.StreamSet,同样加一个kwargs就可以了。
首先,torch.Stream并没有显式的auto_sync参数,而是一个笼统的kwargs。其次,如果将来真的有stream_set.compute_stream()的需求,到时再扩充flow.StreamSet,同样加一个kwargs就可以了。
不过为了和 torch.cuda.Stream 语义对齐,我们的这个 auto_sync 默认值应该是 False?也就是用户需要显式指定 auto_sync=True,才是我们现在的自动在 stream 间同步的语义,否则默认就是和 PyTorch 语义对齐的需手动同步。
以及为了防止用户疑惑,可以先把 StreamSet 放在 experimental 命名空间里。
也就是 现在的命名:flow.experimental.StreamSet() 将来的命名:flow.StreamSet(auto_sync=True)
看我们的理解是不是一样
experimental
试验性的工具类 torch 一般是放到 utils.xxx 下面,稳定后再升级到二级命名空间下、或者一级命名空间下。
之前的实验性功能命名空间 experimental 也都已经删掉了
experimental
试验性的工具类 torch 一般是放到 utils.xxx 下面,稳定后再升级到二级命名空间下、或者一级命名空间下。
之前的实验性功能命名空间 experimental 也都已经删掉了
我没有找到过torch.experimental.xxx api。最多只是找到了functorch.experimental.xxx和torchtext.experimental.xxx。后两者看起来是基于pytorch框架做得库,不是torch本身的东西。 或者,在其子名字空间下还可以找到experimental https://pytorch.org/docs/stable/search.html?q=torch.experimental&check_keywords=yes&area=default#
CI failed when running job: Build cpu. PR label automerge has been removed
备选方案以及辩护理由
新奇、建浩提 flow.StreamSet / flow.stream_set
啸宇:不应该直接放在oneflow顶层名字空间 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。
新奇、建浩提 flow.experimental.StreamSet / flow.experimental.stream_set
啸宇:torch的不太成熟的api一般都放在torch.utils名字空间下。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。
新奇提 flow.vm.StreamSet / flow.vm.stream_set
新奇:不太想把vm这个概念暴露出来,本质上不应该让用户关心。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。
新奇提 flow.worker.StreamSet / flow.worker.stream_set
新奇:用户不太能理解worker语义。 啸宇:子命名空间应该是特定功能,worker有点不合适。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。
新奇提 flow.worker_thread.StreamSet / flow.worker_thread.stream_set
新奇:worker_thread里用户的业务逻辑太远。 啸宇:子命名空间应该是特定功能,worker_thread有点不合适。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。
啸宇提 flow.stream.StreamSet / flow.stream.stream_set
后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。
新奇,啸宇提flow.utils.StreamSet / flow.utils.stream_set
后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。
啸宇,后江,建浩提新的名字空间 flow.async
均无异议。新奇:看起来像是flow.vm的替代,但是更易于用户理解。 畅想了什么api可以放到该名字空间下:flow.async.local_sync ,flow.async.global_sync,
后江提flow.async.run
with flow.async.run(thread_global_id):
pass
啸宇:run这个字眼不太合适,应该是个名词
新奇提flow.async.run(flow.async.Fiber(thread_global_id))
后江:Fiber比stream好,但仍然会让人联想到常见的协程概念,但此处又不太一致。
新奇、后江提flow.async.pipeline
新奇,啸宇:pipeline在pytorch里有类似概念: Pipe module。那个更多的是一个composer,把分属两个设备的module组合起来,类似Sequential module。此处的flow.async.pipeline更多地想表达类似micro thread的概念。
后江提flow.asyncs.thread(thread_global_id)
后江:暂时不要提供fiber概念,用户不易理解。c++层面的StreamSet可以继续存在,python层只导出flow.asyncs.thread。用户用不同fiber所要解决的问题,总是可以用不同thread_global_id来解决,而且更加可靠,因为不同的thread_global_id肯定不会出现不同fiber争用线程资源。
default_thread_id = 0
decoder_async_thread_id = 1
loss_async_thread_id = 2
for i in epoch_iters:
with flow.asyncs.thread(decoder_async_thread_id):
data = get_data()
loss = train(model)
with flow.asyncs.thread(loss_async_thread_id):
write_metric(loss)
决策原则
- api是否明确暴露eager 运行时的核心特性。
- 让用户最快地理解api如何使用。
Speed stats: