RGModern icon indicating copy to clipboard operation
RGModern copied to clipboard

1.1.0 new core design

Open gxm11 opened this issue 1 year ago • 1 comments

新的RGM核心调度规则

20230717 运行流程设计

  1. engine包含多个scheduler,每个scheduler在独立的线程里运行。

  2. scheduler在独立线程中执行,它初始化后就会进入主循环。在没有worker的情况下,scheduler不会退出。

  • scheduler可以添加、删除worker,然后在主循环中调度这些worker。添加worker是即时的,但删除worker是异步的,worker在收到退出信号后,会自行退出。
  • worker的退出不影响scheduler,则任务必须以广播的形式发送。这样任务不会被拦截,单个worker的存在与否不影响其他的worker是否执行任务。
  • 与之前的设计不同,scheduler的数据结构里没有业务逻辑。只在添加了不同的worker后才会执行特定的业务逻辑。
  • 由于scheduler没有业务逻辑,所以广播的任务scheduler一定会收到,不过在主循环启动后会迅速被丢弃。
  1. scheduler的主循环分为以下步骤:
  • 检查scheduler和worker的状态,移除已经结束的worker
  • 检查任务队列queue,将queue中的任务发布给其他的worker
  • 依次执行worker的业务逻辑
  1. scheduler在多次检查自己的任务队列都为空时,就使用阻塞模式读取任务队列的内容,且等待至多1ms,以降低CPU消耗。
  • 这是整个scheduler的主循环里唯一的等待。
  1. scheduler对worker的调度有4种模式,区别在于worker是否在fiber中运行,以及是否定义run函数。简单地说,如果worker在fiber中运行,有独立的运行栈,则worker流程阻塞时,会切换到主要fiber;如果没有独立的运行栈,则阻塞worker的同时,会定期更新scheduler,并嵌套执行其他的worker。
分类 run_in_fiber 定义run函数 调度方式
1 Yes Yes 给worker创建新的fiber,执行run函数。
2 Yes No 给worker创建新的fiber,执行默认的run函数。此run函数会清空队列,在队列为空或获取数据失败时使用fiber_yield切回到主Fiber。
3 No Yes worker的run函数直接阻塞scheduler的运行。这仅适用于scheduler中只有1个worker的情况。
4 No No 在主Fiber中,worker会执行默认的run函数。此函数会清空队列,在队列为空或获取数据失败时就会提前结束。
  1. worker有自己的可执行任务列表,不在列表中的任务将不会分发给worker。worker有一个队列,其中的任务会从前到后依次执行,队列的实现是std::queue。

  2. worker的运行流程如下,注意,如果worker在fiber中执行,下面的每个都是在切换到fiber后执行的。

  • scheduler添加worker,创建基本的数据结构,随后的此操作都是将控制权交给worker执行的。
  • 初始化owner类型的数据
  • 执行数据的setup_owner函数,无需获取数据所有权,在这步函数执行完毕后,owner数据才可以被其他worker加载。
  • 初始化local类型的数据
  • 初始化group类型的数据
  • 初始化share类型的数据
  • 执行数据的setup函数,需要获取数据所有权
  • 执行run函数或者清空队列,在合适的时机将执行权交给scheduler
  • 进入结束状态后,会先解绑数据再退出
  • 解绑share类型的数据
  • 解绑group类型的数据
  • 析构local类型的数据
  • 析构owner类型的数据,需要获取数据所有权
  • 清空队列中的任务,但是不锁定数据,也不执行run的内容(在此处解除其他worker的等待)
  • 完全结束,将执行权交给scheduler,后者会在下次更新时删除此worker。

当worker获取的share或group数据的owner已经析构时,worker会进入退出状态。

  1. worker执行任务时,如果定义了run函数,则需要在run函数调用过程中,主动广播任务或者阻塞执行任务。如果未定义run函数,则会读取任务队列的首项并执行,然后移除此项,直到任务队列为空,或者由于其他原因交出执行权。
  • 在此worker执行任务,需要锁定数据,在锁定数据失败时阻塞worker的流程。
  • 发送任务到别的worker,任务会异步执行。
  • 发送任务到别的worker,并等待相应的任务结束,此时worker的流程阻塞。建议监控某个变量,如果变量从true变为false则认定任务结束。
  1. worker流程阻塞时,会交出执行权。如果worker在fiber中运行,会切换回主要fiber,执行scheduler的主循环;如果worker不在fiber中运行,scheduler仍然可以执行单步更新,分配任务并执行其他的worker的流程。
  • 这里不需要考虑两个worker互相等待对方的死锁问题,因为即使是多线程的模式,也无法解决这个死锁,这属于业务逻辑上的错误设计。
  • scheduler可以嵌套调用worker,方法是在worker阻塞时,调用scheduler的update方法。
  • 在必要的时候,worker也可以使用原子变量atomic_wait等待调度,这样会导致scheduler随着一起阻塞。
  1. 任务有自己的run函数和成员变量。run函数的参数是worker中数据的引用或者常量引用,worker在执行run函数之前会尝试锁定任务所需的数据,如果失败则阻塞流程,交出执行权,等待下次恢复运行时再次尝试锁定数据。run函数执行完毕后,解锁刚刚锁定的数据。
  • 任务必须是可复制的,每个任务都会广播给所有的scheduler。
  • 在任务中执行任务,则无需锁定全部的数据,通过模板参数告知worker哪些数据要锁定。
  • 专用于阻塞流程的任务,需要用std::function给出判断条件,比如捕获一个栈上的变量,每次worker运行到此处时检查判断条件是否返回false或者true,决定是否继续运行。

综上,需要实现以下类和函数:

1. engine
- broadcast_task<T_task>(const T_task& t)
- terminate()

2. scheduler
- create_worker<T_worker>()
- remove_worker(size_t worker_id)
- update()
- enqueue_task<T_task>(const T_task& t)
- dequeue_task()
- resume_worker()
- terminated()

3. worker
- lock_data<T_data ...>()
- unlock_data<T_data ...>()
- run_task<T_task>()
- yield()
- setup_owner()
- setup_data()
- bind_data()
- unbind_data()
- terminate()

4. task
- run(T1& t1, T2& t2, ...)

gxm11 avatar Jul 20 '23 08:07 gxm11