RGModern
RGModern copied to clipboard
1.1.0 new core design
新的RGM核心调度规则
20230717 运行流程设计
-
engine包含多个scheduler,每个scheduler在独立的线程里运行。
-
scheduler在独立线程中执行,它初始化后就会进入主循环。在没有worker的情况下,scheduler不会退出。
- scheduler可以添加、删除worker,然后在主循环中调度这些worker。添加worker是即时的,但删除worker是异步的,worker在收到退出信号后,会自行退出。
- worker的退出不影响scheduler,则任务必须以广播的形式发送。这样任务不会被拦截,单个worker的存在与否不影响其他的worker是否执行任务。
- 与之前的设计不同,scheduler的数据结构里没有业务逻辑。只在添加了不同的worker后才会执行特定的业务逻辑。
- 由于scheduler没有业务逻辑,所以广播的任务scheduler一定会收到,不过在主循环启动后会迅速被丢弃。
- scheduler的主循环分为以下步骤:
- 检查scheduler和worker的状态,移除已经结束的worker
- 检查任务队列queue,将queue中的任务发布给其他的worker
- 依次执行worker的业务逻辑
- scheduler在多次检查自己的任务队列都为空时,就使用阻塞模式读取任务队列的内容,且等待至多1ms,以降低CPU消耗。
- 这是整个scheduler的主循环里唯一的等待。
- scheduler对worker的调度有4种模式,区别在于worker是否在fiber中运行,以及是否定义run函数。简单地说,如果worker在fiber中运行,有独立的运行栈,则worker流程阻塞时,会切换到主要fiber;如果没有独立的运行栈,则阻塞worker的同时,会定期更新scheduler,并嵌套执行其他的worker。
分类 | run_in_fiber | 定义run函数 | 调度方式 |
---|---|---|---|
1 | Yes | Yes | 给worker创建新的fiber,执行run函数。 |
2 | Yes | No | 给worker创建新的fiber,执行默认的run函数。此run函数会清空队列,在队列为空或获取数据失败时使用fiber_yield切回到主Fiber。 |
3 | No | Yes | worker的run函数直接阻塞scheduler的运行。这仅适用于scheduler中只有1个worker的情况。 |
4 | No | No | 在主Fiber中,worker会执行默认的run函数。此函数会清空队列,在队列为空或获取数据失败时就会提前结束。 |
-
worker有自己的可执行任务列表,不在列表中的任务将不会分发给worker。worker有一个队列,其中的任务会从前到后依次执行,队列的实现是std::queue。
-
worker的运行流程如下,注意,如果worker在fiber中执行,下面的每个都是在切换到fiber后执行的。
- scheduler添加worker,创建基本的数据结构,随后的此操作都是将控制权交给worker执行的。
- 初始化owner类型的数据
- 执行数据的setup_owner函数,无需获取数据所有权,在这步函数执行完毕后,owner数据才可以被其他worker加载。
- 初始化local类型的数据
- 初始化group类型的数据
- 初始化share类型的数据
- 执行数据的setup函数,需要获取数据所有权
- 执行run函数或者清空队列,在合适的时机将执行权交给scheduler
- 进入结束状态后,会先解绑数据再退出
- 解绑share类型的数据
- 解绑group类型的数据
- 析构local类型的数据
- 析构owner类型的数据,需要获取数据所有权
- 清空队列中的任务,但是不锁定数据,也不执行run的内容(在此处解除其他worker的等待)
- 完全结束,将执行权交给scheduler,后者会在下次更新时删除此worker。
当worker获取的share或group数据的owner已经析构时,worker会进入退出状态。
- worker执行任务时,如果定义了run函数,则需要在run函数调用过程中,主动广播任务或者阻塞执行任务。如果未定义run函数,则会读取任务队列的首项并执行,然后移除此项,直到任务队列为空,或者由于其他原因交出执行权。
- 在此worker执行任务,需要锁定数据,在锁定数据失败时阻塞worker的流程。
- 发送任务到别的worker,任务会异步执行。
- 发送任务到别的worker,并等待相应的任务结束,此时worker的流程阻塞。建议监控某个变量,如果变量从true变为false则认定任务结束。
- worker流程阻塞时,会交出执行权。如果worker在fiber中运行,会切换回主要fiber,执行scheduler的主循环;如果worker不在fiber中运行,scheduler仍然可以执行单步更新,分配任务并执行其他的worker的流程。
- 这里不需要考虑两个worker互相等待对方的死锁问题,因为即使是多线程的模式,也无法解决这个死锁,这属于业务逻辑上的错误设计。
- scheduler可以嵌套调用worker,方法是在worker阻塞时,调用scheduler的update方法。
- 在必要的时候,worker也可以使用原子变量atomic_wait等待调度,这样会导致scheduler随着一起阻塞。
- 任务有自己的run函数和成员变量。run函数的参数是worker中数据的引用或者常量引用,worker在执行run函数之前会尝试锁定任务所需的数据,如果失败则阻塞流程,交出执行权,等待下次恢复运行时再次尝试锁定数据。run函数执行完毕后,解锁刚刚锁定的数据。
- 任务必须是可复制的,每个任务都会广播给所有的scheduler。
- 在任务中执行任务,则无需锁定全部的数据,通过模板参数告知worker哪些数据要锁定。
- 专用于阻塞流程的任务,需要用std::function给出判断条件,比如捕获一个栈上的变量,每次worker运行到此处时检查判断条件是否返回false或者true,决定是否继续运行。
综上,需要实现以下类和函数:
1. engine
- broadcast_task<T_task>(const T_task& t)
- terminate()
2. scheduler
- create_worker<T_worker>()
- remove_worker(size_t worker_id)
- update()
- enqueue_task<T_task>(const T_task& t)
- dequeue_task()
- resume_worker()
- terminated()
3. worker
- lock_data<T_data ...>()
- unlock_data<T_data ...>()
- run_task<T_task>()
- yield()
- setup_owner()
- setup_data()
- bind_data()
- unbind_data()
- terminate()
4. task
- run(T1& t1, T2& t2, ...)