anyio
Added EnhancedTaskGroup, amap() and as_completed()
Changes
Adds an enhanced version of the task group that allows task-by-task cancellation as well as awaiting the results of individual tasks. Two other convenience functions are also provided:
- amap(): calls the given one-parameter coroutine function with each item from the given iterable of arguments and runs the calls concurrently in a task group
- race(): launches all given coroutines as tasks in a task group and returns the return value of whichever task completes first
Both functions provide concurrency and rate limiting.
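As a rough illustration of what amap() with concurrency and rate limiting involves, here is a plain-asyncio sketch. It is not the PR's implementation: `amap_sketch`, `max_concurrency` and `max_per_second` are hypothetical names, and asyncio.gather() stands in for a task group.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def amap_sketch(
    func: Callable[[T], Awaitable[R]],
    args: Iterable[T],
    max_concurrency: Optional[int] = None,
    max_per_second: Optional[float] = None,
) -> List[R]:
    """Call func(arg) for every arg concurrently; return results in input order.

    Hypothetical helper: a semaphore caps how many calls run at once, and call
    start times are spaced at least 1 / max_per_second seconds apart.
    """
    items = list(args)
    semaphore = asyncio.Semaphore(max_concurrency or max(len(items), 1))
    start_lock = asyncio.Lock()
    loop = asyncio.get_running_loop()
    next_start = loop.time()

    async def run_one(arg: T) -> R:
        nonlocal next_start
        async with semaphore:
            if max_per_second:
                # Only one call at a time may claim the next start slot
                async with start_lock:
                    delay = next_start - loop.time()
                    if delay > 0:
                        await asyncio.sleep(delay)
                    next_start = max(next_start, loop.time()) + 1 / max_per_second
            return await func(arg)

    # gather() returns results in input order and propagates the first error
    return await asyncio.gather(*(run_one(arg) for arg in items))
```

Unlike a task group, gather() as used here does not cancel the remaining calls when one of them fails; it only propagates the first exception.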
Checklist
If this is a user-facing code change, like a bugfix or a new feature, please ensure that you've fulfilled the following conditions (where applicable):
- [ ] You've added tests (in tests/) which would fail without your patch
- [ ] You've updated the documentation (in docs/, in case of behavior changes or new features)
- [ ] You've added a new changelog entry (in docs/versionhistory.rst).
If this is a trivial change, like a typo fix or a code reformatting, then you can ignore these instructions.
Updating the changelog
If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #123, the entry should look like this:
- Fix big bad boo-boo in task groups
(`#123 <https://github.com/agronholm/anyio/issues/123>`_; PR by @yourgithubaccount)
If there's no issue linked, just link to your pull request instead by updating the changelog after you've created the PR.
A bit far-fetched, but what do you think of a free function create_task() that would use the current EnhancedTaskGroup if there is one, and error out otherwise?
On one hand this is going against structured concurrency, but on the other hand it removes the need to pass a task group down the stack, when you know there must be one.
On the other hand, that's the point of passing down the taskgroup -- when it's not there we know that the method we're calling won't start things it doesn't wait for.
If you want to circumvent that, for whatever reason, there's already a fairly-high-performance way to do it -- set a contextvar. So IMHO "explicit is better than implicit" and thus we shouldn't support that natively.
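A minimal sketch of that contextvar workaround in plain asyncio, assuming a hypothetical `implicit_group()` context manager and a hypothetical free `create_task()` that errors out when no group is active. Child tasks inherit a copy of the context, so code further down the stack finds the same group. Error handling and cancellation of the children, which a real task group provides, are deliberately left out.

```python
import asyncio
import contextvars
from contextlib import asynccontextmanager

# Hypothetical: the task list of the innermost implicit group, or None
current_tasks = contextvars.ContextVar("current_tasks", default=None)


@asynccontextmanager
async def implicit_group():
    tasks = []
    token = current_tasks.set(tasks)
    try:
        yield
        # Wait for every child, including ones started by other children
        while any(not task.done() for task in tasks):
            await asyncio.wait([task for task in tasks if not task.done()])
        for task in tasks:
            task.result()  # re-raise a child failure, if any (first in spawn order)
    finally:
        current_tasks.reset(token)


def create_task(coro):
    """Hypothetical free function: spawn into the active implicit group."""
    tasks = current_tasks.get()
    if tasks is None:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise RuntimeError("create_task() called outside implicit_group()")
    task = asyncio.create_task(coro)
    tasks.append(task)
    return task


async def leaf(n):
    await asyncio.sleep(0)
    return n


async def spawner():
    # No task group is passed in; it is looked up through the contextvar
    return create_task(leaf(42))


async def main():
    async with implicit_group():
        task = await spawner()
    return task.result()
```

Nothing in `spawner()`'s signature reveals that it starts a task that outlives the call, which is the loss of explicitness objected to above.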
I'm -1 on an implicit task group.
The one thing I'm after is a nursery/TaskGroup where I can start tasks and then iterate over the results asynchronously as soon as they become available so that I can interleave work and don't have to wait for the slowest task to start the next part of the pipeline.
With this API, race is trivial to implement: you just cancel the TaskGroup as soon as the first finished task is available. gather just collects all results into a list of length n_inputs, in the order the inputs were provided, and returns that list when the TaskGroup exits.
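The race and gather semantics described here can be sketched with plain asyncio tasks, assuming zero-argument async callables as inputs. `race_sketch` and `gather_sketch` are hypothetical names; the slot-per-input list in `gather_sketch` is the kind of user-side wrapping needed when the task-spawning API does not return results (asyncio.gather() itself already preserves order).

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence

AsyncFn = Callable[[], Awaitable[Any]]


async def race_sketch(funcs: Sequence[AsyncFn]) -> Any:
    """Return the outcome of whichever call finishes first; cancel the rest."""
    tasks = [asyncio.ensure_future(func()) for func in funcs]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # .result() re-raises if the first task to finish raised; if several
        # finish in the same loop iteration, one of them is picked arbitrarily
        return next(iter(done)).result()
    finally:
        for task in tasks:
            task.cancel()
        # Let the cancelled tasks finish unwinding before returning
        await asyncio.gather(*tasks, return_exceptions=True)


async def gather_sketch(funcs: Sequence[AsyncFn]) -> List[Any]:
    """Collect every result into the slot matching its input position."""
    results: List[Any] = [None] * len(funcs)

    async def store(index: int, func: AsyncFn) -> None:
        results[index] = await func()

    tasks = [asyncio.ensure_future(store(i, func)) for i, func in enumerate(funcs)]
    try:
        await asyncio.gather(*tasks)
    finally:
        # On failure, cancel the siblings (a no-op for tasks already done)
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return results
```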
And how do we deal with exceptions occurring in the child tasks?
In my implementation I actually return Outcome instances, so the user processes the Outcomes as they're made available. It's then up to the user to decide what to do with any errors. If they blindly unwrap an error, it will raise and cancel the TaskGroup.
It would of course be nice to have this built-in, so I'm lurking here to see if I can replace my own custom solution. For me it's important to be able to process tasks as soon as they're finished and to not have to wait for the slowest.
Ok, so help me understand. What do you suggest race() returns then? What if the first result from a child task is an exception? Do you want to return the Outcome (or equivalent) of that?
I think that depends on the use-case. If amap returned an asynchronous iterator that returned Outcome wrapped results as soon as they were ready I'd leave it at that and let users implement race themselves with the semantics that made sense for their problem.
Given the amap primitive, race becomes trivial to implement however you want so there's no need to provide an implementation which may not be ideal for all use-cases.
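A sketch of that suggestion: an async iterator that yields Outcome-wrapped results in completion order, with the race semantics left to the caller. The `Outcome` class here is a minimal hypothetical stand-in (the third-party `outcome` package provides a similar value/error pair, but it is not used), and the "first success wins" rule in `race_first_success` is just one possible choice.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional, Sequence

AsyncFn = Callable[[], Awaitable[Any]]


@dataclass
class Outcome:
    """Hypothetical result wrapper: either a value or an exception."""

    value: Any = None
    error: Optional[BaseException] = None

    def unwrap(self) -> Any:
        if self.error is not None:
            raise self.error
        return self.value


async def _capture(func: AsyncFn) -> Outcome:
    try:
        return Outcome(value=await func())
    except Exception as exc:  # CancelledError is a BaseException, so it escapes
        return Outcome(error=exc)


async def outcomes_as_completed(funcs: Sequence[AsyncFn]) -> AsyncIterator[Outcome]:
    """Yield one Outcome per call, in the order the calls finish."""
    tasks = [asyncio.ensure_future(_capture(func)) for func in funcs]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        # Reached when exhausted or when the consumer calls aclose() early
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def race_first_success(funcs: Sequence[AsyncFn]) -> Any:
    """One possible race(): the first successful call wins, errors are skipped."""
    outcomes = outcomes_as_completed(funcs)
    errors: List[BaseException] = []
    try:
        async for outcome in outcomes:
            if outcome.error is None:
                return outcome.value
            errors.append(outcome.error)
    finally:
        await outcomes.aclose()  # cancel whatever is still running
    raise RuntimeError(f"all {len(errors)} calls failed") from (errors[-1] if errors else None)
```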
But as_completed() does exactly that (with TaskHandles), doesn't it? I don't think amap() is suitable for implementing race().
Ah yep, I missed the as_completed implementation - that does look like it does what I'm after!
I'm actually pretty excited about this functionality as it will let me replace a bunch of custom code, so I'm curious if there are plans to land this in a release sometime soonish?
I would like to get a fairly broad consensus on the API before moving forward with it. That means getting more people involved.
@Zac-HD I would like your 2 cents on this. Do you think the API would work as-is, or need changes?
Unfortunately there are a couple of parts of the design that make me pretty nervous.
- TaskHandle is very much like a future. I worry about dropping captured exceptions if nobody awaits the handle; and in cases where that's certain not to happen I'm unclear on what it adds. Capturing and later raising exceptions also makes tracebacks much less useful, which would be very very painful in large codebases (e.g. at work).
- EnhancedTaskGroup currently leans heavily on TaskHandle; if it didn't, I'd still want to avoid interfaces which accept a coroutine object.
- amap and as_completed seem like useful helpers, though my instinct would be to implement them directly on top of a TaskGroup + cloneable streams.
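One practical difference between accepting a coroutine object and accepting a callable, offered as an illustration rather than as the reasoning behind the comment above: a coroutine object can be awaited only once, while a callable can create a fresh coroutine for every attempt. The function names are arbitrary.

```python
import asyncio


async def fetch() -> str:
    await asyncio.sleep(0)
    return "data"


async def retry_with_callable(func, attempts: int = 2) -> list:
    # Each call to func() creates a brand-new coroutine object
    return [await func() for _ in range(attempts)]


async def retry_with_coroutine(coro, attempts: int = 2) -> list:
    # The same coroutine object is awaited again on the second attempt,
    # which raises RuntimeError because coroutines are single-use
    return [await coro for _ in range(attempts)]


async def main() -> None:
    print(await retry_with_callable(fetch))  # ['data', 'data']
    try:
        await retry_with_coroutine(fetch())
    except RuntimeError as exc:
        print("coroutine object:", exc)


if __name__ == "__main__":
    asyncio.run(main())
```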
I wonder where the misunderstanding is coming from. Are you thinking that if the task handle isn't saved, the exception is silently dropped? No, EnhancedTaskGroup still enforces SC, just like TaskGroup. But it allows for the return value of a task to be retrieved without awkward user-side wrapping, unlike with normal AnyIO task groups or Trio nurseries. Thus I feel EnhancedTaskGroup combines the best features of AnyIO and AsyncIO task groups.
I know Trio makes a point about not exposing coroutines to users. I'm not across the arguments why, but most things in Trio are very well thought out and very well designed.
In my own implementation I take zero-argument callables, so I force users to use partial. One benefit of this is that I can also take sync functions and run them using anyio.to_thread.run_sync.
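A sketch of the zero-argument-callable approach, with `asyncio.to_thread()` (Python 3.9+) standing in for `anyio.to_thread.run_sync()`; `run_callable` is a hypothetical name. It relies on `inspect.iscoroutinefunction()` recognizing `functools.partial` objects that wrap an async function, which it does on Python 3.8 and later.

```python
import asyncio
import functools
import inspect
import threading
from typing import Any, Callable


async def run_callable(func: Callable[[], Any]) -> Any:
    """Hypothetical helper: await async callables, run sync ones in a thread."""
    if inspect.iscoroutinefunction(func):  # also True for a partial of an async def
        return await func()
    # asyncio.to_thread() plays the role of anyio.to_thread.run_sync() here
    return await asyncio.to_thread(func)


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x


def current_thread_name() -> str:
    return threading.current_thread().name


async def main() -> None:
    print(await run_callable(functools.partial(double, 21)))  # 42
    # The sync function runs in a worker thread, not the event loop's thread
    print(await run_callable(current_thread_name) != threading.current_thread().name)  # True


if __name__ == "__main__":
    asyncio.run(main())
```

A lambda that returns a coroutine is classified as a sync function and would be sent to a thread, so this only works reliably if callers stick to `functools.partial` or plain async functions.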
I think it's due to Nursery.start() which due to its design cannot take a coroutine as an argument.
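The reason shows up in a stripped-down version of the start() handshake: the child must receive a `task_status` argument, so its coroutine can only be created by start() itself. This asyncio sketch is hypothetical (`TaskStatus` and `start_sketch` are illustrative names, not Trio's or AnyIO's code) and skips cancellation handling.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple


class TaskStatus:
    """Hypothetical object the child uses to report that it is ready."""

    def __init__(self, ready: "asyncio.Future[Any]") -> None:
        self._ready = ready

    def started(self, value: Any = None) -> None:
        self._ready.set_result(value)


async def start_sketch(
    func: Callable[..., Awaitable[Any]], *args: Any
) -> Tuple[Any, "asyncio.Task[Any]"]:
    """Start func as a task and wait until it calls task_status.started()."""
    ready = asyncio.get_running_loop().create_future()
    # The child's coroutine can only be created here, once task_status exists;
    # a coroutine object built by the caller could not receive it
    task = asyncio.ensure_future(func(*args, task_status=TaskStatus(ready)))

    def on_done(finished: "asyncio.Task[Any]") -> None:
        # Wake the caller if the child exits without ever calling started()
        if ready.done():
            return
        if finished.cancelled():
            ready.cancel()
        elif finished.exception() is not None:
            ready.set_exception(finished.exception())
        else:
            ready.set_exception(RuntimeError("child exited without calling started()"))

    task.add_done_callback(on_done)
    return (await ready), task


async def server(port: int, *, task_status: TaskStatus) -> str:
    await asyncio.sleep(0)  # pretend to bind a socket
    task_status.started(f"listening on {port}")
    await asyncio.sleep(0.01)  # pretend to serve for a while
    return "stopped"
```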
Also, I have to say that while I am fully behind the SC principles, most things in the Trio code base really rub me the wrong way, so I cannot agree with you.
Be that as it may, if you take executed but not awaited async functions as arguments then it makes it awkward to also support sync functions, if that is something that is wanted.
EnhancedTaskGroup does not and won't support running sync functions.
You're right of course! I'd be more comfortable if TaskHandle didn't capture the task exception, though: that'll already be raised from the TaskGroup, so I'd have a .wait() method return the task's return value or raise a ValueError("task did not produce a return value")-ish error. Raising an exception object in two places just feels like a recipe for trouble with the context or traceback attributes (cf. PEP 785).
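A minimal sketch of the handle proposed here, assuming it records only the return value and leaves reporting the exception to the task group. `ResultHandle` is a hypothetical name, asyncio.gather() stands in for a task group in the usage, and the ValueError message follows the wording of the comment.

```python
import asyncio
from typing import Any, Awaitable, Callable, Generic, TypeVar

T = TypeVar("T")
_NO_VALUE: Any = object()


class ResultHandle(Generic[T]):
    """Hypothetical handle that keeps a task's return value, never its exception."""

    def __init__(self) -> None:
        self._value: Any = _NO_VALUE
        self._finished = asyncio.Event()

    async def run(self, func: Callable[[], Awaitable[T]]) -> None:
        """Body of the child task; any exception propagates to the group only."""
        try:
            self._value = await func()
        finally:
            self._finished.set()  # wake waiters whether or not a value arrived

    async def wait(self) -> T:
        await self._finished.wait()
        if self._value is _NO_VALUE:
            raise ValueError("task did not produce a return value")
        return self._value
```

With this design the RuntimeError from a failing child is raised exactly once, by whatever collects the child tasks, while waiters on the handle only learn that no value exists.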
~If TaskHandle didn't capture the exception, what would happen if a task that isn't the task group's host task awaits for the task and the awaited task raises an exception?~
I don't think I'd be comfortable with the handle raising ValueError instead of propagating the actual exception raised there (save for asyncio.CancelledError / trio.Cancelled which are converted to TaskCancelled).
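For comparison, a sketch of the semantics described here: the waiter gets the task's own exception, and only cancellation is translated. `TaskCancelled` and `wait_for_task` are hypothetical names. The translation matters because a plain `await task` on a cancelled asyncio task raises CancelledError in the waiter, which looks just like the waiter itself being cancelled.

```python
import asyncio
from typing import Any


class TaskCancelled(Exception):
    """Hypothetical exception meaning "the awaited task was cancelled"."""


async def wait_for_task(task: "asyncio.Task[Any]") -> Any:
    # asyncio.wait() only waits for completion; it neither raises the task's
    # exception nor propagates the task's cancellation into this coroutine
    await asyncio.wait([task])
    if task.cancelled():
        raise TaskCancelled(f"{task.get_name()} was cancelled")
    return task.result()  # returns the value or re-raises the task's own exception
```

Re-raising the stored exception in the waiter is exactly the double-raise situation the previous comment was worried about.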
I know that asyncio allows this. How do they deal with it?
```python
import asyncio
from asyncio import TaskGroup


async def fail():
    await asyncio.sleep(0.1)
    raise RuntimeError("foo")


async def main():
    async with TaskGroup() as tg:
        task = tg.create_task(fail())
        await asyncio.sleep(0.05)
        await task


asyncio.run(main())
```
Results in:
```
+ Exception Group Traceback (most recent call last):
| File "/tmp/a.py", line 11, in <module>
| asyncio.run(main())
| ~~~~~~~~~~~^^^^^^^^
| File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
| return runner.run(main)
| ~~~~~~~~~~^^^^^^
| File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
| return self._loop.run_until_complete(task)
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
| File "/usr/lib64/python3.13/asyncio/base_events.py", line 719, in run_until_complete
| return future.result()
| ~~~~~~~~~~~~~^^
| File "/tmp/a.py", line 8, in main
| async with TaskGroup() as tg:
| ~~~~~~~~~^^
| File "/usr/lib64/python3.13/asyncio/taskgroups.py", line 71, in __aexit__
| return await self._aexit(et, exc)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/lib64/python3.13/asyncio/taskgroups.py", line 173, in _aexit
| raise BaseExceptionGroup(
| ...<2 lines>...
| ) from None
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/tmp/a.py", line 5, in fail
| raise RuntimeError("foo")
| RuntimeError: foo
+------------------------------------
```
In this AnyIO branch, the exception appears twice, however:
```python
import anyio

async def fail():
    await anyio.sleep(0.1)
    raise RuntimeError("foo")

async def main():
    async with anyio.EnhancedTaskGroup() as tg:
        task = tg.create_task(fail())
        await anyio.sleep(0.05)
        await task

anyio.run(main)
```
Traceback:
```
+ Exception Group Traceback (most recent call last):
| File "/tmp/tbtest.py", line 13, in <module>
| anyio.run(main)
| File "/home/alex/workspace/anyio/src/anyio/_core/_eventloop.py", line 74, in run
| return async_backend.run(func, args, {}, backend_options)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/alex/workspace/anyio/src/anyio/_backends/_asyncio.py", line 2310, in run
| return runner.run(wrapper())
| ^^^^^^^^^^^^^^^^^^^^^
| File "/usr/lib64/python3.12/asyncio/runners.py", line 118, in run
| return self._loop.run_until_complete(task)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/lib64/python3.12/asyncio/base_events.py", line 691, in run_until_complete
| return future.result()
| ^^^^^^^^^^^^^^^
| File "/home/alex/workspace/anyio/src/anyio/_backends/_asyncio.py", line 2298, in wrapper
| return await func(*args)
| ^^^^^^^^^^^^^^^^^
| File "/tmp/tbtest.py", line 8, in main
| async with anyio.EnhancedTaskGroup() as tg:
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/alex/workspace/anyio/src/anyio/_core/_tasks.py", line 306, in __aexit__
| return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/alex/workspace/anyio/src/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (2 sub-exceptions)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/tmp/tbtest.py", line 11, in main
| await task
| File "/home/alex/workspace/anyio/src/anyio/_core/_tasks.py", line 280, in __await__
| raise self._exception
| File "/home/alex/workspace/anyio/src/anyio/_core/_tasks.py", line 314, in _run_coro
| retval = await coro
| ^^^^^^^^^^
| File "/tmp/tbtest.py", line 5, in fail
| raise RuntimeError("foo")
| RuntimeError: foo
+---------------- 2 ----------------
| Traceback (most recent call last):
| File "/tmp/tbtest.py", line 11, in main
| await task
| File "/home/alex/workspace/anyio/src/anyio/_core/_tasks.py", line 280, in __await__
| raise self._exception
| File "/home/alex/workspace/anyio/src/anyio/_core/_tasks.py", line 314, in _run_coro
| retval = await coro
| ^^^^^^^^^^
| File "/tmp/tbtest.py", line 5, in fail
| raise RuntimeError("foo")
| RuntimeError: foo
+------------------------------------
```
On asyncio, the await hits a CancelledError rather than RuntimeError, hence the one-exception group.
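The stacked frames in the AnyIO traceback above (main, then __await__, then _run_coro, then fail) are what happens when an exception object that already carries a traceback is raised again: Python adds the new frames in front of the old ones instead of starting a fresh traceback. A small stdlib-only demonstration with arbitrary function names:

```python
def fail():
    raise RuntimeError("foo")


def capture():
    # Stand-in for a task wrapper that stores the exception for later
    try:
        fail()
    except RuntimeError as exc:
        return exc


def reraise(exc):
    # Stand-in for awaiting the handle: raise the stored object again
    raise exc


def frame_names(exc):
    """List the function names recorded in an exception's traceback."""
    names = []
    tb = exc.__traceback__
    while tb is not None:
        names.append(tb.tb_frame.f_code.co_name)
        tb = tb.tb_next
    return names


def demo():
    exc = capture()
    before = frame_names(exc)
    try:
        reraise(exc)
    except RuntimeError as again:
        after = frame_names(again)
        same_object = again is exc
    return before, after, same_object


before, after, same_object = demo()
print(before)  # ['capture', 'fail']
print(after)  # ['demo', 'reraise', 'capture', 'fail']
print(same_object)  # True
```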
This seems similar to the functionality provided by aiometer, which would be nice behavior to have by default. Would some of the other functions built on amap like run_on_each and run_all also add some utility here?
Also, if EnhancedTaskGroup provides the same SC guarantees, what is the reason for implementing this as a separate EnhancedTaskGroup class instead of modifying the existing TaskGroup class? Just backwards compatibility? Which of the features are not opt-in?
I made it a separate class so that I could back out of the API if it was found to be less than optimal. But this isn't set in stone. I would like to coordinate with Trio on this as much as they let me. Ultimately though, I will push these changes with or without their help.
There's another reason: the start() method in the standard task group returns the start value, but here we need to return TaskHandle instead. The handle contains the start value, but this makes for an API incompatibility.
From an API point of view, is it going to be used like so:
```python
async with EnhancedTaskGroup() as tg:
    ...
```
Rather than with a function, as is currently the case? I'm thinking that it could be a parameter: create_task_group(enhance=True).
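If the flag route were taken, a type checker would need overloads to know that `create_task_group(enhance=True)` returns the enhanced type, given that its start() return type differs. A hypothetical sketch with stand-in classes and a hypothetical `create_group()` factory (not AnyIO's code):

```python
from typing import Literal, Union, overload


class PlainGroup:
    """Hypothetical stand-in for the existing task group type."""


class EnhancedGroup:
    """Hypothetical stand-in for the enhanced type (its start() returns a handle)."""


@overload
def create_group(enhance: Literal[False] = ...) -> PlainGroup:
    ...


@overload
def create_group(enhance: Literal[True]) -> EnhancedGroup:
    ...


@overload
def create_group(enhance: bool = ...) -> Union[PlainGroup, EnhancedGroup]:
    ...


def create_group(enhance: bool = False) -> Union[PlainGroup, EnhancedGroup]:
    # A type checker picks the overload from the literal argument, so
    # create_group(enhance=True) is typed as EnhancedGroup
    return EnhancedGroup() if enhance else PlainGroup()
```

A caveat of this design: a value only known to be a `bool` matches the last overload, so such callers get the union type rather than the enhanced one.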