Scoped tasks
I would love to be able to spawn non-'static
tasks into the Tokio runtime, like so:
let local_data = LocalData::new();
let result = runtime.block_on_scoped(|scope| {
    let future_result = scope.spawn(async {
        local_data.result()
    });
    async {
        future_result.await.unwrap()
    }
});
This type of thing exists in Crossbeam: https://docs.rs/crossbeam/0.8.0/crossbeam/fn.scope.html Someone wrote this for an older version of Tokio: https://docs.rs/tokio-scoped/0.1.0/tokio_scoped
related to https://github.com/tokio-rs/tokio/issues/2013
I suppose that this would be possible to add as a block_on
variant.
There is also this, which is no longer maintained: https://github.com/Snnappie/tokio-scoped.
And this: https://docs.rs/async-scoped/0.5.1/async_scoped, although it looks like it's hard-coded to use the "current thread" runtime.
I have an initial attempt at "proper" scoped tasks in https://github.com/farnz/tokio/commit/0ce475868061e8620fe52fb0d9e026f7db039ac8, but I need advice to continue.
Specifically, I'm not clear on how drop
should forcibly transition a task to completed
state so that the future it's running can be dropped.
Do I need to think through the locking so that I can swap the future I'm trying to drop for a "dummy" future and let the runtime complete a detached task that no longer owns the future I want to drop, or have I missed a better way to handle it?
Sorry, scoped tasks cannot rely on Drop
because you can use mem::forget
to skip running the destructor. There is no way to write a scoped task API that can be used from async code. The only scoped async API you can write that is also safe is one that allows you to spawn scoped async tasks from non-async code. Creating the scope in async code is fundamentally not possible.
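To illustrate the mem::forget point, here is a minimal sketch (the Guard type is just an example stand-in for a scope guard):

use std::mem;

struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        // A scoped-task guard would wait for its tasks here.
        println!("destructor ran");
    }
}

fn main() {
    let guard = Guard;
    // Entirely safe code, and the destructor is simply never executed,
    // so "destructor ran" is never printed.
    mem::forget(guard);
}

Since this compiles in safe Rust, no scoped-task API can count on its guard's destructor running.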
Do you mind expanding on this? I would have thought that the same issues apply to crossbeam scoped tasks, which also rely on Drop
to avoid the scope being lost before the thread can exit.
I'm on Discord #tokio-dev if you'd rather talk me through it there.
The answer is that crossbeam does not rely on Drop for this. They use a scope function, and if any of the threads are still running when you return from the closure, the scope function will wait for them before it returns.
FTR, the reason we can't do crossbeam-style scoping from an async context is that crossbeam blocks the thread that calls scope for the duration of the scoped threads' execution. Any async equivalent has to return state so that we can go back to the executor instead of blocking, and that state could be forgotten in error.
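For reference, a rough sketch of that blocking model with crossbeam's scope (the vector is just example data):

use crossbeam::scope;

fn main() {
    let data = vec![1, 2, 3];

    // `scope` blocks the calling thread until every spawned thread has
    // finished, so the borrows of `data` cannot outlive `data` itself.
    scope(|s| {
        s.spawn(|_| println!("borrowed: {:?}", data));
        s.spawn(|_| println!("len: {}", data.len()));
    })
    .unwrap(); // execution only continues once both threads are done

    drop(data);
}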
Pushing for this since scoped threads were added in Rust 1.63.0, and this feature is really awesome! Code is so much cleaner and more readable without Arcs and clone()s all over the place.
The possible scoped tasks that can be implemented in async Rust are very limited for technical reasons. The outer scope would have to be a block_on
call, but they cannot be used from within the runtime.
Is this some sort of limitation with Rust that could be resolved in the future, or is it a limitation with how Tokio implements async?
This is a problem with how futures work.
Since a future can always be cancelled, Tokio simply cannot guarantee that the task is done (.awaited) before scoped_spawn (or something like it) returns: the future can just be cancelled, and the variables referenced by the task destroyed.
To solve this, we would either need linear types, to disallow dropping and require the future returned by scoped_spawn to be .awaited until it is ready, or a new trait for Future that requires it to be either never polled or polled until Ready.
Or, with support for defer! or async drop, the scope could just cancel the task and wait for it to be done.
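A tiny sketch of the cancellation point, using the futures crate's now_or_never just to poll once before dropping:

use futures::FutureExt;

fn main() {
    let fut = async {
        std::future::pending::<()>().await;
        // Nothing after the first await point is guaranteed to run.
        println!("never reached");
    };

    // Poll the future exactly once and then drop it. This is ordinary
    // safe Rust, so no scoped_spawn-style API can assume its future
    // will be driven to completion.
    assert!(fut.now_or_never().is_none());
}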
It's a limitation in Rust itself.
Interesting, thanks for the clarity!
The possible scoped tasks that can be implemented in async Rust are very limited for technical reasons. The outer scope would have to be a block_on call, but they cannot be used from within the runtime.
Does this mean that a variant of LocalSet::block_on
like the block_on_scoped
originally suggested is possible or is not possible? Because if even just that is possible, it doesn't seem too limiting to me. My use case is that I want to write single-threaded servers using async code, much as one does with Node.js. With the server being single-threaded and structured entirely as a local task set, mutable state can be shared among the tasks with only a RefCell
wrapper, which is quite nice. In this case, all my async code is naturally run within LocalSet::block_on
using a current-thread runtime, so a block_on_scoped
here would be perfect.
A block_on_scoped
is possible since it can, you know, block the thread. Though for the use-case you mention, you can easily achieve the same with an Rc
.
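Roughly what that Rc-based setup could look like (a sketch assuming a current-thread runtime; the counter is just stand-in state):

use std::cell::RefCell;
use std::rc::Rc;
use tokio::task::LocalSet;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
    let local = LocalSet::new();

    // Single-threaded shared state: Rc + RefCell instead of Arc + Mutex.
    let state = Rc::new(RefCell::new(0u32));

    local.block_on(&rt, async {
        let mut handles = Vec::new();
        for _ in 0..10 {
            let state = state.clone();
            handles.push(tokio::task::spawn_local(async move {
                *state.borrow_mut() += 1;
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    });

    assert_eq!(*state.borrow(), 10);
}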
What about the async scope (scoped spawn) design for C++'s p2300 sender/receiver async proposal? https://github.com/kirkshoop/async_scope/blob/main/asyncscope.md
As the paper says, since not every async-function has a clear chain of work to consume or block on the result, spawn is indeed useful. But the problem with spawn is that it provides unstructured concurrency. This is an unnecessary, unwelcome, and undesirable property for concurrency. Using these algorithms leads to problems with lifetimes that are often 'fixed' using shared_ptr for ad-hoc garbage collection.
Structured Programming [Dahl72] transformed the software world by making it easier to reason about the code and build large software from simpler constructs. We want to achieve the same effect on concurrent programming by ensuring that we structure our concurrent code. The spawn algorithm fails the single-entry, single-exit-point principle by behaving like a GOTO instruction. By calling spawn we essentially continue in two places: in the same function, and on a different thread that executes the given work. Moreover, the lifetime of the work started by spawn cannot be bound to the local context. This will prevent local reasoning, which will make the program harder to understand.
To properly structure our concurrency, we need an abstraction that ensures that all the async-functions being spawned are attached to an enclosing async-function. This is the goal of scoped spawn.
@Darksonn
@npuichigo The problem we are facing here is that an async scope can be subject to mem::forget
, which makes any async scope API unsound. It's not the kind of problem that a C++ proposal can help with.
I don't quite get the context. What do you mean we can't depend on Drop?
The answer is that crossbeam does not rely on Drop for this. They use a scope function, and if any of the threads are still running when you return from the closure, the scope function will wait for them before it returns.
@Darksonn Can we build a crossbeam-like spawn which does not rely on Drop but waits for completion?
Or something like:
let task = use_resources(
    |scope| {
        async {
            let data = get_data().await;
            for i in 0..10 {
                scope.spawn(do_work(data[i]));
            }
        }
    },
    AsyncScope::new(),
);
rt::block_on(task);
spawn may be eager or lazy here, but the scope will keep track of all the tasks it spawned and wait until they are completed.
Either the scope function is not async, in which case it will block the thread, or the scope function is async, in which case it will be subject to the mem::forget
issue and be unsound.
Neither is really an acceptable solution in async code.
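For completeness, the first (blocking, non-async) flavour can be approximated today by combining std's scoped threads with Handle::block_on; this is just one possible sketch, not a Tokio API:

use std::thread;

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let data = vec![1, 2, 3];

    // The scope blocks this (non-async) thread until every scoped thread,
    // and therefore every future driven below, has finished, so borrowing
    // `data` is fine.
    thread::scope(|s| {
        for item in &data {
            let handle = rt.handle().clone();
            s.spawn(move || {
                handle.block_on(async move {
                    println!("processing {item}");
                });
            });
        }
    });
}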
@Darksonn you mentioned here that it's related to cancellation. Can you explain that? Thanks.
Let's say that you call scope
, which borrows from a local variable called data
. If the scope
function is async, then it returns a future. You can poll that future once, which ensures that the background task is spawned. Then you can mem::forget
the future returned by scope
. Once it's been forgotten, you can destroy data
. However, the background task is still running, and if it tries to touch data
, that's a use-after-free.
@Darksonn What about adding a layer of abstraction, like an async resource, to make sure attached resources like data will only be destroyed after all tasks complete?
rt::block_on(use_resource(
    |scope, data| async {
        scope.spawn(|| do_work(data));
    },
    AsyncScope::new(),
    data,
));
If you have ownership of data
, then you can already implement something similar today with the existing APIs.
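For example, something roughly like this (with Arc standing in for "you own the data"):

use std::sync::Arc;

#[tokio::main]
async fn main() {
    // With owned (here Arc-shared) data there is no lifetime problem:
    // every task holds its own handle, and the JoinHandles are awaited
    // before the "scope" ends.
    let data = Arc::new(vec![1, 2, 3, 4]);

    let mut handles = Vec::new();
    for i in 0..data.len() {
        let data = data.clone();
        handles.push(tokio::spawn(async move { data[i] * 2 }));
    }

    let mut results = Vec::new();
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    assert_eq!(results, vec![2, 4, 6, 8]);
}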
You are right @Darksonn, but tokio's spawn is eager only: it will start running in the background immediately when spawn is called, even if you don't await the returned JoinHandle.
With scoped spawn, since the scope is the root of all spawned tasks, it can be lazy and only start the work when the parent future is polled.
The async scope proposal has three spawn methods: spawn, spawn_future, and nest.
- spawn plays a fire-and-forget role and is eager. Its return value is void, and the return value of the async function is discarded.
- spawn_future is eager too, but returns a future which can be used to await the result.
- nest is lazy; the async work is started only when the root future is polled or blocked on.
In terms of performance, nest() does not introduce any penalty. spawn() is more expensive than nest() as it needs to allocate memory for the operation. spawn_future() is even more expensive than spawn(); there is a possible race condition that needs to be resolved. nest() does not require allocations, so it can be used in a free-standing environment.
That is the advantage of structured concurrency I mentioned before. Unstructured concurrency like fire-and-forget leads to problems with lifetimes that are often 'fixed' using reference counting or something else.
The nest method sounds similar to FuturesUnordered and similar utilities from the futures crate. They don't enable parallelism, since everything runs within the same task, but they are the closest you can get to scoped tasks.
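A small sketch of that pattern (assuming the futures crate):

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let data = vec![1, 2, 3];

    // These futures borrow `data`. That is fine because they are polled
    // inside the current task rather than spawned, so they cannot outlive
    // it: concurrency, but no parallelism.
    let mut tasks: FuturesUnordered<_> = data
        .iter()
        .map(|x| async move { x * 2 })
        .collect();

    let mut sum = 0;
    while let Some(value) = tasks.next().await {
        sum += value;
    }
    assert_eq!(sum, 12);
}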
nest still allows parallelism. The future returned by a call to nest() holds a reference to the async_scope in order to actually spawn the task for scheduling when it is started. Starting the future returned from nest() will start the given future and add it to the count of futures that the async_scope object requires to complete before it will destruct.
Even if you prevent the scope from being destroyed with a reference count, that's not enough. Consider:
use std::sync::Arc;

fn main() {
    let my_data = Box::new(10);

    // Some reference counted object that internally contains a pointer
    // to the allocation of `my_data`.
    let my_arc = Arc::new(&*my_data);

    // Forgetting the arc leaks the reference counted object, and it lives
    // forever.
    std::mem::forget(my_arc);

    // However, we can still free the allocation of `my_data`.
    drop(my_data);
}
If you've given away a clone of my_arc
to another thread before you forget it, then you're now in trouble. That thread could use the value after my_data
is dropped.