Scoped tasks

Open jzrake opened this issue 4 years ago • 40 comments

I would love to be able to spawn non-'static tasks into the Tokio runtime, like so:

let local_data = LocalData::new();

let result = runtime.block_on_scoped(|scope| {

    let future_result = scope.spawn(async {
        local_data.result()
    });

    async {
        future_result.await.unwrap()
    }
});

This type of thing exists in Crossbeam: https://docs.rs/crossbeam/0.8.0/crossbeam/fn.scope.html Someone wrote this for an older version of Tokio: https://docs.rs/tokio-scoped/0.1.0/tokio_scoped

jzrake avatar Nov 21 '20 16:11 jzrake

related to https://github.com/tokio-rs/tokio/issues/2013

akhilles avatar Nov 27 '20 20:11 akhilles

I suppose that this would be possible to add as a block_on variant.

Darksonn avatar Nov 28 '20 21:11 Darksonn

There is also this, which is no longer maintained: https://github.com/Snnappie/tokio-scoped.

jzrake avatar Dec 13 '20 03:12 jzrake

And this: https://docs.rs/async-scoped/0.5.1/async_scoped, although it looks like it's hard-coded to use the "current thread" runtime.

jzrake avatar Dec 13 '20 18:12 jzrake

I have an initial attempt at "proper" scoped tasks in https://github.com/farnz/tokio/commit/0ce475868061e8620fe52fb0d9e026f7db039ac8, but I need advice to continue.

Specifically, I'm not clear on how drop should forcibly transition a task to completed state so that the future it's running can be dropped.

Do I need to think through the locking so that I can swap the future I'm trying to drop for a "dummy" future and let the runtime complete a detached task that no longer owns the future I want to drop, or have I missed a better way to handle it?

farnz avatar Dec 15 '20 10:12 farnz

Sorry, scoped tasks cannot rely on Drop because you can use mem::forget to skip running the destructor. There is no way to write a scoped task API that can be used from async code. The only scoped async API you can write that is also safe is one that allows you to spawn scoped async tasks from non-async code. Creating the scope in async code is fundamentally not possible.
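
A minimal sketch of why a destructor cannot be relied on for soundness. The `Guard` type and `destructor_ran` function are hypothetical names introduced only for this illustration; the only real API used is `std::mem::forget`, which is a safe function.

```rust
use std::cell::Cell;
use std::mem;

/// Hypothetical guard: its destructor records that cleanup happened.
struct Guard<'a> {
    cleaned_up: &'a Cell<bool>,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.cleaned_up.set(true);
    }
}

/// Returns whether the guard's destructor ran.
fn destructor_ran(forget: bool) -> bool {
    let cleaned_up = Cell::new(false);
    {
        let guard = Guard { cleaned_up: &cleaned_up };
        if forget {
            // Entirely safe code: leaks the guard without running `Drop::drop`.
            mem::forget(guard);
        }
        // Otherwise the guard is dropped normally at the end of this block.
    }
    cleaned_up.get()
}
```

Here `destructor_ran(false)` returns `true`, while `destructor_ran(true)` returns `false`: any "wait for the tasks" logic placed in `drop` would be skipped in the second case.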

Darksonn avatar Dec 15 '20 11:12 Darksonn

Sorry, scoped tasks cannot rely on Drop because you can use mem::forget to skip running the destructor. There is no way to write a scoped task API that can be used from async code. The only scoped async API you can write that is also safe is one that allows you to spawn scoped async tasks from non-async code. Creating the scope in async code is fundamentally not possible.

Do you mind expanding on this? I would have thought that the same issues apply to crossbeam scoped tasks, which also rely on Drop to avoid the scope being lost before the thread can exit.

I'm on Discord #tokio-dev if you'd rather talk me through it there.

farnz avatar Dec 15 '20 11:12 farnz

The answer is that crossbeam does not rely on Drop for this. They use a scope function, and if any of the threads are still running when you return from the closure, the scope function will wait for those before scope returns.
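
The same design can be sketched with `std::thread::scope` from the standard library, used here instead of crossbeam only to keep the example dependency-free. The function name `sum_on_scoped_threads` is made up for the illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Sums a borrowed slice using one scoped thread per chunk.
fn sum_on_scoped_threads(data: &[u64]) -> u64 {
    let total = AtomicU64::new(0);
    let total_ref = &total;
    thread::scope(|s| {
        for chunk in data.chunks(2) {
            // The threads borrow `data` and `total`, which are not 'static.
            s.spawn(move || {
                total_ref.fetch_add(chunk.iter().sum::<u64>(), Ordering::Relaxed);
            });
        }
        // No explicit join: `thread::scope` itself blocks until every
        // spawned thread has finished, and no destructor is involved.
    });
    total.load(Ordering::Relaxed)
}
```

Because the waiting happens inside the `scope` call rather than in a destructor, there is no value the caller could `mem::forget` to skip it.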

Darksonn avatar Dec 15 '20 11:12 Darksonn

FTR, the reason we can't do crossbeam-type scoping from an async context is that crossbeam blocks the thread that calls scope for the duration of the scoped threads' execution. Any async equivalent has to return state so that we can go back to the executor instead of blocking, and that state could be forgotten in error.

farnz avatar Dec 15 '20 11:12 farnz

Push for this, since scoped threads were added in Rust 1.63.0 and that feature is really awesome! Code is so much cleaner and more readable without Arcs and clone()s all over the place.

theRookieCoder avatar Dec 04 '22 13:12 theRookieCoder

The possible scoped tasks that can be implemented in async Rust are very limited for technical reasons. The outer scope would have to be a block_on call, but they cannot be used from within the runtime.
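
A rough sketch of what a blocking outer scope could look like, using only the standard library. Everything here is an assumption made for illustration: `block_on` is a tiny stand-in executor, not Tokio's, and each "task" is simply a future driven to completion on its own scoped thread.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal stand-in for a runtime's `block_on` (not Tokio's implementation).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical scoped entry point: the futures borrow `data`, and the
/// calling thread is blocked until both of them have completed.
fn sum_with_scoped_tasks(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        let a = s.spawn(|| block_on(async { left.iter().sum::<u64>() }));
        let b = s.spawn(|| block_on(async { right.iter().sum::<u64>() }));
        a.join().unwrap() + b.join().unwrap()
    })
}
```

The borrow is sound only because the caller's thread is blocked for the whole scope, which is exactly what cannot be done from inside the runtime.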

Darksonn avatar Dec 04 '22 13:12 Darksonn

Is this some sort of limitation with Rust that could be resolved in the future, or is it a limitation with how Tokio implements async?

theRookieCoder avatar Dec 04 '22 14:12 theRookieCoder

Is this some sort of limitation with Rust that could be resolved in the future, or is it a limitation with how Tokio implements async?

This is a problem with how futures work. Since a future can always be cancelled, Tokio cannot guarantee that the task is done (.awaited to completion) before scoped_spawn (or something like it) returns: the returned future can simply be cancelled, and the variables referenced by the task can then be destroyed.

To solve this problem, we would need either linear types, which disallow dropping and would require the future returned by scoped_spawn to be .awaited until it is ready, or a new trait for Future that requires the future to either never be polled or be polled until Ready.

Or, if Rust supported defer! or async drop, it could just cancel the task and wait for it to finish.
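
The cancellation point can be illustrated with a small std-only sketch. `NoopWake`, `YieldOnce`, and `steps_reached` are hypothetical helpers written for this example; the future is polled by hand rather than by a real runtime.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Future that returns `Pending` on the first poll and `Ready` afterwards.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

/// Polls a two-step future at most `polls` times, then drops it.
/// Returns the last step the future reached.
fn steps_reached(polls: usize) -> u32 {
    let steps = Cell::new(0);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    {
        let mut fut = Box::pin(async {
            steps.set(1);
            YieldOnce { yielded: false }.await;
            steps.set(2); // never runs if the future is dropped first
        });
        for _ in 0..polls {
            if fut.as_mut().poll(&mut cx).is_ready() {
                break;
            }
        }
    } // Dropping the future here cancels whatever work remains.
    steps.get()
}
```

With a single poll the future stops at step 1 and is then cancelled by being dropped; nothing forces the caller to keep polling until `Ready`.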

NobodyXu avatar Dec 04 '22 14:12 NobodyXu

It's a limitation in Rust itself.

Darksonn avatar Dec 04 '22 14:12 Darksonn

Interesting, thanks for the clarity!

theRookieCoder avatar Dec 04 '22 14:12 theRookieCoder

The possible scoped tasks that can be implemented in async Rust are very limited for technical reasons. The outer scope would have to be a block_on call, but they cannot be used from within the runtime.

Does this mean that a variant of LocalSet::block_on like the block_on_scoped originally suggested is possible or is not possible? Because if even just that is possible, it doesn't seem too limiting to me. My use case is that I want to write single-threaded servers using async code, much as one does with Node.js. With the server being single-threaded and structured entirely as a local task set, mutable state can be shared among the tasks with only a RefCell wrapper, which is quite nice. In this case, all my async code is naturally run within LocalSet::block_on using a current-thread runtime, so a block_on_scoped here would be perfect.

mattbork avatar Dec 09 '22 23:12 mattbork

A block_on_scoped is possible since it can, you know, block the thread. Though for the use-case you mention, you can easily achieve the same with an Rc.
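
A minimal sketch of the Rc approach for the single-threaded use case. It assumes a tiny hand-written `block_on` in place of a current-thread runtime, and `record` and `run_tasks` are hypothetical names. Because each future owns its own Rc handle, it is 'static and does not need a scope.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal stand-in for a current-thread runtime's `block_on`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// A task that owns a handle to the shared state instead of borrowing it.
async fn record(log: Rc<RefCell<Vec<String>>>, msg: &'static str) {
    log.borrow_mut().push(msg.to_string());
}

fn run_tasks() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let first = record(Rc::clone(&log), "first");
    let second = record(Rc::clone(&log), "second");
    block_on(async {
        first.await;
        second.await;
    });
    let entries = log.borrow().clone();
    entries
}
```

The cost compared with a true scoped API is one reference-count increment per task, with no locking, since Rc and RefCell never leave the thread.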

Darksonn avatar Dec 10 '22 09:12 Darksonn

What about the async scope (scoped spawn) design for C++'s p2300 sender/receiver async proposal? https://github.com/kirkshoop/async_scope/blob/main/asyncscope.md

As the paper says, since not every async-function has a clear chain of work to consume or block on the result, spawn is indeed useful. But the problem with spawn is that they provide unstructured concurrency. This is an unnecessary and unwelcome and undesirable property for concurrency. Using these algorithms leads to problems with lifetimes that are often 'fixed' using shared_ptr for ad-hoc garbage collection.

Structured Programming [Dahl72] transformed the software world by making it easier to reason about the code and build large software from simpler constructs. We want to achieve the same effect on concurrent programming by ensuring that we structure our concurrent code. The spawn algorithm fails the single entry, single exit point principle by behaving like a GOTO instruction. By calling spawn we essentially continue in two places: in the same function, and on different thread that executes the given work. Moreover, the lifetime of the work started by spawn cannot be bound to the local context. This will prevent local reasoning, which will make the program harder to understand.

To properly structure our concurrency, we need an abstraction that ensures that all the async-functions being spawned are attached to an enclosing async-function. This is the goal of scoped spawn.

@Darksonn

npuichigo avatar Aug 28 '23 04:08 npuichigo

@npuichigo The problem we are facing here is that an async scope can be subject to mem::forget, which makes any async scope API unsound. It's not the kind of problem that a C++ proposal can help with.

Darksonn avatar Aug 28 '23 07:08 Darksonn

I don't quite get the context. What do you mean we can't depend on Drop?

npuichigo avatar Aug 28 '23 07:08 npuichigo

The answer is that crossbeam does not rely on Drop for this. They use a scope function, and if any of the threads are still running when you return from the closure, the scope function will wait for those before scope returns.

@Darksonn Can we build a crossbeam-like spawn that does not rely on Drop but instead waits for the tasks to complete?

Or something like:

let task = use_resources(
   |scope| {
      async {
         let data = get_data().await;
         for i in 0..10 {
             scope.spawn(do_work(data[i]));
         }
      }
   },
   AsyncScope::new()
);
rt::block_on(task);

spawn may be eager or lazy here, but the scope will keep track of all the tasks it spawned and wait until they are completed.

npuichigo avatar Aug 28 '23 07:08 npuichigo

Either the scope function is not async, in which case it will block the thread, or the scope function is async, in which case it will be subject to the mem::forget issue and be unsound.

Neither is really an acceptable solution in async code.

Darksonn avatar Aug 28 '23 08:08 Darksonn

@Darksonn You mentioned that this is related to cancellation. Can you explain that? Thanks.

npuichigo avatar Aug 28 '23 08:08 npuichigo

Let's say that you call scope, which borrows from a local variable called data. If the scope function is async, then it returns a future. You can poll that future once, which ensures that the background task is spawned. Then you can mem::forget the future returned by scope. Once it's been forgotten, you can destroy data. However, the background task is still running, and if it tries to touch data, that's a use-after-free.
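
The poll-once-then-forget sequence can be sketched in safe, std-only Rust. This is only a model: `JoinOnDrop` is a hypothetical guard standing in for whatever a scope future would use to wait for its tasks, no real background task is spawned, and the future is polled by hand.

```rust
use std::cell::Cell;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Never completes; stands in for "waiting on the background task".
struct Never;

impl Future for Never {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Hypothetical guard a scope future might use to join its tasks on drop.
struct JoinOnDrop<'a>(&'a Cell<bool>);

impl Drop for JoinOnDrop<'_> {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

/// Polls a scope-like future once, then drops or forgets it.
/// Returns `(task_started, join_ran)`.
fn poll_once_then(forget: bool) -> (bool, bool) {
    let started = Cell::new(false);
    let joined = Cell::new(false);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut scope_fut = Box::pin(async {
        let _guard = JoinOnDrop(&joined);
        started.set(true); // a real scope would spawn the background task here
        Never.await;
    });
    assert!(scope_fut.as_mut().poll(&mut cx).is_pending());

    if forget {
        mem::forget(scope_fut); // leaks the future, so the guard never runs
    } else {
        drop(scope_fut);
    }
    (started.get(), joined.get())
}
```

`poll_once_then(false)` yields `(true, true)`, but `poll_once_then(true)` yields `(true, false)`: the task has started and nothing ever waits for it.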

Darksonn avatar Aug 28 '23 08:08 Darksonn

@Darksonn What about adding one layer of abstraction, such as an async resource, to make sure attached resources like data are only destroyed after all tasks complete?

rt::block_on(use_resource(
        |scope, data| async {
            scope.spawn(|| do_work(data));
        },
        AsyncScope::new(),
        data,
    ));

npuichigo avatar Aug 28 '23 08:08 npuichigo

If you have ownership of data, then you can already implement something similar today with the existing APIs.
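
One way to read this, sketched with std threads standing in for spawned tasks (an assumption made to keep the example dependency-free; with Tokio the same shape would use its spawn function and join handles). The name `double_all` is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

/// Takes ownership of `data`, shares it through an `Arc`, and joins every
/// worker before returning, so no borrow ever has to outlive a scope.
fn double_all(data: Vec<u64>) -> Vec<u64> {
    let data = Arc::new(data);
    let handles: Vec<_> = (0..data.len())
        .map(|i| {
            let data = Arc::clone(&data);
            // Each worker owns its own handle, so the closure is 'static.
            thread::spawn(move || data[i] * 2)
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The shared value is freed when the last handle is dropped, whichever worker or caller that turns out to be.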

Darksonn avatar Aug 28 '23 08:08 Darksonn

You are right @Darksonn, but Tokio's spawn is eager only: the task starts running in the background immediately when spawn is called, even if you don't await the returned JoinHandle.

With scoped spawn, since the scope is the root of all spawned tasks, it can be lazy and only start the work when the parent future is polled.
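
For comparison, a plain Rust future is already lazy on its own: nothing in it runs until it is polled. A small std-only sketch, polling by hand with a hypothetical `NoopWake` waker:

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the future's body had run before and after the first poll.
fn observe_laziness() -> (bool, bool) {
    let ran = Cell::new(false);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Creating the future does not execute any of its body.
    let mut fut = Box::pin(async {
        ran.set(true);
    });
    let before = ran.get();

    // Only polling makes progress.
    let _ = fut.as_mut().poll(&mut cx);
    let after = ran.get();

    (before, after)
}
```

`observe_laziness()` returns `(false, true)`. Eagerness comes from handing the future to a runtime that starts polling it, not from the future itself.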

The async scope proposal has three spawn methods: spawn, spawn_future, and nest.

  1. spawn plays a fire-and-forget role and is eager. Its return value is void, and the return value of the async function is discarded.
  2. spawn_future is eager too, but returns a future, which can be used to await the result.
  3. nest is lazy: the async work is started only when the root future is polled or blocked on.

In terms of performance, nest() does not introduce any penalty. spawn() is more expensive than nest() as it needs to allocate memory for the operation. spawn_future() is even more expensive than spawn(), since there is a possible race condition that needs to be resolved. nest() does not require allocations, so it can be used in a free-standing environment.

That is the advantage of structured concurrency I mentioned before. Unstructured concurrency like fire-and-forget leads to problems with lifetimes that are often 'fixed' using reference counting or something else.

npuichigo avatar Aug 28 '23 09:08 npuichigo

The nest method sounds similar to FuturesUnordered and similar utilities from the futures crate. They don't enable parallelism, since everything runs within the same task, but they are the closest you can get to scoped tasks.
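
The idea behind such utilities can be sketched without the futures crate. The `run_all` function below is a deliberately naive stand-in written for this example, not how FuturesUnordered is implemented: it polls every pending future in turn on the current thread. The futures borrow a local `log`, which is allowed because they never leave the enclosing function.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns `Pending` once so that other futures get a turn.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

/// Naive driver: polls every pending future in turn until all are done.
/// It busy-polls, which is acceptable only for this illustration.
fn run_all(mut futs: Vec<Pin<Box<dyn Future<Output = ()> + '_>>>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while !futs.is_empty() {
        futs.retain_mut(|f| f.as_mut().poll(&mut cx).is_pending());
    }
}

/// Borrows `log` rather than owning it, so the future is not 'static.
async fn worker(name: &str, log: &RefCell<Vec<String>>) {
    log.borrow_mut().push(format!("{name}:0"));
    YieldOnce { yielded: false }.await;
    log.borrow_mut().push(format!("{name}:1"));
}

fn interleave() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    let mut futs: Vec<Pin<Box<dyn Future<Output = ()> + '_>>> = Vec::new();
    futs.push(Box::pin(worker("a", &log)));
    futs.push(Box::pin(worker("b", &log)));
    run_all(futs);
    log.into_inner()
}
```

The two workers interleave (`a:0`, `b:0`, `a:1`, `b:1`), giving concurrency with borrowed data, but everything runs on one thread inside one task.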

Darksonn avatar Aug 28 '23 09:08 Darksonn

nest still allows parallelism. The future returned by a call to nest() holds a reference to the async_scope in order to actually spawn the task for scheduling when it is started. Starting the future returned from nest() starts the given future and adds it to the count of futures that the async_scope object requires to complete before it will destruct.

npuichigo avatar Aug 28 '23 10:08 npuichigo

Even if you prevent the scope from being destroyed with a reference count, that's not enough. Consider:

use std::sync::Arc;

fn main() {
    let my_data = Box::new(10);
    
    // Some reference counted object that internally contains a pointer
    // to the allocation of `my_data`.
    let my_arc = Arc::new(&*my_data);
    
    // Forgetting the arc leaks the reference counted object, and it lives
    // forever.
    std::mem::forget(my_arc);
    
    // However, we can still free the allocation of `my_data`.
    drop(my_data);
}

If you've given away a clone of my_arc to another thread before you forget it, then you're now in trouble. That thread could use the value after my_data is dropped.

Darksonn avatar Aug 28 '23 10:08 Darksonn