pivot-lang icon indicating copy to clipboard operation
pivot-lang copied to clipboard

协程支持

Open Chronostasys opened this issue 2 years ago • 5 comments

  • [x] generator @Chronostasys #320
  • [ ] #433
  • [x] #430
  • [x] async main 支持

目标实现无栈协程,这需要编译器能自动将异步函数编译为异步状态机

考虑下方函数:

async fn test() {
  let a = 1;
  await some_async_io();
  a = a + 1;
}

在编译的时候,test会变成:

struct test_ctx{
    step:i64;
    a: i64;
}

fn test_init() test_ctx {
  return test_ctx{};
}

fn test_move_next(ctx: test_ctx) bool {
  switch ctx.step {
    case 0:
      ctx.a = 1;
      event_loop_register(some_async_io);
    case 1:
      ctx.a = ctx.a + 1;
  }
  ctx.step += 1;
  return ctx.step == 1;
}

在调用test的时候,生成代码为:

let ctx = test_init();
test_move_next(ctx);

event_loop_register会把异步操作注册到evloop里,在该操作完成之后,evloop会通知调度器,调度器继续执行test_move_next(ctx)

为了保证gc正确性,evloop线程不要跑pl代码或者函数,只跑rust代码。

关于调度器

我们把异步状态机记为Task,那么一个纯异步程序中(main也是async的),实际上所有线程都不是直接跑用户代码,而是运行调度器,而调度器对用户代码进行调度和运行。

下方假设我们使用一个全局的task 队列,那么调度器伪代码如下:

loop {
  let task = queue.dequeue();
  if task != null {
    task.move_next();
  }
}

evloop在一个异步操作完成后,需要queue.enqueue(task)

Chronostasys avatar May 03 '23 06:05 Chronostasys

// 处理之后
#[allow(unused)]
fn async_func(reactor: Arc<Mutex<Box<Reactor>>>, start: Instant) -> impl Future<Output = () >{
    enum AsyncFunc {
        State0((Arc<Mutex<Box<Reactor>>>, Instant)),
        State1((Task, Instant)),
        Done,
    }
    impl Future for AsyncFunc {
        type Output = ();
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            loop {
                match *self {
                    Self::State0((ref mut reactor, start)) => {
                        let val = Task::new(reactor.clone(), 3, 1);
                        *self = Self::State1((val, start));
                    },
                    Self::State1((ref mut task, start)) => {
                        match Pin::new(task).poll(cx) {
                            Poll::Ready(val) => {
                                println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
                                *self = Self::Done;
                                return Poll::Ready(());
                            },
                            Poll::Pending => {
                                return Poll::Pending;
                            },
                            
                        }
                    },
                    AsyncFunc::Done => todo!(),
                }
            }
        }
    }
    AsyncFunc::State0((reactor, start))
}
// 处理之前
#[allow(unused)]
async fn func(reactor: Arc<Mutex<Box<Reactor>>>, start: Instant) -> () {
    let val = Task::new(reactor.clone(), 3, 1).await;
    println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
}

fn main() {
    let start = Instant::now();
    let reactor = Reactor::new();

    let fut1 = func(reactor.clone(), start);
    // let fut1 = async_func(reactor.clone(), start);

    let fut2 = async {
        let val = Task::new(reactor.clone(), 1, 2).await;
        println!("Got {} at time: {:.2}.", val, start.elapsed().as_secs_f32());
    };

    let mainfut = async {
        fut1.await;
        fut2.await;
    };

    block_on(mainfut);
    reactor.lock().map(|mut r| r.close()).unwrap();
}

use std::{
    future::Future, sync::{ mpsc::{channel, Sender}, Arc, Mutex, Condvar},
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, mem, pin::Pin,
    thread::{self, JoinHandle}, time::{Duration, Instant}, collections::HashMap
};
// ============================= Executor ====================================
#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);

impl Parker {
    fn park(&self) {
        let mut resumable = self.0.lock().unwrap();
            while !*resumable {
                resumable = self.1.wait(resumable).unwrap();
            }
        *resumable = false;
    }

