pivot-lang
pivot-lang copied to clipboard
协程支持
- [x] generator @Chronostasys #320
- [ ] #433
- [x] #430
- [x] async main 支持
目标实现无栈协程,这需要编译器能自动将异步函数编译为异步状态机
考虑下方函数:
async fn test() {
let a = 1;
await some_async_io();
a = a + 1;
}
在编译的时候,test会变成:
struct test_ctx{
step:i64;
a: i64;
}
fn test_init() test_ctx {
return test_ctx{};
}
fn test_move_next(ctx: test_ctx) bool {
switch ctx.step {
case 0:
ctx.a = 1;
event_loop_register(some_async_io);
case 1:
ctx.a = ctx.a + 1;
}
ctx.step += 1;
return ctx.step == 1;
}
在调用test的时候,生成代码为:
let ctx = test_init();
test_move_next(ctx);
event_loop_register会把异步操作注册到evloop里,在该操作完成之后,evloop会通知调度器,调度器继续执行test_move_next(ctx)
为了保证gc正确性,evloop线程不要跑pl代码或者函数,只跑rust代码。
关于调度器
我们把异步状态机记为Task,那么一个纯异步程序中(main也是async的),实际上所有线程都不是直接跑用户代码,而是运行调度器,而调度器对用户代码进行调度和运行。
下方假设我们使用一个全局的task 队列,那么调度器伪代码如下:
loop {
let task = queue.dequeue();
if task != null {
task.move_next();
}
}
evloop在一个异步操作完成后,需要queue.enqueue(task)
// 处理之后
#[allow(unused)]
fn async_func(reactor: Arc<Mutex<Box<Reactor>>>, start: Instant) -> impl Future<Output = () >{
enum AsyncFunc {
State0((Arc<Mutex<Box<Reactor>>>, Instant)),
State1((Task, Instant)),
Done,
}
impl Future for AsyncFunc {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match *self {
Self::State0((ref mut reactor, start)) => {
let val = Task::new(reactor.clone(), 3, 1);
*self = Self::State1((val, start));
},
Self::State1((ref mut task, start)) => {
match Pin::new(task).poll(cx) {
Poll::Ready(val) => {
println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
*self = Self::Done;
return Poll::Ready(());
},
Poll::Pending => {
return Poll::Pending;
},
}
},
AsyncFunc::Done => todo!(),
}
}
}
}
AsyncFunc::State0((reactor, start))
}
// 处理之前
#[allow(unused)]
async fn func(reactor: Arc<Mutex<Box<Reactor>>>, start: Instant) -> () {
let val = Task::new(reactor.clone(), 3, 1).await;
println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
}
fn main() {
let start = Instant::now();
let reactor = Reactor::new();
let fut1 = func(reactor.clone(), start);
// let fut1 = async_func(reactor.clone(), start);
let fut2 = async {
let val = Task::new(reactor.clone(), 1, 2).await;
println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
};
let mainfut = async {
fut1.await;
fut2.await;
};
block_on(mainfut);
reactor.lock().map(|mut r| r.close()).unwrap();
}
use std::{
future::Future, sync::{ mpsc::{channel, Sender}, Arc, Mutex, Condvar},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem, pin::Pin,
thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
};
// ============================= Executor ====================================
#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);
impl Parker {
fn park(&self) {
let mut resumable = self.0.lock().unwrap();
while !*resumable {
resumable = self.1.wait(resumable).unwrap();
}
*resumable = false;
}
fn unpark(&self) {
*self.0.lock().unwrap() = true;
self.1.notify_one();
}
}
fn block_on<F: Future>(mut future: F) -> F::Output {
let parker = Arc::new(Parker::default());
let mywaker = Arc::new(MyWaker { parker: parker.clone() });
let waker = mywaker_into_waker(Arc::into_raw(mywaker));
let mut cx = Context::from_waker(&waker);
// SAFETY: we shadow `future` so it can't be accessed again.
let mut future = unsafe { Pin::new_unchecked(&mut future) };
loop {
match Future::poll(future.as_mut(), &mut cx) {
Poll::Ready(val) => break val,
Poll::Pending => parker.park(),
};
}
}
// ====================== Future Impl==============================
#[derive(Clone)]
struct MyWaker {
parker: Arc<Parker>,
}
#[derive(Clone)]
pub struct Task {
id: usize,
reactor: Arc<Mutex<Box<Reactor>>>,
data: u64,
}
fn mywaker_wake(s: &MyWaker) {
let waker_arc = unsafe { Arc::from_raw(s) };
waker_arc.parker.unpark();
}
fn mywaker_clone(s: &MyWaker) -> RawWaker {
let arc = unsafe { Arc::from_raw(s) };
std::mem::forget(arc.clone()); // increase ref count
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}
const VTABLE: RawWakerVTable = unsafe {
RawWakerVTable::new(
|s| mywaker_clone(&*(s as *const MyWaker)),
|s| mywaker_wake(&*(s as *const MyWaker)),
|s| mywaker_wake(*(s as *const &MyWaker)),
|s| drop(Arc::from_raw(s as *const MyWaker)),
)
};
fn mywaker_into_waker(s: *const MyWaker) -> Waker {
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
impl Task {
fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
Task { id, reactor, data }
}
}
impl Future for Task {
type Output = usize;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut r = self.reactor.lock().unwrap();
if r.is_ready(self.id) {
*r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
Poll::Ready(self.id)
} else if r.tasks.contains_key(&self.id) {
r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
Poll::Pending
} else {
r.register(self.data, cx.waker().clone(), self.id);
Poll::Pending
}
}
}
// =============================== Reactor ===================================
enum TaskState {
Ready,
NotReady(Waker),
Finished,
}
struct Reactor {
dispatcher: Sender<Event>,
handle: Option<JoinHandle<()>>,
tasks: HashMap<usize, TaskState>,
}
#[derive(Debug)]
enum Event {
Close,
Timeout(u64, usize),
}
impl Reactor {
fn new() -> Arc<Mutex<Box<Self>>> {
let (tx, rx) = channel::<Event>();
let reactor = Arc::new(Mutex::new(Box::new(Reactor {
dispatcher: tx,
handle: None,
tasks: HashMap::new(),
})));
let reactor_clone = Arc::downgrade(&reactor);
let handle = thread::spawn(move || {
let mut handles = vec![];
for event in rx {
let reactor = reactor_clone.clone();
match event {
Event::Close => break,
Event::Timeout(duration, id) => {
let event_handle = thread::spawn(move || {
thread::sleep(Duration::from_secs(duration));
let reactor = reactor.upgrade().unwrap();
reactor.lock().map(|mut r| r.wake(id)).unwrap();
});
handles.push(event_handle);
}
}
}
handles.into_iter().for_each(|handle| handle.join().unwrap());
});
reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
reactor
}
fn wake(&mut self, id: usize) {
let state = self.tasks.get_mut(&id).unwrap();
match mem::replace(state, TaskState::Ready) {
TaskState::NotReady(waker) => waker.wake(),
TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
_ => unreachable!()
}
}
fn register(&mut self, duration: u64, waker: Waker, id: usize) {
if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
panic!("Tried to insert a task with id: '{}', twice!", id);
}
self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
}
fn close(&mut self) {
self.dispatcher.send(Event::Close).unwrap();
}
fn is_ready(&self, id: usize) -> bool {
self.tasks.get(&id).map(|state| match state {
TaskState::Ready => true,
_ => false,
}).unwrap_or(false)
}
}
impl Drop for Reactor {
fn drop(&mut self) {
self.handle.take().map(|h| h.join().unwrap()).unwrap();
}
}
Async to Iterator translate:
async fn test_async() Future<i64> {
let a = await fn_async();
return a;
}
desugar
gen fn test_async_iter() Iterator<Poll<i64>> {
let statemachine = fn_async();
while statemachine.poll() is Pending {
let re: Poll<i64> = Pending{};
yield return re;
}
let result = statemachine.poll() as Ready<i64>!;
let re: Poll<i64> = result;
while true {
yield return re
}
}
struct DefaultFuture<T> {
v:Iterator<Poll<i64>> ;
}
impl <T> Future<T> for DefaultFuture<T> {
fn poll() Poll<T> {
return self.v.next() as Poll<T>;
}
}
fn test_async() Future<i64> {
let re = test_async_iter();
return DefaultFuture<i64> {v:re};
}
Async
和rust不一样,我们的只要跑了async函数就会开始运行
async fn task1:
ret await task2
async fn task2:
await task3
ret task4
let t = await task1
await task5(t)
async fn task5(t):
await t
async fn task4:
pass
graph TD
A[Task1] -->|await| B[Task2]
B -->|await| C[Task3]
B --> |start| D[Task4]
E[Task5] -->|await| D
execution flow
graph TD
Executor -->|poll with waker Task1| A[Task1]
A -->|poll with waker Task1| B[Task2]
B -->|poll with waker Task1| C[Task3]
C -.-> |wake| A
Executor --> |poll with waker Task4| D[Task4]
Executor -->|poll with waker Task5| E[Task5]
E -->|poll with waker Task5| D
D -.-> |wake| E
Lowering example:
Lowerring
async fn main() Future<i64> {
let a = await task1();
return await task2(a);
}
fn main() Future<i64> {
let ctx = MainGeneratorCtx{};
return new_future(|wk|=>ctx.poll(wk));
}
struct Future<T> {
poll_f:|wk:*Waker|=>Poll<T>;
result: Option<T>;
}
impl <T> Future<T> {
fn poll(wk:*Waker) Poll<T> {
if self.result is T {
return Ready{v: self.result as T!};
} else {
let p = (self.poll_f)(wk);
if p is Ready<T> {
self.result = Some{(p as Ready<T>!).v};
self.poll_f = |wk|=>Ready{v: self.result as T!};
}
return p;
}
}
}
fn new_future<T>(poll_f:|wk:*Waker|=>Poll<T>) Future<T> {
return Future{
poll_f: poll_f, result: None{}
};
}
struct MainGeneratorCtx {
state: i32;
a_f: Future<i64>;
b_f: Future<i64>;
a: i64;
b: i64;
}
impl MainGeneratorCtx {
fn poll(wk:*Waker) Poll<i64> {
while true {
if self.state == 0 {
self.a_f = task1();
self.state = 1;
return Pending{};
} else if self.state == 1 {
let p = self.a_f.poll(wk);
if p is Pending {
return Pending{};
} else {
self.a = (p as Ready<i64>!).v;
self.b_f = task2(self.a);
self.state = 2;
return Pending{};
}
} else if self.state == 2 {
let p = self.b_f.poll(wk);
if p is Pending {
return Pending{};
} else {
self.b = (p as Ready<i64>!).v;
self.state = 3;
return Pending{};
}
} else{
return Ready{v: self.b};
}
}
}
}
use std::chan;
pub struct Task {
poll: |Waker|=>void;
}
pub struct Waker {
wake: |*chan::Chan<Task>|=>void;
ch: *chan::Chan<Task>;
}
pub struct Pending {}
pub struct Ready<T> {
v: T;
}
pub type Poll<T> = Ready<T> | Pending;
pub trait Future<T> {
fn poll(wk:Waker) Poll<T>;
}
pub struct FnFuture<T> {
poll: |Waker|=>Poll<T>;
status: Poll<T>;
}
impl <T> Future<T> for FnFuture<T> {
fn poll(wk: Waker) Poll<T> {
if self.status is Ready<T> {
return self.status;
} else {
return self.poll(wk);
}
}
}
impl <T> FnFuture<T> {
fn then<U>(f: |T|=>U) FnFuture<U> {
return FnFuture<U> {
poll: |wk|=>{
if self.status is Ready<T> {
let v = (self.status as Ready<T>!).v;
return Ready<U>{v: f(v)} as Poll<U>;
} else {
let p = self.poll(wk);
if p is Ready<T> {
let v = (p as Ready<T>!).v;
return Ready<U>{v: f(v)} as Poll<U>;
} else {
return Pending{} as Poll<U>;
}
}
},
status: Pending{} as Poll<U>,
};
}
}
///==========main==========///
fn main() void {
let exe = executor(10 as u64);
let d = delay(5 as u64) as Future<()>;
let ff = FnFuture<()> {
poll: |wk|=>{
return d.poll(wk);
},
status: Pending{} as Poll<()>,
};
let f = ff.then<i64>(|_a|=>i64{
return 42;
});
exe.spawn<i64>(f as Future<i64>);
exe.run();
println!("done!");
return;
}
///==========Delay============///
pub struct Delay {
when: u64;
}
pub fn delay(secs: u64) Delay {
return Delay {
when: unixtime() + secs as u64
};
}
impl Future<()> for Delay {
fn poll(wk:Waker) Poll<()> {
if unixtime() >= self.when {
return Ready<()>{};
} else {
spawn(||=>{
let rem = self.when - unixtime();
if rem > 0 {
sleep(rem);
}
wk.wake(wk.ch);
return;
});
return Pending{};
}
}
}
fn unixtime() u64;
///=========thread============///
fn new_thread(f: *||=>void) void;
pub fn spawn(f:||=>void) void {
new_thread(&f);
return;
}
pub fn sleep(secs: u64) void;
///==========executor=========///
pub struct Executor {
ch: *chan::Chan<Task>;
}
pub fn executor(n: u64) Executor {
return Executor {
ch: &chan::channel<Task>(n)
};
}
impl Executor {
fn run() void {
while true {
let task = self.ch.recv();
let waker = Waker {
ch: self.ch,
wake: |ch|=>void {
ch.send(task);
return;
}
};
task.poll(waker);
}
return;
}
fn spawn<T>(f: Future<T>) void {
let task = Task {
poll: |wk|=>{
f.poll(wk);
return;
},
};
self.ch.send(task);
return;
}
}