oneflow icon indicating copy to clipboard operation
oneflow copied to clipboard

flow.tmp_compute_stream

Open lixinqi opened this issue 2 years ago • 1 comments

提供作用域flow.tmp_compute_stream,其下的所有op都在拥有单独线程的tmp_compute stream上工作。

lixinqi avatar Aug 08 '22 02:08 lixinqi

和后江,建浩开会结论:

  1. python接口不叫flow.tmp_compute_stream_type(),而是直接对齐pytorch的torch.cuda.Stream和torch.cuda.stream。
  2. 一条Stream一个线程。

lixinqi avatar Aug 11 '22 02:08 lixinqi

将flow.tmp_compute_stream重构为flow.stream(flow.Stream())。 与之前讨论的稍微有点不一样,flow.Stream并不独占某个线程,而是回环地使用固定数量的worker线程池中的一个。

lixinqi avatar Aug 11 '22 11:08 lixinqi

我仍然觉得一个stream一个线程比较合适,回环地使用固定数量的worker线程池中的一个倒不如约定用户最多可以同时创建N个Stream

hjchen2 avatar Aug 11 '22 13:08 hjchen2

我仍然觉得一个stream一个线程比较合适,回环地使用固定数量的worker线程池中的一个倒不如约定用户最多可以同时创建N个Stream

首先,上层用户的Python代码可能是这样的:

class Foo():
    def __init__(self):
        self.stream = flow.Stream()
    def forward(self):
       with flow.stream(self.stream):
           ...

显然self.stream被持有了,某个线程就会一直存在。这样的代码并不是低频的,尤其是库文件里,于是就会大量创造worker线程,引发各种问题。

其次,cpu stream 所在的线程需要初始化一个thread_global_id,这个目前很受限,最多只能有8个,main线程,scheduler线程和default worker线程已经占了3个。最后只剩下5个了线程允许初始化不同的thread_global_id。如果允许任意多个worker 线程,运行global op的时候肯定会崩溃的。

lixinqi avatar Aug 11 '22 16:08 lixinqi

我仍然觉得一个stream一个线程比较合适,回环地使用固定数量的worker线程池中的一个倒不如约定用户最多可以同时创建N个Stream

首先,上层用户的Python代码可能是这样的:

class Foo():
    def __init__(self):
        self.stream = flow.Stream()
    def forward(self):
       with flow.stream(self.stream):
           ...

显然self.stream被持有了,某个线程就会一直存在。这样的代码并不是低频的,尤其是库文件里,于是就会大量创造worker线程,引发各种问题。

其次,cpu stream 所在的线程需要初始化一个thread_global_id,这个目前很受限,最多只能有8个,main线程,scheduler线程和default worker线程已经占了3个。最后只剩下5个了线程允许初始化不同的thread_global_id。如果允许任意多个worker 线程,运行global op的时候肯定会崩溃的。

其实我们的库文件里肯定不应该频繁的创建新的stream,而且还一直持有不释放,如果很多地方都有stream的需求,那是不是应该提供一个全局的类似于default stream供库文件里使用。 另外“最多只能有8个”这个应该是一个默认值,甚至默认值可以再小一些,但可以让用户指定。

hjchen2 avatar Aug 12 '22 02:08 hjchen2

与后江,建浩会议结论:

  1. Stream创建接口修改为:flow.Stream(worker_thread=None),若worker_thread为None,内部谁定为默认的临时线程。
  2. worker_thread不能随意创建,只能通过flow.get_worker_thread(worker_thread_id)获取,而且有限额,大概是4个。
  3. python生命周期不重叠的flow.Stream可以复用底层c++的Symbol<Stream>。
  4. 同一个线程内,不同flow.Stream对象都会共享同一个nccl stream。

这是为了解决如下几个经典断流问题:

print(loss)断流问题。

问题描述:

loss = model()
loss.backward()
foo(loss)
print(loss)

因为foo函数所产生的op的实际执行排在backward op之后。所有打印loss时需要等待backward和foo对应op的完成,造成断流。 可以使用flow.Stream将如下代码重构为:

loss = model()
loss.backward()
with flow.stream(flow.Stream()):
    foo(loss)
    print(loss)

由于loss已经在tmp stream上,而foo和loss.numpy都会安排在tmp stream上工作,所以print(loss)不需要等待.backward完成就可以返回,于是就能提前准备后续op。