    fn unpark(&self) {
        *self.0.lock().unwrap() = true;
        self.1.notify_one();
    }
}

fn block_on<F: Future>(mut future: F) -> F::Output {
    let parker = Arc::new(Parker::default());
    let mywaker = Arc::new(MyWaker { parker: parker.clone() });
    let waker = mywaker_into_waker(Arc::into_raw(mywaker));
    let mut cx = Context::from_waker(&waker);
    
    // SAFETY: we shadow `future` so it can't be accessed again.
    let mut future = unsafe { Pin::new_unchecked(&mut future) }; 
    loop {
        match Future::poll(future.as_mut(), &mut cx) {
            Poll::Ready(val) => break val,
            Poll::Pending => parker.park(),
        };
    }
}
// ====================== Future Impl==============================
#[derive(Clone)]
struct MyWaker {
    parker: Arc<Parker>,
}

#[derive(Clone)]
pub struct Task {
    id: usize,
    reactor: Arc<Mutex<Box<Reactor>>>,
    data: u64,
}

fn mywaker_wake(s: &MyWaker) {
    let waker_arc = unsafe { Arc::from_raw(s) };
    waker_arc.parker.unpark();
}

fn mywaker_clone(s: &MyWaker) -> RawWaker {
    let arc = unsafe { Arc::from_raw(s) };
    std::mem::forget(arc.clone()); // increase ref count
    RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}

const VTABLE: RawWakerVTable = unsafe {
    RawWakerVTable::new(
        |s| mywaker_clone(&*(s as *const MyWaker)),
        |s| mywaker_wake(&*(s as *const MyWaker)),
        |s| mywaker_wake(*(s as *const &MyWaker)),
        |s| drop(Arc::from_raw(s as *const MyWaker)),
    )
};

fn mywaker_into_waker(s: *const MyWaker) -> Waker {
    let raw_waker = RawWaker::new(s as *const (), &VTABLE);
    unsafe { Waker::from_raw(raw_waker) }
}

impl Task {
    fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
        Task { id, reactor, data }
    }
}

impl Future for Task {
    type Output = usize;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut r = self.reactor.lock().unwrap();
        if r.is_ready(self.id) {
            *r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
            Poll::Ready(self.id)
        } else if r.tasks.contains_key(&self.id) {
            r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
            Poll::Pending
        } else {
            r.register(self.data, cx.waker().clone(), self.id);
            Poll::Pending
        }
    }
}
// =============================== Reactor ===================================
enum TaskState {
    Ready,
    NotReady(Waker),
    Finished,
}
struct Reactor {
    dispatcher: Sender<Event>,
    handle: Option<JoinHandle<()>>,
    tasks: HashMap<usize, TaskState>,
}

#[derive(Debug)]
enum Event {
    Close,
    Timeout(u64, usize),
}

impl Reactor {
    fn new() -> Arc<Mutex<Box<Self>>> {
        let (tx, rx) = channel::<Event>();
        let reactor = Arc::new(Mutex::new(Box::new(Reactor {
            dispatcher: tx,
            handle: None,
            tasks: HashMap::new(),
        })));
        
        let reactor_clone = Arc::downgrade(&reactor);
        let handle = thread::spawn(move || {
            let mut handles = vec![];
            for event in rx {
                let reactor = reactor_clone.clone();
                match event {
                    Event::Close => break,
                    Event::Timeout(duration, id) => {
                        let event_handle = thread::spawn(move || {
                            thread::sleep(Duration::from_secs(duration));
                            let reactor = reactor.upgrade().unwrap();
                            reactor.lock().map(|mut r| r.wake(id)).unwrap();
                        });
                        handles.push(event_handle);
                    }
                }
            }
            handles.into_iter().for_each(|handle| handle.join().unwrap());
        });
        reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
        reactor
    }

