async-std

executor independent spawning

Open dvc94ch opened this issue 3 years ago • 23 comments

It looks like the Context was originally intended to have an executor attached to it, so that tasks could be spawned in an executor-independent way. With async-io we no longer need to rely on a specific runtime or wrap the std types, which is a huge gain for writing async code that works everywhere. However, it is sometimes very useful to be able to spawn a background task. There doesn't seem to be a way to do this without either passing around a "spawner" or deciding on an executor. Is there likely going to be a std::task::spawn that executors can hook into?
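For reference, the "passing around a spawner" option could look roughly like the sketch below. The `Spawn` trait, `ThreadSpawner` and `start_background_job` are hypothetical names made up for illustration, not part of async-std, tokio or std. The toy spawner simply drives each future on its own OS thread, standing in for a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Wake, Waker};
use std::thread::{self, Thread};

pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical trait a library could accept instead of naming an executor.
pub trait Spawn {
    fn spawn(&self, future: BoxedFuture);
}

/// Waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy spawner (assumption, for illustration only): one OS thread per task.
pub struct ThreadSpawner;

impl Spawn for ThreadSpawner {
    fn spawn(&self, mut future: BoxedFuture) {
        thread::spawn(move || {
            let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
            let mut cx = Context::from_waker(&waker);
            // Poll until completion, parking between wake-ups.
            while future.as_mut().poll(&mut cx).is_pending() {
                thread::park();
            }
        });
    }
}

/// Library code: depends only on the trait, not on a concrete executor.
pub fn start_background_job<S: Spawn>(spawner: &S, tx: mpsc::Sender<u32>) {
    spawner.spawn(Box::pin(async move {
        let _ = tx.send(42);
    }));
}
```

The cost of this design is that every function that may spawn has to take the spawner as a parameter, which is exactly the friction described above.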

dvc94ch avatar Nov 21 '20 10:11 dvc94ch

It would actually be rather trivial, the only thing required would be to convince async-std and tokio to register a spawner in their #[async_std::main] and #[tokio::main] macros.

use core::future::Future;
use core::pin::Pin;
use parking_lot::{const_rwlock, RwLock};

type Spawner = fn(Pin<Box<dyn Future<Output = ()> + Send>>);

static SPAWNER: RwLock<Option<Spawner>> = const_rwlock(None);

pub fn register_spawner(spawner: Spawner) {
    let mut lock = SPAWNER.write();
    if lock.is_some() {
        panic!("spawner already registered");
    }
    *lock = Some(spawner);
}

pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    SPAWNER.read().expect("no spawner registered")(Box::pin(future));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[async_std::test]
    async fn test_async_std() {
        fn async_std_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
            async_std::task::spawn(future);
        }
        register_spawner(async_std_spawn);
        spawn(async {
            println!("spawned on async-std");
        });
    }

    #[tokio::test]
    async fn test_tokio() {
        fn tokio_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
            tokio::spawn(future);
        }
        register_spawner(tokio_spawn);
        spawn(async {
            println!("spawned on tokio");
        });
    }
}

dvc94ch avatar Nov 21 '20 12:11 dvc94ch

Here are the corresponding PRs:

  • https://github.com/async-rs/async-attributes/pull/19
  • https://github.com/tokio-rs/tokio/pull/3161

dvc94ch avatar Nov 21 '20 13:11 dvc94ch

        fn async_std_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
            async_std::task::spawn(future);
        }
        fn tokio_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
            tokio::spawn(future);
        }

Note that neither of these is sufficient. Both tokio::spawn and async_std::task::spawn return a handle, and always ignoring it is not the preferred behavior.

taiki-e avatar Nov 21 '20 13:11 taiki-e

Also, this seems to work only with a global runtime (and probably only if there is exactly one). I think there are various other problems. Some other projects are trying to do something similar, so it would be a good idea to look at them and compare.

taiki-e avatar Nov 21 '20 13:11 taiki-e

Also, this seems to work only with a global runtime (and probably only if there is exactly one).

I'm not sure this is a problem. The idea is that for tests you depend on an executor and use that. But I see that registration needs to be fallible in that case, like env_logger::try_init.

Note that neither of these is sufficient. Both tokio::spawn and async_std::task::spawn return a handle, and always ignoring it is not the preferred behavior.

This likely involves boxing the JoinHandle. I guess it's not something you want to do by default? I can look into this tonight or tomorrow.

dvc94ch avatar Nov 21 '20 14:11 dvc94ch

This proposal is meant to address an important problem: crate authors need a way to spawn without having to take on Tokio and async-std as a dependency that may clash with their users' preferences. The way people are currently forced to choose or compromise introduces both technical friction and social uncertainty, and slows the adoption of Rust async.

But this specific approach is not ideal. People sometimes use both Tokio and async-std in the same application (this is why async-std has the tokio02 and tokio03 features). With this PR, the executor chosen depends on whoever called register_spawner last, so it might even be non-deterministic which executor the neutral-zone spawn ends up invoking.

If the two executors are so similar that a dynamically dispatched, non-deterministic spawn is an acceptable solution for users, then one is led to wonder why they are being forced to choose between two evidently equivalent options in the first place.

In the long run, I think std is going to have to provide a standard spawn. It's just too much of a 'vocabulary' function, like Box or Option, to provide elsewhere.

jimblandy avatar Nov 21 '20 21:11 jimblandy

Actually I think there is precedent. Wouldn't it be similar to the allocator api?

dvc94ch avatar Nov 21 '20 22:11 dvc94ch

Based on the feedback received so far, I made some adaptations:

  • [x] return a join handle on spawn
  • [x] add try_register_spawner to be called from the #[async_std::test] and #[tokio::test] macros

I'd like to note that this is not intended to change the public API of async_std or tokio. Registering a spawner is meant to happen only from the macros, so it is completely transparent to existing users. It just adds an alternative for spawning tasks without having to commit to async-std or tokio.
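A minimal sketch of what a fallible registration could look like, using only std. The names `try_register_spawner` and `AlreadyRegistered` are assumptions for illustration and are not taken from the async-spawner crate. `std::sync::OnceLock` stands in for the lock used earlier in the thread.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::OnceLock;

pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
pub type Spawner = fn(BoxedFuture);

// Set at most once for the lifetime of the process.
static SPAWNER: OnceLock<Spawner> = OnceLock::new();

/// Hypothetical error type: a spawner had already been registered.
#[derive(Debug, PartialEq, Eq)]
pub struct AlreadyRegistered;

/// Registers `spawner`, or reports that one is already in place.
/// Unlike a panicking `register_spawner`, this is safe to call from
/// every test in the same process.
pub fn try_register_spawner(spawner: Spawner) -> Result<(), AlreadyRegistered> {
    SPAWNER.set(spawner).map_err(|_| AlreadyRegistered)
}

/// Placeholder spawner used only to exercise registration; it drops the future.
pub fn discard(_future: BoxedFuture) {}
```

With this shape, the first test macro to run wins and later ones get an `Err` they can ignore, much like env_logger::try_init.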

Also, this seems to work only with a global runtime (and probably only if there is exactly one).

I need some more help understanding in which cases registering more than one executor is a requirement. The suggested API async_spawner::spawn is intended only for cases where it doesn't matter which executor runs the task, for example when using async-io, or maybe some future reactor based on io_uring.

dvc94ch avatar Nov 22 '20 13:11 dvc94ch

@dvc94ch I think you're doing some interesting research here. The sequence I hope spawn integration in the stdlib will take is:

  1. A #[global_executor] tag implemented in userland, that can be used to swap executors backing a single Task / JoinHandle impl (this is what I see you're making progress on, and I think that's exciting!)
  2. Eventually land that in userland (may take a while to fix the kinks)
  3. Eventually provide a default / fallback executor as part of the stdlib; much like #[global_allocator] has a default.

However the current signature of async_spawner::spawn is:

pub fn spawn<F>(future: F) where
    F: Future<Output = ()> + Send + 'static;

For integration in async-std we need it to be:

pub fn spawn<F, T>(future: F) -> JoinHandle<T> where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static;

With an option to later add hooks spawn_local and spawn_blocking as well. A while ago I was experimenting with an implementation of this, but was unfortunately unsuccessful. I couldn't get the spawn and JoinHandle impls to return a type T.

The source of my experiment is here: yoshuawuyts/global-executor-prototype. I would love for someone to be able to finish this up as I think this is a required step in making async Rust more accessible to people. But I'm not sure how feasible the approach in the repo is, or how feasible it'll be to implement in userland.

yoshuawuyts avatar Nov 23 '20 14:11 yoshuawuyts

agnostik works well, I'm quite happy with the solution. All you have to do is use #[agnostik::main], #[agnostik::test] and agnostik::spawn, and it will use the executor that was enabled. I think this issue can be closed.

dvc94ch avatar Nov 23 '20 21:11 dvc94ch

Reopening. This requires the user to add agnostik as a dependency with a runtime enabled. This is very suboptimal, and I think it will require help from executors to make this seamless.

dvc94ch avatar Nov 23 '20 22:11 dvc94ch

agnostik works well, I'm quite happy with the solution. All you have to do is use #[agnostik::main], #[agnostik::test] and agnostik::spawn, and it will use the executor that was enabled. I think this issue can be closed.

I think you realized it in your follow-up comment, but unfortunately the approach of agnostik is different from the proposed #[global_executor]. agnostik indeed has dependencies on async-std and tokio, and requires toggling flags. What we want is for the dependencies to be declared the other way around: async-std and tokio should implement a trait defined by a third crate and a hook to set the "global executor". The crate then also provides a uniform interface to spawn tasks on the executor. Once that works well, we can propose to uplift that mechanism into std.

I'm not sure to which degree this is possible -- but that seems like something we should explore further.

yoshuawuyts avatar Nov 24 '20 09:11 yoshuawuyts

I'm not sure to which degree this is possible -- but that seems like something we should explore further.

Well, it will require some boxing.

//! Executor agnostic task spawning
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use once_cell::sync::OnceCell;
use futures::channel::oneshot;
use std::sync::{Arc, Mutex};

pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Trait abstracting over an executor.
pub trait Executor: Send + Sync {
    /// Blocks until the future has finished.
    fn block_on(&self, future: BoxedFuture);

    /// Spawns an asynchronous task using the underlying executor.
    fn spawn(&self, future: BoxedFuture) -> BoxedFuture;

    /// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
    fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send>) -> BoxedFuture;

    /// Spawns a future that doesn't implement [Send].
    ///
    /// The spawned future will be executed on the same thread that called `spawn_local`.
    ///
    /// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
    fn spawn_local(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> BoxedFuture;
}

static EXECUTOR: OnceCell<Box<dyn Executor>> = OnceCell::new();

/// Error returned by `try_register_executor` indicating that an executor was already registered.
#[derive(Debug)]
pub struct ExecutorRegistered;

impl core::fmt::Display for ExecutorRegistered {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "async_spawner: executor already registered")
    }
}

impl std::error::Error for ExecutorRegistered {}

pub fn try_register_executor(executor: Box<dyn Executor>) -> Result<(), ExecutorRegistered> {
    EXECUTOR.set(executor).map_err(|_| ExecutorRegistered)
}

pub fn register_executor(executor: Box<dyn Executor>) {
    try_register_executor(executor).unwrap();
}

pub fn executor() -> &'static Box<dyn Executor> {
    EXECUTOR.get().expect("async_spawner: no executor registered")
}

/// Blocks until the future has finished.
pub fn block_on<F, T>(future: F) -> T
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let lock = Arc::new(Mutex::new(None));
    let lock2 = lock.clone();
    executor().block_on(Box::pin(async move {
        let res = future.await;
        let mut lock = lock2.lock().unwrap();
        *lock = Some(res);
    }));
    let mut res = lock.lock().unwrap();
    res.take().unwrap()
}

pub struct JoinHandle<T> {
    handle: BoxedFuture,
    rx: oneshot::Receiver<T>,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if let Poll::Ready(()) = Pin::new(&mut self.handle).poll(cx) {
            if let Poll::Ready(Ok(res)) = Pin::new(&mut self.rx).poll(cx) {
                Poll::Ready(res)
            } else {
                panic!("task panicked");
            }
        } else {
            Poll::Pending
        }
    }
}

/// Spawns an asynchronous task using the underlying executor.
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    let handle = executor().spawn(Box::pin(async move {
        let res = future.await;
        tx.send(res).ok();
    }));
    JoinHandle { handle, rx }
}

/// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
pub fn spawn_blocking<F, T>(task: F) -> JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    let handle = executor().spawn_blocking(Box::new(move || {
        let res = task();
        tx.send(res).ok();
    }));
    JoinHandle { handle, rx }
}

/// Spawns a future that doesn't implement [Send].
///
/// The spawned future will be executed on the same thread that called `spawn_local`.
///
/// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
pub fn spawn_local<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    let handle = executor().spawn_local(Box::pin(async move {
        let res = future.await;
        tx.send(res).ok();
    }));
    JoinHandle { handle, rx }
}

dvc94ch avatar Nov 27 '20 15:11 dvc94ch

Released as async-spawner 2.0.0 with improvements to docs and tests. @yoshuawuyts does this meet the bar for inclusion? If yes, I can update the async-attributes PR. https://docs.rs/async-spawner/2.0.0/async_spawner/

dvc94ch avatar Nov 27 '20 17:11 dvc94ch

Having the Executor trait methods take already-boxed futures allows the trait to be object safe. But executors go to great lengths to minimize the number of dynamic allocations per spawn (async_std seems to have held it down to one, so far, not counting allocations of shared structures that can be amortized across many spawns). I think this Executor trait would require two allocations per spawn: one by the caller of the Executor method to produce a BoxedFuture, and a second by the Executor implementer to combine the boxed future with its associated metadata (space for task-local variables, names, and so on).

It's possible that executors could avoid this by taking advantage of the known size of a BoxedFuture, and arena-allocating everything.
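To illustrate the "known size" point: a boxed, type-erased future is a fat pointer, so it takes the same space no matter how large the underlying future is. The sketch below uses a hypothetical `Task` record (not any real executor's layout) to show that futures of very different sizes fit in identically sized slots, which is what would make arena-style storage possible.

```rust
use std::future::Future;
use std::mem::size_of;
use std::pin::Pin;

pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical per-task record: executor metadata plus the boxed future.
pub struct Task {
    pub name: Option<&'static str>,
    pub future: BoxedFuture,
}

/// Size of the type-erased future handle (data pointer + vtable pointer).
pub fn boxed_future_size() -> usize {
    size_of::<BoxedFuture>()
}

/// Two tasks whose underlying futures differ greatly in size still
/// occupy identical `Task` slots in the vector.
pub fn make_tasks() -> Vec<Task> {
    let buffer = [0u8; 4096];
    vec![
        Task { name: Some("small"), future: Box::pin(async {}) },
        Task {
            name: None,
            // Captures the 4 KiB buffer, so the future itself is large.
            future: Box::pin(async move {
                let _ = buffer.len();
            }),
        },
    ]
}
```

This only addresses the executor-side record. The future itself still lives in the `Box` allocated by the caller.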

Another way around this would be to make the client do all the allocation, with the executor providing a Layout describing its own metadata, and the client using Layout::extend to add space for the future, doing the allocation, initializing the appropriate part of the allocation, and then passing partially-initialized allocations to the Executor methods. That's pretty unsafe, though.

jimblandy avatar Nov 27 '20 20:11 jimblandy

Actually it would require at least three since we're also boxing the JoinHandle, and I'm guessing that the oneshot channel requires a fourth.

It's possible that executors could avoid this by taking advantage of the known size of a BoxedFuture, and arena-allocating everything.

So what would the api for that look like? I'm not very familiar with the internals of executors.

dvc94ch avatar Nov 27 '20 21:11 dvc94ch

So what would the api for that look like? I'm not very familiar with the internals of executors.

I am not sure, honestly. It just seemed to me that having a fixed size for the futures was a 'handhold' on the problem that could be useful. I think the only thing I have to contribute is that we need to be conscious of how changes to the interface affect the allocation count.

jimblandy avatar Nov 27 '20 21:11 jimblandy

I think it's feasible to reduce the two allocations (oneshot and JoinHandle) to one; I'll try that tomorrow. However, I expect your suggestion will require exposing some executor internals. It would be great if we could reduce it to two allocations.

dvc94ch avatar Nov 27 '20 22:11 dvc94ch

@jimblandy using a bump allocator was a good solution, never used one before.

Managed to get all except spawn_blocking to use a single allocation in async-spawner, and if the underlying executor supports passing a bump allocator, it could spawn the rest of the task using it too. spawn_blocking requires an additional allocation, because the call_once method is not stable, so only Box can implement FnOnce for Box<dyn FnOnce>.

/// Trait abstracting over an executor.
pub trait Executor: Send + Sync {
    /// Blocks until the future has finished.
    fn block_on(&self, future: Pin<&mut (dyn Future<Output = ()> + Send + 'static)>);

    /// Spawns an asynchronous task using the underlying executor.
    fn spawn(&self, bump: &Bump, future: BumpFuture) -> BumpFuture;

    /// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
    fn spawn_blocking(&self, bump: &Bump, task: Box<dyn FnOnce() + Send>) -> BumpFuture;

    /// Spawns a future that doesn't implement [Send].
    ///
    /// The spawned future will be executed on the same thread that called `spawn_local`.
    ///
    /// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
    fn spawn_local(
        &self,
        bump: &Bump,
        future: Pin<BumpBox<'static, dyn Future<Output = ()>>>,
    ) -> BumpFuture;
}

/// Executor agnostic join handle.
pub struct JoinHandle<T: 'static> {
    handle: BumpFuture,
    res: BumpBox<'static, Mutex<Option<T>>>,
    #[allow(unused)]
    bump: Bump,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if let Poll::Ready(()) = Pin::new(&mut self.handle).poll(cx) {
            let mut res = self.res.lock().unwrap();
            Poll::Ready(res.take().unwrap())
        } else {
            Poll::Pending
        }
    }
}

/// Spawns an asynchronous task using the underlying executor.
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let bump = Bump::new();
    let bump_ref = unsafe { &*(&bump as *const _) };
    let res = BumpBox::new_in(Mutex::new(None), bump_ref);
    let res_ref: &'static Mutex<Option<T>> = unsafe { &*(&*res as *const _) };
    let future = BumpBox::new_in(
        async move {
            let res = future.await;
            let mut cell = res_ref.lock().unwrap();
            *cell = Some(res);
        },
        &bump,
    );
    let handle = executor().spawn(&bump, coerce_bump_box_pin!(future));
    JoinHandle { bump, handle, res }
}

The one thing missing would be to figure out the size beforehand and use Bump::with_capacity.

dvc94ch avatar Nov 28 '20 19:11 dvc94ch

Oops, that will only work if the join handle doesn't get dropped.
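The problem is that the task writes its result through a reference into memory owned by the join handle. A safe (but not allocation-free) alternative is to share the result slot through an `Arc`, so whichever side finishes last frees it. The sketch below is an assumption for illustration, with hypothetical names `ResultSlot`, `wrap` and `poll_once`; it is not how async-spawner is implemented.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical handle-side view of the shared result slot.
pub struct ResultSlot<T>(Arc<Mutex<Option<T>>>);

impl<T> ResultSlot<T> {
    /// Takes the result if the task has already stored it.
    pub fn take(&self) -> Option<T> {
        self.0.lock().unwrap().take()
    }
}

/// Wraps `future` into an `Output = ()` task that stores its result in a
/// slot kept alive by both the task and the handle.
pub fn wrap<F, T>(future: F) -> (BoxedFuture, ResultSlot<T>)
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let slot = Arc::new(Mutex::new(None));
    let writer = Arc::clone(&slot);
    let task: BoxedFuture = Box::pin(async move {
        let value = future.await;
        *writer.lock().unwrap() = Some(value);
    });
    (task, ResultSlot(slot))
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a task once with a waker that does nothing (test helper).
pub fn poll_once(task: &mut BoxedFuture) -> Poll<()> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    task.as_mut().poll(&mut cx)
}
```

Dropping the `ResultSlot` before the task runs is harmless here, because the task's own `Arc` clone keeps the slot alive. The price is one more allocation for the `Arc`.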

dvc94ch avatar Nov 28 '20 19:11 dvc94ch

Yeah, bump allocators are only simple because they impose restrictions on the order in which things can be freed. I don't think we know anything about the order in which these tasks will exit, or when the join handles will be dropped. I think you want an arena allocator instead. The async-io crate uses vec-arena.
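As a rough illustration of the difference, here is a minimal slab-style arena written against std only. It is a sketch of the general technique, not the vec-arena API: slots can be released in any order, and released slots are reused by later inserts, which is what a bump allocator cannot do.

```rust
/// Minimal slab-style arena: values live in numbered slots.
pub struct Slab<T> {
    slots: Vec<Option<T>>,
    free: Vec<usize>,
}

impl<T> Slab<T> {
    pub fn new() -> Self {
        Slab { slots: Vec::new(), free: Vec::new() }
    }

    /// Stores `value`, reusing a freed slot when one is available.
    pub fn insert(&mut self, value: T) -> usize {
        if let Some(index) = self.free.pop() {
            self.slots[index] = Some(value);
            index
        } else {
            self.slots.push(Some(value));
            self.slots.len() - 1
        }
    }

    /// Removes the value at `index`, in any order relative to other removals.
    pub fn remove(&mut self, index: usize) -> Option<T> {
        let value = self.slots.get_mut(index)?.take();
        if value.is_some() {
            self.free.push(index);
        }
        value
    }

    /// Number of slots ever allocated, occupied or free.
    pub fn slot_count(&self) -> usize {
        self.slots.len()
    }
}
```

A task that exits early simply frees its slot, regardless of when neighbouring tasks or their join handles go away.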

Is there a way to avoid baking the choice of allocator into the Executor trait? If this is going to eventually be the interface between Rust's std and async executor implementations, it's going to have to go through the RFC process and get stabilized. I'd be surprised to see a type like Bump or vec_arena::Arena appearing in such an API.

(I'm sure I'm coming across as negative, but that's not my intent—I'd like to see this succeed. There are just a lot of design constraints to take into account.)

jimblandy avatar Nov 28 '20 20:11 jimblandy

I'm completely fine with four allocations, this is really exploding the scope. It was intended to be a simple solution. If you're planning on spawning hundreds of tasks per second, you should consider a different one.

dvc94ch avatar Nov 29 '20 01:11 dvc94ch

Actually it's impossible to do it in one allocation. It requires at least two. Even with the bump allocation scheme mentioned earlier, it's impossible to determine the size beforehand when allocating something like:

struct TaskAlloc {
    layout: Layout,
    ptr: *const AtomicUsize,
    task: *const (),
    output: *const (),
    join_handle: *const (),
}

task needs to be the equivalent of Box<dyn Future<Output = ()>> for the executor, but you'd need to allocate Box<dyn Future<Output = T>> first so that you can construct it. Otherwise TaskAlloc would have to be generic in some way, defeating the purpose of eliding the types.
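One way to see why the size cannot be fixed ahead of time: the `Output = ()` wrapper that erases `T` embeds the original future, so its size depends on the concrete future type. The helper names below are hypothetical and only serve to measure that.

```rust
use std::future::Future;
use std::mem::size_of_val;

/// Size in bytes of the `Output = ()` wrapper needed to type-erase `future`.
/// The wrapper embeds `future`, so this varies with the concrete type `F`.
pub fn erased_wrapper_size<F, T>(future: F) -> usize
where
    F: Future<Output = T>,
{
    let wrapper = async move {
        let _output = future.await;
    };
    size_of_val(&wrapper)
}

/// A future with almost no state.
pub fn small_future() -> impl Future<Output = u8> {
    async { 1u8 }
}

/// A future that captures a 1 KiB buffer by value.
pub fn large_future() -> impl Future<Output = usize> {
    let buffer = [0u8; 1024];
    async move { buffer.len() }
}
```

Only generic code that still knows `F` can compute this size, which is why a non-generic allocation record cannot reserve the right amount of space up front.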

The only way to make it equally efficient, in my opinion, would be for block_on, spawn, spawn_blocking and spawn_local to be compiler intrinsics that the compiler replaces with the async-std or tokio equivalents.

dvc94ch avatar Nov 29 '20 16:11 dvc94ch