if nan断流问题

在clip_grad函数里,原本的代码有可能造成断流:

        ...

        if error_if_nonfinite and flow.logical_or(
            total_norm.isnan(), total_norm.isinf()
        ):
            raise RuntimeError(
                f"The total norm of order {norm_type} for gradients from "
                "`parameters` is non-finite, so it cannot be clipped. To disable "
                "this error and scale the gradients by the non-finite norm anyway, "
                "set `error_if_nonfinite=False`"
            )
        clip_coef = max_norm / (total_norm + 1e-6)
        clip_coef_clamped = clip_coef.clamp(max=1.0)
        for p in parameters:
            p.grad.detach().mul_(clip_coef_clamped.to_global(placement=p.placement))

        ...

由于total_norm在if判断之后的代码都不会被修改,我们可以先把total_norm的值拷贝到tmp stream上,然后再执行后续op,最后在对tmp stream上的total_norm做if判断处理。

        ...
       
        # 先拷贝到tmp stream  
        with flow.stream(flow.Stream()):
            tmp_total_norm = total_norm.clone()

        # 提前调度后续op
        clip_coef = max_norm / (total_norm + 1e-6)
        clip_coef_clamped = clip_coef.clamp(max=1.0)
        for p in parameters:
            p.grad.detach().mul_(clip_coef_clamped.to_global(placement=p.placement))
         
        # 在tmp stream上同步worker线程的数据到main线程
        with flow.stream(flow.Stream()): 
            if error_if_nonfinite and flow.logical_or(
                tmp_total_norm.isnan(), tmp_total_norm.isinf()
            ):
                raise RuntimeError(
                    f"The total norm of order {norm_type} for gradients from "
                    "`parameters` is non-finite, so it cannot be clipped. To disable "
                    "this error and scale the gradients by the non-finite norm anyway, "
                    "set `error_if_nonfinite=False`"
                )
        ...

decoder和loss相关的nccl kernel顺序化导致的断流问题。

场景:

input = GetMicroBatch()
global_input = input.to_global(...)
partial_loss = nn_graph(global_input)
loss = partial_loss.to_gbloal(flow.sbp.broadcast)

这里的input和loss都在nccl上有操作,于是就会造成顺序化,使得中间的nn_graph没法流式的执行起来。可以用flow.Stream配合flow.get_worker_thread解决这里的断流问题:

input = GetMicroBatch()
with flow.stream(flow.Stream(flow.get_worker_thread(1))):
    global_input = input.to_global(...)
partial_loss = nn_graph(global_input)
with flow.stream(flow.Stream(flow.get_worker_thread(2))):
    loss = partial_loss.to_gbloal(flow.sbp.broadcast)

lixinqi avatar Aug 12 '22 04:08 lixinqi

应该不需要再考虑stream内顺序化机制是否要重构。 具体场景:

with flow.stream(flow.Stream()):
    Foo()
with flow.stream(flow.Stream()):
    Bar()

由于flow.Stream的复用机制,实际上Foo和Bar op会顺序执行。这应该不需要再重构,如果不复用的话会有内存暴涨问题,比如说:

for i in range(999999):
    with flow.stream(flow.Stream()):
        Foo()

for循环内的Foo每次iter都不会和上一次iter顺序化,这就完全可能造成内存暴涨。

另外,我们不应该鼓励flow.stream(flow.Stream())这种写法,而应该是类似python一样持有flow.Stream的做法。

def MyModule(nn.Module):
    def __init__(self):
        ...
        self.stream = flow.Stream()
    def forward(self):
        with flow.stream(s):
            Foo()

lixinqi avatar Aug 12 '22 05:08 lixinqi

针对 https://github.com/Oneflow-Inc/oneflow/pull/8866#discussion_r955574611 这个问题。与后江开会讨论结果:

  1. python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
  2. stream这个词在用户看来会联想到他需要主动做相关的操作,比如synchronize,但其实oneflow内建支持了同步,不需要用户额外做工作。
  3. 如果oneflow后续对齐了oneflow.cuda.Stream,这会给用户带来更多的困惑:oneflow.Stream和oneflow.cuda.Stream有什么差别。
  4. 应该把flow.stream重命名成flow.pipeline,而把flow.Stream重命名为flow.Pipeline,去掉flow.worker_thread,让flow.Pipeline接收worker_thread_id做参数。
  5. 将c++层的StreamSet重命名成Pipeline,然后导出到python层。
  6. flow.Pipeline和flow.device是同一层级的概念。
  7. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。

