[RFC] Async I/O with structural control flow (a.k.a. Enforced Awaits)
EDIT: 2020-02-12 Expanded the introduction section with more details explaining the existing problems in non-structural async.
Abstract
This is a proposal to change the semantics of the async procs in a way that enforces a more structural control flow. The goal of the new APIs is to force you to await your async operations, while still allowing you to easily execute multiple operations in parallel. The proposal eliminates a large category of usage errors with the old APIs and enables some additional optimisations, such as storing your Future[T] results on the stack and creating async procs consuming stack-based openarray[T] inputs and var parameters.
For a more comprehensive set of rationales for enforcing the structural control flow proposed here, please read the following article:
https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
Acknowledgements: some of the key ideas in this proposal were first suggested by @yglukhov in private conversations.
Current problems:
P1) All non-ref input parameters to async procs must be copied
Consider the following async proc:
proc checkBrokenLinks(uris: seq[Uri]): Future[seq[bool]] {.async.} =
  ## Tests all supplied URLs in parallel and returns
  ## whether they are still reachable or not.
  ...
If this wasn't an async proc, Nim would pass the supplied input sequence as a read-only reference (please note that I'm using C++ terminology here). This relies on the fact that the lifetime of the sequence at the call-site will surely extend until the moment the function delivers its result.
Unfortunately, in the async world this is no longer the case. The caller of checkBrokenLinks is free to use it like this:
proc brokenCode: Future[seq[bool]] =
  var uris = @[
    Uri.init("git://github.com/status-im/nim-chronos"),
    Uri.init("https://status.team")
  ]
  let brokenLinksFut = checkBrokenLinks(uris)
  ...
  return brokenLinksFut
If the uris sequence was passed by reference, the async proc may be resumed after brokenCode returns, which may result in accessing the now-dangling reference. To avoid this problem, the Nim compiler makes sure to copy all input parameters of the async proc to corresponding fields in the "environment" struct associated with the async proc's closure iterator. This copying may be quite expensive for value types such as string and seq, and users are advised to avoid such types in async procs and to prefer ref parameters, where only a pointer must be copied.
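For illustration, here is the kind of workaround this pushes you towards today (a sketch; the proc body is elided as in the example above):

# Today's advice: take a ref to avoid copying the whole sequence into
# the closure environment - only the pointer is copied:
proc checkBrokenLinks(uris: ref seq[Uri]): Future[seq[bool]] {.async.} =
  ...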
P2) var and openarray parameters are not supported
As a corollary of the previous problem, it becomes impossible to use var and openarray parameters with async procs, because these require the input data to be passed by reference.
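For example, the current async transformation has to reject signatures like the following (a sketch; fillBuffer is an illustrative name):

proc fillBuffer(s: AsyncSocket, buf: var openarray[byte]) {.async.} =
  # rejected today: the generated closure iterator would have to capture
  # `buf` by reference, which cannot outlive the caller's stack frame
  ...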
P3) The async/await syntax has easily-accessible degrees of freedom that may be dangerous for novice users
Consider the following simple async proc:
proc terminateConnection(s: AsyncSocket) {.async.} =
  var myDisconnectMsg: array[X, byte]
  prepareDisconnect(myDisconnectMsg)
  var res = s.send(addr myDisconnectMsg[0], X) # oops, forgot to call await here
  s.close()
It showcases two critical problems triggered by a simple omission of await in the code:
- The socket will be closed prematurely.
- The send operation will be working with bogus data.
This proposal argues that the default behavior should be completely safe and impossible to misuse, while the more advanced concerns such as enabling parallel execution could be handled with a more specialized syntax.
The Proposed Solution:
We create a new set of APIs that hide the explicit use of Future values in the user code and enforce awaiting of all async operations. If all operations are awaited, it becomes possible to store the inputs of the said operations in the "pseudo stack" associated with the async proc, which in turn enables the use of reference types such as lent, var and openarray, providing much better safety than the current pointer/len inputs.
So, here is the full set of new APIs:
1. Allow await to be given multiple arguments or a tuple
proc httpRequest(url: string): Future[HttpResult]
proc jsonRpcCall(url: string): Future[JsonNode]

proc foo {.async.} =
  var keyword = "Status"
  var (googlePage, jsonData) = await(httpRequest(&"http://google.com/?q={keyword}"),
                                     jsonRpcCall("localhost/myApi"))
  echo "HTTP RESPONSE ", googlePage.status, "\n", googlePage.body
  echo "JSON RESPONSE\n", jsonData{"result"}
This form of await just performs the I/O operations in parallel, returning a tuple of the final results. It is similar to using var r1 = request(); var r2 = request(); await all(r1, r2) in the current design.
For convenience, await (foo, bar) is considered the same as await(foo, bar).
2. Introduce a new select API (EDIT: this point is made partially obsolete by point 4)
select is a new API that is given a number of I/O operations that should be started in parallel. The key difference from await is that the handlers installed for each operation will be executed as soon as any of the results are ready. Control-flow keywords such as return and break can be used to cancel some of the outstanding operations:
proc foo {.async.} =
  var keyword = "Status"
  var timedOut = false
  select:
    httpRequest(&"http://google.com/?q={keyword}") as response:
      # executes this as soon as it's ready
      echo "HTTP RESPONSE ", response.status, "\n", response.body
    jsonRpcCall("localhost/myApi") as jsonData:
      echo jsonData{"result"}
      return # returns from the current proc; skips the other handlers
    *timeout(100):
      # `timeout` is the same as `sleepAsync`
      timedOut = true
      break # continues after the select; skips the other handlers
  echo "async ops ", if timedOut: "timed out" else: "finished"
The execution after the select block continues when all of the handlers have been executed, although there must be a way to mark some of them as optional (here, I've used * for this).
The named results are considered in scope after the select statement. You can choose to only name a particular result without providing a handling block.
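For example, under the proposed syntax a result could be named without a handler and consumed after the block (a sketch, reusing the keyword variable from above):

select:
  httpRequest(&"http://google.com/?q={keyword}") as response
  jsonRpcCall("localhost/myApi") as jsonData:
    echo jsonData{"result"}
# both named results remain in scope here
echo response.status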
3. Introduce a new safeasync pragma (EDIT: this may well be the default mode)
The safeasync pragma is responsible for inserting the await keyword automatically. It also has the role of the current multisync pragma, in the sense that it allows you to compile the same code for both sync and async usage:
proc foo: bool {.safeasync.} =
  var keyword = "Status"
  # Notice how I don't need to use await anymore
  var (googlePage, jsonData) = (httpRequest(&"http://google.com/?q={keyword}"),
                                jsonRpcCall("localhost/myApi"))
  return googlePage.status == 200 and not jsonData.hasKey("error")
How does this work? It inserts a call to a template called implicitAwait on each expression within the proc's body. implicitAwait is defined as an identity for all non-future types and as a regular await statement for all futures:
template implicitAwait(x: auto): auto = x
template implicitAwait(x: Future): auto = await x
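To illustrate, the tuple assignment from the example above would be rewritten roughly like this (a sketch; the real transformation is applied recursively to every expression in the body):

var (googlePage, jsonData) = (implicitAwait(httpRequest(&"http://google.com/?q={keyword}")),
                              implicitAwait(jsonRpcCall("localhost/myApi")))

Since both calls return futures, the second overload fires and each operation is awaited.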
Please note that the body of a safeasync proc will work in synchronous mode by executing each operation in turn. It's also possible to compile the code for implicit off-loading to a background thread pool in programs that don't feature an asynchronous event loop.
Appendix 3.A
Please note that using the await statement may still be supported inside safeasync procs. One may use it to improve code clarity. It's also possible to implement safeasync in an alternative way that requires the use of await and signals any omission as an error, but the arguments for this are not very strong - in all code there might be significant differences between operations that are algorithmically cheaper or heavier, and it's usually the names of the operations that reveal where the I/O waits will happen.
4. Support async operations in parallel blocks
I'm extending the proposal to also enhance Nim's parallel construct with additional support for async operations. This proposal can replace the need for a separate select API, although it could still exist as a simple high-level helper. The new features are the following:
Within parallel blocks:
4.1) Allow spawn to be followed by a do block that will be executed with the result of the operation, once complete.
4.2) Allow spawn to be used with procs returning Future[T] results. spawn immediately starts the async operation and adds the Future to a list of tasks to be awaited just before the exit of the parallel block. This enforces the structural handling of the async operations, but one can still work with the returned futures in the familiar fashion - passing them to helper procs, setting up callbacks and so on. It is guaranteed that the callbacks will be executed in the same thread that has entered the parallel block.
4.3) Add a new call called spawnOptional that launches non-critical parallel operations. If the parallel block is able to complete before all such operations have completed, they are simply cancelled.
4.4) Support break and return in parallel blocks by cancelling all outstanding operations.
With such an API, the select example above becomes:
proc foo {.async.} =
  var keyword = "Status"
  var timedOut = false
  parallel:
    spawn httpRequest(&"http://google.com/?q={keyword}") do (response):
      # executes this as soon as it's ready
      echo "HTTP RESPONSE ", response.status, "\n", response.body
    spawn jsonRpcCall("localhost/myApi") do (jsonData):
      echo jsonData{"result"}
      return # returns from the current proc; skips the other handlers
    spawnOptional timeout(100) do:
      # `timeout` is the same as `sleepAsync`
      timedOut = true
      break # continues after the parallel block; skips the other handlers
  echo "async ops ", if timedOut: "timed out" else: "finished"
Please note that such a parallel block will be more powerful than the select construct, because it enables you to add multiple tasks to be awaited from a loop.
The use of parallel blocks and spawn comes at a cost. All parameters passed in the spawn expression must be copied inside the spawned task. Please note that this matches precisely the behavior of spawn when it comes to sending computational tasks to a thread pool as well.
4.5) Introduce an underlying object representing the "parallel block" and create an accessor for it (e.g. a thisParallelBlock magic valid only inside the block). This object will feature operations such as addThreadJob, addAsyncIO and addOptionalAsyncIO. It is the equivalent of the nursery object described in the article linked in the abstract. Its goal is to enable the creation of helper libraries that perform something with the parallel block context.
parallel:
  addJobs(thisParallelBlock)
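Such a helper might look roughly like this (a sketch; ParallelBlock, addAsyncIO and urisToCheck are illustrative names):

proc addJobs(p: ParallelBlock) =
  # queue one async operation per URI on the caller's parallel block
  for uri in urisToCheck:
    p.addAsyncIO(httpRequest($uri))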
4.6) Define the exception-handling semantics inside parallel blocks - if an exception is thrown by a spawned task inside a parallel block, this exception will be re-raised in the thread that has entered the block. All other spawned tasks are cancelled.
5. Support async tasks in spawn outside of parallel blocks
This is an escape hatch that will replace the current usages of asyncCheck and traceAsyncErrors. Semantically, it spawns a new "async thread" of execution. Just like when spawning a regular thread, all parameters passed to the spawn expression must be copied or moved into the new thread. The spawned function must be annotated with raises: [Defect]. If it terminates with a Defect, the whole process is also terminated.
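A sketch of how this replaces today's asyncCheck (serveClient and Connection are illustrative names):

proc serveClient(conn: Connection) {.async, raises: [Defect].} =
  ...

# Detached "async thread"; `conn` is copied/moved into the task and any
# Defect terminates the whole process:
spawn serveClient(conn)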
6. Migration path for the current async
A semi-backwards-compatible async pragma can be added to serve as a drop-in replacement for the existing async pragma. It will differ in only one way: all started async operations will be added to a list that is awaited at the end of their scope. This is not strictly backwards-compatible, but most of the existing async code should not be affected by the change in semantics.
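To make the difference concrete, here is a sketch (Peer, send and shutdownMsg are illustrative names):

proc notifyPeers(peers: seq[Peer]) {.async.} =  # the new drop-in pragma
  for p in peers:
    let fut = p.send(shutdownMsg)  # started, but never explicitly awaited
  # under the new semantics, each such future is awaited here, at the end
  # of its scope, instead of being silently dropped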
Both point 1 and point 2 can be replaced by a primitive like this: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
From my experience, there is no need for var (a, b, c) = await(afut, bfut, cfut), because it is used very rarely in the real world; most of the time you are filling an array of tasks and waiting for any of them or for all of them. Also, you will not find many lines of code which use and or or constructions.
The dynamic interface in Python allows for working with heterogeneous result types, while in Nim you have to resort to having named Future variables from which the results can be extracted (losing the "structured control-flow" aspect of the design).
The key difference in the new proposal is that the result variables have their proper type in all places and the explicit use of futures is hidden.
Otherwise, I've considered the case of requesting a sequence of homogeneous operations. I think this should be a separate request-throttling API. (EDIT: this use case is now covered by the parallel block enhancements.)
I'd also say that the need for heterogeneous parallel I/O is quite common in web development for example where it is typical for a web request to lead to several sub-requests to different micro-services, databases and so on.
The asyncdispatch design allows you to accept seq[FutureBase] or varargs[FutureBase], and that is enough to check the readiness of a Future.
Also, extracting results from a Future means extracting exceptions too; with your proposal it's unclear how this construction must work if one of the Futures fails, while Python's wait allows you to specify how to react to an Exception.
I don't argue about the low-level details. Nothing about asyncdispatch should change. The goal is to remove the explicit use of futures from the high-level interface, so that certain optimizations are easier and the code is safer (please read the linked article).
I don't think there is an issue with exceptions. The semantics of await are clear - all operations must complete, or else an exception will be raised. It's also possible to introduce additional syntax in the API to let you preserve some of the results as futures (e.g. await (wrapErrors request(), bar())). The sync/async duality makes it very clear how to think about exceptions in all cases - you can always ask yourself "what would the code do in sync mode?"
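To make the wrapErrors idea concrete, a sketch (the helper, its Result-like return type, and the url/api placeholders are hypothetical):

var (page, rpc) = await (wrapErrors httpRequest(url), jsonRpcCall(api))
# `rpc` keeps the normal raising semantics; `page` records a failure
# as a value instead of raising it:
if page.isOk:
  echo page.get.status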
With select, again, all mandatory operations must complete, otherwise an exception will be thrown. We can also allow except branches for each request, giving you a chance to handle the exception manually. Additional syntax will also be required for implementing progress events.
Sorry, but I don't see any optimizations or safer code. Everything I see is a proposal to add some more primitives on top of the existing primitives, and then add even more to resolve all the possible issues.
Also, I don't want to think about sync mode; there is no reason to think about it. If people want to be synchronous, why do they need the asynchronous overhead?
What is the proposed await modification? It's an obfuscated version of:
await wait(abc, bcd, ALL_COMPLETED)
What is the proposed select? It's an obfuscated version of:
await wait(abc, bcd, cde, ALL_COMPLETED)
Most people do not understand asynchronous programming with the current API, so why do you think adding more APIs will help them understand it better?
@cheatfate, if it's possible to design the system in a way that avoids heap allocations and reliance on the GC, it would be silly not to take advantage of it.
The specific optimization being discussed here is placing the Future objects within the stack of the caller. At the moment this is still the heap-allocated environment of the closure iterator, but this can be attacked with further optimizations in the compiler (escape analysis for the closure objects, better support for closure/iterator monomorphism in the run-time code). In the end it may be possible to store all the required state within the single value associated with the registered I/O operation. Rust achieves this.
Also, many async procs don't actually need to be compiled as iterators:
proc foo: Future[int] {.async.} =
  code code code
  ...
  let v = await bar()
  ...
  return v
is equivalent to the sync proc:
proc foo: int =
  code code code
  ...
  let v = waitFor bar()
  ...
  return v
EDIT: The claim above is not correct in general, because if a second waitFor loop is entered while the first one is still waiting, it won't be able to wake the original one immediately, thus creating a change in the execution semantics.
I'd rather have an API that lets me experiment more easily with such changes in the compilation strategy.
Do you have any alternative proposal that can ensure all of the above? Regarding safety, I've given specific examples of problematic code that will be eliminated under the new APIs.
And finally, here is the full equivalent example under your alternative:
var
  googlePageFut = httpRequest(&"http://google.com/?q={keyword}")
  jsonDataFut = jsonRpcCall("localhost/myApi")

await wait(googlePageFut, jsonDataFut)

let
  googlePage = googlePageFut.read
  jsonData = jsonDataFut.read

return googlePage.status == 200 and not jsonData.hasKey("error")
Why shouldn't we strive to eliminate this boilerplate?
@zah, I think you are missing not just something but a lot, or maybe somebody misled you about these optimizations. @yglukhov and I have already discussed that if a variable survives across a state of the iterator, it becomes part of the closure environment, not the stack, so all these optimizations will not give you as many benefits as you expect.
Also, I think you need to read more about the Rust implementation, tokio, because I don't want to write code styled like Rust:
read(x).after(read(y).after(read(z)))
@cheatfate, I know precisely how the compiler works, but you are not paying enough attention to what I'm saying:
At the moment this is still the heap-allocated environment of the closure iterator, but this can be attacked with further optimizations in the compiler (escape analysis for the closure objects, better support for closure/iterator monomorphism in the run-time code)
The style of the Rust API is irrelevant; it's the underlying run-time approach that's interesting.
@zah, you know precisely how the compiler works, and I know precisely how async works.
The style of the Rust API is a consequence of using zero-cost abstractions; an almost identical approach is used for JavaScript's Promise.
I don't like such an approach, because it is pretty hard to understand and use.
I haven't read all of the discussion so forgive me if I'm missing something, but I already have a few things I would like to note about the whole reasoning for this issue: "Current Problems".
Consider the following simple async proc:
proc terminateConnection(s: AsyncSocket) {.async.} =
  var myDisconnectMsg: array[X, byte]
  prepareDisconnect(myDisconnectMsg)
  var res = s.send(addr myDisconnectMsg[0], X) # oops, forgot to call await here
  s.close()
It showcases two critical problems triggered by a simple omission of await in the code:
- The socket will be closed prematurely.
- The send operation will be working with bogus data.
I am against any mixing of pointers and async. If you're doing that then you are on your own and should bear the brunt of the consequences.
Separately, I do recognise the problem, and it is similar to the problem of users not knowing better and using discard on a future (instead of asyncCheck). But I think the solution to this is straightforward: destructors. Once a future goes out of scope, a callback can be set on it to ensure an exception is raised and does not get lost. Of course, I'm waiting for destructors to be stable to implement that.
Once that works, the above code should fail with an exception.
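A minimal sketch of that destructor idea, assuming stable destructor support and using illustrative field names for the future's underlying object:

proc `=destroy`(fut: var FutureObj) =
  if not fut.finished and fut.callback.isNil:
    # the future was dropped without being awaited or checked; attach a
    # callback so a stored failure is re-raised instead of silently lost
    fut.callback = proc (f: FutureBase) =
      if f.failed: raise f.error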
I'll read the rest of this as soon as I can, as I am open to alternative solutions. But from what skimming it has revealed, this seems to suggest really big changes to fix this one relatively minor issue.
So I read the article as well and I must say that the "nursery" idea proposed can be easily implemented on top of async await. I don't see a reason to restrict async await when these nurseries can be implemented with no breaking changes (and very easily too from what I can tell).
@dom96, what result is expected in the example you showed?
- Message was sent. Socket closed.
- Message was not sent. Error about socket closure raised.
- Message was not sent. Socket closed.
(3) will happen on Unix, but on Windows there is a big chance of (1) and a smaller chance of (3); everything depends on the number of messages in the send queue. Also, asyncdispatch2 already has procedures which hide the pointer/size usage. While it is possible to avoid pointers for writing, it is almost impossible to avoid pointers for reading.
And about destructors: we have already been trained to find ways to cheat/work around the GC. Destructors are almost the same thing; we will need to find ways to work around/cheat destructors too. The only right way (in my opinion) is to introduce manual memory management with a little help from the compiler (for example, the compiler can calculate the max size of a single allocated object). With this information you can implement/use much faster memory managers (like memory pools - please do not confuse these with memory regions, because memory pools use constant-size memory blocks). So I'm not waiting for destructors, because they are one more crutch.
Why would (3) be a possibility? Does Windows just ignore a closed socket and report that it sent a message even though it didn't?
@dom96, (1) can happen when send() -> WSASend() completes immediately, so the data was actually sent, but the IOCP completion event will never arrive, simply because the socket has been closed. (3) can happen when send() -> WSASend() does not complete immediately; when the socket gets closed, all IOCP information about it is removed from the system queue.
In an offline discussion, I promised to explain how an async accept loop may work under the new scheme. It was pointed out that if the code is written in the style of the old APIs, enforced awaits will lead to accepting and processing only one connection at a time:
while true:
  var conn = await socket.accept()
  await conn.process() # while this is awaited, we are not accepting new connections
To solve this problem, you must use the new async support enabled in parallel blocks. In general, the parallel blocks provide semantics that very closely resemble the style of programming used in the old async APIs, with spawn being roughly similar to asyncCheck. When you stumble on a problem, you should always ask yourself whether the escape hatch provided by parallel blocks is enough to solve it. Here is what the solution looks like:
parallel:
  while true:
    var conn = await socket.accept()
    spawn conn.process() # this gets added to the list of outstanding async ops
                         # of the block and the processing continues immediately
The parallel block won't be exited until all outstanding operations are complete. In this sense, it can be argued that the new API provides an additional safety benefit over the ad-hoc processing used in the old async APIs. With the old APIs, a shutdown procedure won't do anything about the outstanding async tasks by default, while here we clearly decide whether we want to wait for all the operations to finish or whether we want to cancel them by using break to exit the loop.
Please also note that all the optimization benefits of the new scheme are still realized, because any local variable declared before or within the parallel block is guaranteed to outlive all the processing of the async operations started in the block, and this allows us to pass pointers to such variables to the said async operations (enabling efficient openarray and var parameters and allowing us to store the futures and their results on the stack). A sketch of this follows below.
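For example (a sketch; readInto is an illustrative proc taking a var openarray[byte]):

parallel:
  var buffer: array[4096, byte]
  # `buffer` is guaranteed to outlive every operation started in this
  # block, so passing it by reference is safe:
  spawn socket.readInto(buffer)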
Besides using spawn as in the accept-loop example above, in parallel blocks you can also directly manipulate the returned future values to add completion callbacks and error handling that will be executed with the familiar semantics of the old APIs (see the remarks in 4.5 regarding this).
Ignorant, fly-by question: how much of this could be implemented as a package without change to stdlib? I'm interested in using a system like the Python Trio system, because I think (but don't know) that it would simplify some of my async code. Anything I could do to help?
A certain version of this can be implemented without any changes to the compiler. Depending on exactly how much integration with parallel blocks is desired (in other words whether you want to mix async I/O with computational tasks backed by a thread pool), some compiler support can improve the system (and it's more about enforcing safety than about expressiveness). If you want to invest time in this, I can help you with some guidance. I would suggest finding me on Discord (https://discord.gg/XRxWahP) or Gitter.