    fn wake(&mut self, id: usize) {
        let state = self.tasks.get_mut(&id).unwrap();
        match mem::replace(state, TaskState::Ready) {
            TaskState::NotReady(waker) => waker.wake(),
            TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
            _ => unreachable!()
        }
    }

    fn register(&mut self, duration: u64, waker: Waker, id: usize) {
        if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
            panic!("Tried to insert a task with id: '{}', twice!", id);
        }
        self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
    }

    fn close(&mut self) {
        self.dispatcher.send(Event::Close).unwrap();
    }

    fn is_ready(&self, id: usize) -> bool {
        self.tasks.get(&id).map(|state| match state {
            TaskState::Ready => true,
            _ => false,
        }).unwrap_or(false)
    }
}

impl Drop for Reactor {
    fn drop(&mut self) {
        self.handle.take().map(|h| h.join().unwrap()).unwrap();
    }
}

CjiW avatar May 30 '23 11:05 CjiW

Async to Iterator translate:

async fn test_async() Future<i64> {
    let a = await fn_async();
    return a;
}

desugar

gen fn test_async_iter() Iterator<Poll<i64>> {
    let statemachine = fn_async();
    while statemachine.poll() is Pending {
        let re: Poll<i64> = Pending{};
        yield return re;
    }
    let result = statemachine.poll() as Ready<i64>!;
    let re: Poll<i64> = result;
    while true {
        yield return re
    }
}

struct DefaultFuture<T> {
    v:Iterator<Poll<i64>> ;
}

impl <T> Future<T> for DefaultFuture<T> {
    fn poll() Poll<T> {
        return self.v.next() as Poll<T>;
    }
}

fn test_async() Future<i64> {
    let re = test_async_iter();
    return DefaultFuture<i64> {v:re};
}

Chronostasys avatar Dec 28 '23 10:12 Chronostasys

Async

和rust不一样,我们的只要跑了async函数就会开始运行

async fn task1:
    ret await task2

async fn task2:
    await task3
    ret task4

let t = await task1
await task5(t)

async fn task5(t):
    await t

async fn task4:
    pass
graph TD
    A[Task1] -->|await| B[Task2]
    B -->|await| C[Task3]
    B --> |start| D[Task4]
    E[Task5] -->|await| D

execution flow

graph TD
    Executor -->|poll with waker Task1| A[Task1]
    A -->|poll with waker Task1| B[Task2]
    B -->|poll with waker Task1| C[Task3]
    C -.-> |wake| A
    Executor --> |poll with waker Task4| D[Task4]
    Executor -->|poll with waker Task5| E[Task5]
    E -->|poll with waker Task5| D
    D -.-> |wake| E

Chronostasys avatar Mar 01 '24 12:03 Chronostasys

Lowering example:

Lowerring

async fn main() Future<i64> {
    let a = await task1();
    return await task2(a);
}
fn main() Future<i64> {
    let ctx = MainGeneratorCtx{};
    return new_future(|wk|=>ctx.poll(wk));
}


struct Future<T> {
    poll_f:|wk:*Waker|=>Poll<T>;
    result: Option<T>;
}

impl <T> Future<T> {
    fn poll(wk:*Waker) Poll<T> {
        if self.result is T {
            return Ready{v: self.result as T!};
        } else {
            let p = (self.poll_f)(wk);
            if p is Ready<T> {
                self.result = Some{(p as Ready<T>!).v};
                self.poll_f = |wk|=>Ready{v: self.result as T!};
            }
            return p;
        }
    }
}

fn new_future<T>(poll_f:|wk:*Waker|=>Poll<T>) Future<T> {
    return Future{
        poll_f: poll_f, result: None{}
    };
}

struct MainGeneratorCtx {
    state: i32;
    a_f: Future<i64>;
    b_f: Future<i64>;
    a: i64;
    b: i64;
}