lixinqi avatar Aug 26 '22 04:08 lixinqi

针对 #8866 (comment) 这个问题。与后江开会讨论结果:

  1. python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
  2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。

好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread?

Thread,Device,StreamType这三者唯一确定一个vm::Stream对象

strint avatar Aug 26 '22 04:08 strint

现在我们Python这个Stream的作用其实和PyTorch的Pipe更贴近(https://pytorch.org/docs/stable/_modules/torch/distributed/pipeline/sync/pipe.html#Pipe ),oneflow C++ stream是更底层的东西,它就类似于流水线里的工人,可以有多个,高级的流水线还能让指令在这些stream上乱序执行。

如果我们现在导出flow.stream,那以后为了对齐pytorch,肯定也还要导出flow.cuda.stream,这两个stream就很难向用户解释清楚了。

hjchen2 avatar Aug 26 '22 04:08 hjchen2

肯定不能叫 Stream,因为其与 torch.cuda.Stream 不是同一个东西,但名字相近就会多出来向用户解释的成本。

leaves-zwx avatar Aug 26 '22 04:08 leaves-zwx

读完 https://github.com/Oneflow-Inc/OneTeam/issues/1632 这个 issue,还无法完全领会这个功能所起的作用,解决什么问题,以及为什么要这么解决(这么解决的好处或者必要性)。

leaves-zwx avatar Aug 26 '22 04:08 leaves-zwx

现在我们Python这个Stream的作用其实和PyTorch的Pipe更贴近(https://pytorch.org/docs/stable/_modules/torch/distributed/pipeline/sync/pipe.html#Pipe ),oneflow C++ stream是更底层的东西,它就类似于流水线里的工人,可以有多个,高级的流水线还能让指令在这些stream上乱序执行。

这里没有理解。流水线有一种“把任务按照固定方向往后传递“的感觉。


如果我们有和 PyTorch 对齐的 flow.cuda.Stream,并且在它构造函数里增加一个参数 auto_sync,是不是就可以用来表达我们 c++ 层的 stream 了,那时再把本 PR 的接口以 flow.StreamSet 的名字来导出是不是就可以

所以建议现在把本 PR 接口先放在 experimental 命名空间里,等我们有和 PyTorch 对齐的 flow.cuda.Stream 之后,再改成

with flow.stream_set(flow.StreamSet(auto_sync=True/False)):
  pass

那时还可以支持这样的写法:

new_stream_set = flow.StreamSet(auto_sync=False)
with flow.stream_set(new_stream_set):
  ...
s = new_stream_set.compute_stream(device='cuda') # s 的类型是 flow.cuda.Stream
s.synchronize()

daquexian avatar Aug 26 '22 06:08 daquexian

这里没有理解。流水线有一种“把任务按照固定方向往后传递“的感觉。

嗯嗯,是指令乱序,指令发射出去了在流水线内部就是固定方向传递的。

hjchen2 avatar Aug 26 '22 06:08 hjchen2

针对 #8866 (comment) 这个问题。与后江开会讨论结果:

  1. python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
  2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。

好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread?

Thread,Device,StreamType这三者唯一确定一个vm::Stream对象

_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。

lixinqi avatar Aug 26 '22 10:08 lixinqi

针对 #8866 (comment) 这个问题。与后江开会讨论结果:

  1. python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
  2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。

好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread?

Thread,Device,StreamType这三者唯一确定一个vm::Stream对象

_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。

lixinqi avatar Aug 26 '22 10:08 lixinqi

针对 #8866 (comment) 这个问题。与后江开会讨论结果:

  1. python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
  2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。

好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread? Thread,Device,StreamType这三者唯一确定一个vm::Stream对象

_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。

我是从 "2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。" 这里改过来的。

Thead 对应 worker thread,它负责和Device交互;

Device 是物理设备,它可以带有多个 Stream;

StreamType 含义比较接近任务类型;

  • 一个指令(任务)
  • 有StreamType(任务类型)
  • 可以选择一个 Thead (工作线程,可以和多个 Stream交互)【本PR的新增功能】
  • 从工作线程向某个设备的某个Stream下发任务

看是不是这样一个逻辑

strint avatar Aug 26 '22 10:08 strint

读完 Oneflow-Inc/OneTeam#1632 这个 issue,还无法完全领会这个功能所起的作用,解决什么问题,以及为什么要这么解决(这么解决的好处或者必要性)。

把vm底层的vm::Stream导出到python层,直接提供给用户一些抓手优化性能。

lixinqi avatar Aug 26 '22 10:08 lixinqi

现在我们Python这个Stream的作用其实和PyTorch的Pipe更贴近(https://pytorch.org/docs/stable/_modules/torch/distributed/pipeline/sync/pipe.html#Pipe ),oneflow C++ stream是更底层的东西,它就类似于流水线里的工人,可以有多个,高级的流水线还能让指令在这些stream上乱序执行。

如果我们现在导出flow.stream,那以后为了对齐pytorch,肯定也还要导出flow.cuda.stream,这两个stream就很难向用户解释清楚了。

与Pipe Module差得还是挺多的,它主要解决多个卡之间的流水。

 If the module requires lots of memory and doesn't fit on a single GPU, pipeline parallelism is a useful technique to employ for training.

lixinqi avatar Aug 26 '22 10:08 lixinqi

auto_sync=False

auto_sync的参数是不是可以去掉,其实这个不需要用户操心。

lixinqi avatar Aug 26 '22 10:08 lixinqi

auto_sync=False

auto_sync的参数是不是可以去掉,其实这个不需要用户操心。

为什么呢,我理解有了这个参数我们才能让我们目前的 stream 和未来兼容 PyTorch 的 stream 在同一个概念下暴露给用户

daquexian avatar Aug 26 '22 10:08 daquexian

针对 #8866 (comment) 这个问题。与后江开会讨论结果:

  1. python层的flow.Stream名不副实。它其实在导出flow.StreamSet概念,即这是一个跨StreamType/DeviceType的概念,与一般用户理解的stream从属于device这一常识相去甚远。
  2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。

好像和通常理解的 PipeLine (流水线) 也关联不大,更接近 c++ std::thread 的抽象,flow.Thread? Thread,Device,StreamType这三者唯一确定一个vm::Stream对象

_Thread,Device,StreamType这三者之下应该允许存在多个Stream对象。

我是从 "2. PipeLine,Device,StreamType这三者唯一确定一个vm::Stream对象。" 这里改过来的。

Thead 对应 worker thread,它负责和Device交互;

Device 是物理设备,它可以带有多个 Stream;

StreamType 含义比较接近任务类型;

  • 一个指令(任务)
  • 有StreamType(任务类型)
  • 可以选择一个 Thead (工作线程,可以和多个 Stream交互)【本PR的新增功能】
  • 从工作线程向某个设备的某个Stream下发任务

看是不是这样一个逻辑

是的

lixinqi avatar Aug 26 '22 10:08 lixinqi

auto_sync=False

auto_sync的参数是不是可以去掉,其实这个不需要用户操心。

为什么呢,我理解有了这个参数我们才能让我们目前的 stream 和未来兼容 PyTorch 的 stream 在同一个概念下暴露给用户

首先,torch.Stream并没有显式的auto_sync参数,而是一个笼统的kwargs。其次,如果将来真的有stream_set.compute_stream()的需求,到时再扩充flow.StreamSet,同样加一个kwargs就可以了。

lixinqi avatar Aug 26 '22 11:08 lixinqi

首先,torch.Stream并没有显式的auto_sync参数,而是一个笼统的kwargs。其次,如果将来真的有stream_set.compute_stream()的需求,到时再扩充flow.StreamSet,同样加一个kwargs就可以了。

不过为了和 torch.cuda.Stream 语义对齐,我们的这个 auto_sync 默认值应该是 False?也就是用户需要显式指定 auto_sync=True,才是我们现在的自动在 stream 间同步的语义,否则默认就是和 PyTorch 语义对齐的需手动同步。

以及为了防止用户疑惑,可以先把 StreamSet 放在 experimental 命名空间里。

也就是 现在的命名:flow.experimental.StreamSet() 将来的命名:flow.StreamSet(auto_sync=True)

看我们的理解是不是一样

daquexian avatar Aug 26 '22 13:08 daquexian

experimental

试验性的工具类 torch 一般是放到 utils.xxx 下面,稳定后再升级到二级命名空间下、或者一级命名空间下。

之前的实验性功能命名空间 experimental 也都已经删掉了

strint avatar Aug 26 '22 13:08 strint

experimental

试验性的工具类 torch 一般是放到 utils.xxx 下面,稳定后再升级到二级命名空间下、或者一级命名空间下。

之前的实验性功能命名空间 experimental 也都已经删掉了

我没有找到过torch.experimental.xxx api。最多只是找到了functorch.experimental.xxx和torchtext.experimental.xxx。后两者看起来是基于pytorch框架做得库,不是torch本身的东西。 或者,在其子名字空间下还可以找到experimental https://pytorch.org/docs/stable/search.html?q=torch.experimental&check_keywords=yes&area=default#

lixinqi avatar Aug 29 '22 02:08 lixinqi

CI failed when running job: Build cpu. PR label automerge has been removed

github-actions[bot] avatar Aug 29 '22 08:08 github-actions[bot]

备选方案以及辩护理由

新奇、建浩提 flow.StreamSet / flow.stream_set

啸宇:不应该直接放在oneflow顶层名字空间 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。

新奇、建浩提 flow.experimental.StreamSet / flow.experimental.stream_set

啸宇:torch的不太成熟的api一般都放在torch.utils名字空间下。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。

新奇提 flow.vm.StreamSet / flow.vm.stream_set

新奇:不太想把vm这个概念暴露出来,本质上不应该让用户关心。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。

新奇提 flow.worker.StreamSet / flow.worker.stream_set

新奇:用户不太能理解worker语义。 啸宇:子命名空间应该是特定功能,worker有点不合适。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。

新奇提 flow.worker_thread.StreamSet / flow.worker_thread.stream_set

新奇:worker_thread里用户的业务逻辑太远。 啸宇:子命名空间应该是特定功能,worker_thread有点不合适。 后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。

啸宇提 flow.stream.StreamSet / flow.stream.stream_set

后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。

新奇,啸宇提flow.utils.StreamSet / flow.utils.stream_set

后江:Stream这个字眼不合适,普通c++的开发人员容易理解成io流,而cuda开发人员容易理解为stream,但其实都不太像。

啸宇,后江,建浩提新的名字空间 flow.async

均无异议。新奇:看起来像是flow.vm的替代,但是更易于用户理解。 畅想了什么api可以放到该名字空间下:flow.async.local_sync ,flow.async.global_sync,

后江提flow.async.run

with flow.async.run(thread_global_id):
    pass

啸宇:run这个字眼不太合适,应该是个名词

新奇提flow.async.run(flow.async.Fiber(thread_global_id))

后江:Fiber比stream好,但仍然会让人联想到常见的协程概念,但此处又不太一致。

新奇、后江提flow.async.pipeline

新奇,啸宇:pipeline在pytorch里有类似概念: Pipe module。那个更多的是一个composer,把分属两个设备的module组合起来,类似Sequential module。此处的flow.async.pipeline更多地想表达类似micro thread的概念。

后江提flow.asyncs.thread(thread_global_id)

后江:暂时不要提供fiber概念,用户不易理解。c++层面的StreamSet可以继续存在,python层只导出flow.asyncs.thread。用户用不同fiber所要解决的问题,总是可以用不同thread_global_id来解决,而且更加可靠,因为不同的thread_global_id肯定不会出现不同fiber争用线程资源。


default_thread_id = 0
decoder_async_thread_id = 1
loss_async_thread_id = 2

for i in epoch_iters:

    with flow.asyncs.thread(decoder_async_thread_id):
        data = get_data()

    loss = train(model)

    with flow.asyncs.thread(loss_async_thread_id):
        write_metric(loss)

lixinqi avatar Aug 30 '22 02:08 lixinqi

决策原则

  1. api是否明确暴露eager 运行时的核心特性。
  2. 让用户最快地理解api如何使用。

lixinqi avatar Aug 30 '22 02:08 lixinqi

Speed stats:

github-actions[bot] avatar Aug 30 '22 05:08 github-actions[bot]