impl MainGeneratorCtx {
    fn poll(wk:*Waker) Poll<i64> {
        while true {
            if self.state == 0 {
                self.a_f = task1();
                self.state = 1;
                return Pending{};
            } else if self.state == 1 {
                let p = self.a_f.poll(wk);
                if p is Pending {
                    return Pending{};
                } else {
                    self.a = (p as Ready<i64>!).v;
                    self.b_f = task2(self.a);
                    self.state = 2;
                    return Pending{};
                }
            } else if self.state == 2 {
                let p = self.b_f.poll(wk);
                if p is Pending {
                    return Pending{};
                } else {
                    self.b = (p as Ready<i64>!).v;
                    self.state = 3;
                    return Pending{};
                }
            } else{
                return Ready{v: self.b};
            }
        }
    }
}

Chronostasys avatar Mar 04 '24 07:03 Chronostasys

use std::chan;

pub struct Task {
    poll: |Waker|=>void;
}

pub struct Waker {
    wake: |*chan::Chan<Task>|=>void;
    ch: *chan::Chan<Task>;
}

pub struct Pending {}

pub struct Ready<T> {
    v: T;
}

pub type Poll<T> = Ready<T> | Pending;

pub trait Future<T> {
    fn poll(wk:Waker) Poll<T>;
}

pub struct FnFuture<T> {
    poll: |Waker|=>Poll<T>;
    status: Poll<T>;
}

impl <T> Future<T> for FnFuture<T> {
    fn poll(wk: Waker) Poll<T> {
        if self.status is Ready<T> {
            return self.status;
        } else {
            return self.poll(wk);
        }
    }
}

impl <T> FnFuture<T> {
    fn then<U>(f: |T|=>U) FnFuture<U> {
        return FnFuture<U> {
            poll: |wk|=>{
                if self.status is Ready<T> {
                    let v = (self.status as Ready<T>!).v;
                    return Ready<U>{v: f(v)} as Poll<U>;
                } else {
                    let p = self.poll(wk);
                    if p is Ready<T> {
                        let v = (p as Ready<T>!).v;
                        return Ready<U>{v: f(v)} as Poll<U>;
                    } else {
                        return Pending{} as Poll<U>;
                    }
                }
            },
            status: Pending{} as Poll<U>,
        };
    }
}

///==========main==========///

fn main() void {
    let exe = executor(10 as u64);
    let d = delay(5 as u64) as Future<()>;
    let ff = FnFuture<()> {
        poll: |wk|=>{
            return d.poll(wk);
        },
        status: Pending{} as Poll<()>,
    };
    let f = ff.then<i64>(|_a|=>i64{
        return 42;
    });
    exe.spawn<i64>(f as Future<i64>);
    exe.run();
    println!("done!");
    return;
}

///==========Delay============///

pub struct Delay {
    when: u64;
}

pub fn delay(secs: u64) Delay {
    return Delay {
        when: unixtime() + secs as u64
    };
}

impl Future<()> for Delay {
    fn poll(wk:Waker) Poll<()> {
        if unixtime() >= self.when {
            return Ready<()>{};
        } else {
            spawn(||=>{
                let rem = self.when - unixtime();
                if rem > 0 {
                    sleep(rem);
                }
                wk.wake(wk.ch);
                return;
            });
            return Pending{};
        }
    }
}

fn unixtime() u64;


///=========thread============///

fn new_thread(f: *||=>void) void;


pub fn spawn(f:||=>void) void {
    new_thread(&f);
    return;
}

pub fn sleep(secs: u64) void;

///==========executor=========///
pub struct Executor {
    ch: *chan::Chan<Task>;
}

pub fn executor(n: u64) Executor {
    return Executor {
        ch: &chan::channel<Task>(n)
    };
}

impl Executor {
    fn run() void {
        while true {
            let task = self.ch.recv();
            let waker = Waker {
                ch: self.ch,
                wake: |ch|=>void {
                    ch.send(task);
                    return;
                }
            };
            task.poll(waker);
        }
        return;
    }
    fn spawn<T>(f: Future<T>) void {
        let task = Task {
            poll: |wk|=>{
                f.poll(wk);
                return;
            },
        };
        self.ch.send(task);
        return;
    }
}

CjiW avatar Mar 09 '24 01:03 CjiW