asyncio.all_tasks() crashes if asyncio is used in multiple threads
| BPO | 36607 |
|---|---|
| Nosy | @asvetlov, @thatch, @ambv, @1st1, @miss-islington, @sdunster |
| PRs | #13971, #13975, #13976 |
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
GitHub fields:
assignee = None
closed_at = None
created_at = <Date 2019-04-11.18:12:48.096>
labels = ['3.8', 'type-bug', '3.7', '3.9', 'expert-asyncio']
title = 'asyncio.all_tasks() crashes if asyncio is used in multiple threads'
updated_at = <Date 2019-06-12.17:26:52.568>
user = 'https://bugs.python.org/NickDavies'
bugs.python.org fields:
activity = <Date 2019-06-12.17:26:52.568>
actor = 'asvetlov'
assignee = 'none'
closed = False
closed_date = None
closer = None
components = ['asyncio']
creation = <Date 2019-04-11.18:12:48.096>
creator = 'Nick Davies'
dependencies = []
files = []
hgrepos = []
issue_num = 36607
keywords = ['patch']
message_count = 11.0
messages = ['339991', '340424', '340425', '340428', '340527', '340544', '345239', '345247', '345266', '345380', '345389']
nosy_count = 7.0
nosy_names = ['asvetlov', 'thatch', 'lukasz.langa', 'yselivanov', 'miss-islington', 'sdunster', 'Nick Davies']
pr_nums = ['13971', '13975', '13976']
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = 'behavior'
url = 'https://bugs.python.org/issue36607'
versions = ['Python 3.7', 'Python 3.8', 'Python 3.9']
This problem was identified in https://bugs.python.org/issue34970 but I think the fix might have been incorrect. The theory in bpo-34970 was that GC was causing the WeakSet for all_tasks to change during iteration. However, WeakSet provides an _IterationGuard class to prevent GC from changing the set during iteration, which prevents this problem in a single thread.
My thoughts on this problem are:

- `asyncio.tasks._all_tasks` is shared by all users of asyncio (https://github.com/python/cpython/blob/3.7/Lib/asyncio/tasks.py#L818)
- Any new Task constructed mutates `_all_tasks` (https://github.com/python/cpython/blob/3.7/Lib/asyncio/tasks.py#L117)
- `_IterationGuard` won't protect iterations in this case, because calls to `WeakSet.add` always commit changes even if something is iterating (https://github.com/python/cpython/blob/3.6/Lib/_weakrefset.py#L83)
- Calls to `asyncio.all_tasks` or `asyncio.tasks.Task.all_tasks` crash if any task is started on any thread during iteration.
Repro code:

```python
import asyncio
from threading import Thread


async def do_nothing():
    await asyncio.sleep(0)


async def loop_tasks():
    loop = asyncio.get_event_loop()
    while True:
        loop.create_task(do_nothing())
        await asyncio.sleep(0.01)


def old_thread():
    loop = asyncio.new_event_loop()
    while True:
        asyncio.tasks.Task.all_tasks(loop=loop)


def new_thread():
    loop = asyncio.new_event_loop()
    while True:
        asyncio.all_tasks(loop=loop)


old_t = Thread(target=old_thread)
new_t = Thread(target=new_thread)
old_t.start()
new_t.start()

asyncio.run(loop_tasks())
```
Output:

```
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "tmp/test_asyncio.py", line 25, in new_thread
    asyncio.all_tasks(loop=loop)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 40, in all_tasks
    return {t for t in list(_all_tasks)
  File "/usr/lib/python3.7/_weakrefset.py", line 60, in __iter__
    for itemref in self.data:
RuntimeError: Set changed size during iteration

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "tmp/test_asyncio.py", line 19, in old_thread
    asyncio.tasks.Task.all_tasks(loop=loop)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 52, in _all_tasks_compat
    return {t for t in list(_all_tasks) if futures._get_loop(t) is loop}
  File "/usr/lib/python3.7/_weakrefset.py", line 60, in __iter__
    for itemref in self.data:
RuntimeError: Set changed size during iteration
```
Thanks for the report!
I see 3 ways to fix the bug:

1. Guard `_all_tasks` with a `threading.Lock`. It hurts performance significantly.
2. Retry the `list(_all_tasks)` call in a loop if a `RuntimeError` is raised. The chance of a collision is very low, so the strategy is good enough.
3. Change `_all_tasks` from a weak set of tasks to `WeakDict[AbstractEventLoop, WeakSet[Task]]`. This implementation eliminates the collision, but it is a little more complicated. Plus, the loop would have to be hashable (perhaps that's fine, but I'm not sure all third-party loops support it).

Thus I'm inclined toward option 2. Thoughts?
The fix can be applied to 3.7 and 3.8 only, sorry. Python 3.6 is in security mode now.
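Option 2 above can be sketched as a small retry wrapper around the snapshot step. This is an illustrative sketch only, not the eventual CPython patch; `snapshot_with_retry` and the retry bound are made-up names:

```python
import weakref


def snapshot_with_retry(wset, max_tries=1000):
    """Take a consistent snapshot of a set that other threads may mutate.

    A concurrent add/discard during iteration raises
    "RuntimeError: Set changed size during iteration"; since collisions
    are rare, simply retrying is usually enough.
    """
    for _ in range(max_tries):
        try:
            return list(wset)
        except RuntimeError:
            continue  # set mutated mid-copy; try again
    raise RuntimeError("set kept changing during iteration; giving up")
```

An `all_tasks()` built on this would call `snapshot_with_retry(_all_tasks)` and then filter by loop, as the existing implementation does.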
My preference would actually be number 3 because:
1: I agree that this isn't really a safe option because it could slow things down (possibly a lot) 2: I haven't found this to be rare in my situation but I am not sure how common my setup is. We have a threaded server with a mix of sync and asyncio so we use run in a bunch of places inside threads. Any time the server gets busy any task creation that occurs during the return of run crashes. My two main reservations about this approach are: - There is a potentially unbounded number of times that this could need to retry. - Also this is covering up a thread unsafe operation and we are pretty lucky based on the current implementation that it explodes in a consistent and sane way that we can catch and retry. 3: Loop is already expected to be hashable in 3.7 as far as I can tell (https://github.com/python/cpython/blob/3.7/Lib/asyncio/tasks.py#L822) so other than the slightly higher complexity this feels like the cleanest solution.
The fix can be applied to 3.7 and 3.8 only, sorry. Python 3.6 is in security mode now.
That's fine; you can work around the issue by using asyncio.set_task_factory with a factory that tracks the tasks per loop, with some care.
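One way that workaround might look, as a sketch: `install_task_tracker` and the returned set are illustrative names, not asyncio API, and a real version would need the same care around thread ownership of each loop:

```python
import asyncio


def install_task_tracker(loop):
    """Install a task factory on `loop` that records live tasks in a
    per-loop set, sidestepping the shared global WeakSet entirely."""
    tracked = set()

    def factory(loop, coro, **kwargs):
        task = asyncio.Task(coro, loop=loop, **kwargs)
        tracked.add(task)
        # Drop the task once it finishes, so the set only holds live tasks.
        task.add_done_callback(tracked.discard)
        return task

    loop.set_task_factory(factory)
    return tracked
```

Each thread then inspects only its own loop's `tracked` set instead of calling `asyncio.all_tasks()`, so no cross-thread iteration occurs.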
Sorry, I missed that the loop already has a hashable requirement. Would you prepare a patch for number 3? I am afraid we could add another hard-to-debug multi-threaded problem by complicating the data structure.
I'm just curious why do you call all_tasks() at all?
In my mind, the only non-debug usage is asyncio.run()
Would you prepare a patch for number 3?
I will give it a try and see what I come up with.
I am afraid we can add another hard-to-debug multi-threaded problem by complicating the data structure.
Yeah, this was my concern too: the adding and removing from the WeakDict[AbstractEventLoop, WeakSet[Task]] for _all_tasks could still cause issues. Specifically, the whole WeakSet class is not thread-safe, so I would assume WeakDict is the same; there may not be a nice way of ensuring a combination of GC + the _IterationGuard doesn't come and mess up the dict, even if I wrap it in a threading lock.
Another option would be to attach the WeakSet[Task] to the loop itself; because using the same loop in multiple threads is already not at all thread-safe, that would contain the problem. You mentioned "third-party loops", which may make this option impossible.
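A sketch of that per-loop idea, assuming a loop subclass. This is illustrative only: `TrackingLoop`, `_my_all_tasks`, and `my_all_tasks` are made-up names, and (as noted above) third-party loops would not grow this attribute automatically:

```python
import asyncio
import weakref


class TrackingLoop(asyncio.SelectorEventLoop):
    """Event loop that keeps its own WeakSet of tasks, so each loop
    (and thus each thread) iterates only its own set."""

    def __init__(self):
        super().__init__()
        self._my_all_tasks = weakref.WeakSet()

    def create_task(self, coro, **kwargs):
        task = super().create_task(coro, **kwargs)
        self._my_all_tasks.add(task)
        return task


def my_all_tasks(loop):
    # Per-loop replacement for asyncio.all_tasks(loop=loop); only the
    # thread running `loop` should call this.
    return {t for t in list(loop._my_all_tasks) if not t.done()}
```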
I'm just curious why you call all_tasks() at all? In my mind, the only non-debug usage is asyncio.run()
In reality we aren't using all_tasks() directly. We are calling asyncio.run() from multiple threads which triggers the issue. The repro I provided was just a more reliable way of triggering the issue. I will paste a slightly more real-world example of how this happened below. This version is a little more messy and harder to see exactly what the problem is which is why I started with the other one.
```python
import asyncio
from threading import Thread


async def do_nothing(n=0):
    await asyncio.sleep(n)


async def loop_tasks():
    loop = asyncio.get_event_loop()
    while True:
        loop.create_task(do_nothing())
        await asyncio.sleep(0.01)


async def make_tasks(n):
    loop = asyncio.get_event_loop()
    for i in range(n):
        loop.create_task(do_nothing(1))
    await asyncio.sleep(1)


def make_lots_of_tasks():
    while True:
        asyncio.run(make_tasks(10000))


for i in range(10):
    t = Thread(target=make_lots_of_tasks)
    t.start()

asyncio.run(loop_tasks())
```
New changeset 65aa64fae89a24491aae84ba0329eb8f3c68c389 by Miss Islington (bot) (Andrew Svetlov) in branch 'master': bpo-36607: Eliminate RuntimeError raised by asyncio.all_tasks() (GH-13971) https://github.com/python/cpython/commit/65aa64fae89a24491aae84ba0329eb8f3c68c389
New changeset 5d1d4e3179ffd4d2d72462d870cf86dcc11450ce by Miss Islington (bot) in branch '3.7': bpo-36607: Eliminate RuntimeError raised by asyncio.all_tasks() (GH-13971) https://github.com/python/cpython/commit/5d1d4e3179ffd4d2d72462d870cf86dcc11450ce
New changeset 83abd9658b4845299452be06c9ce92cbceee944d by Miss Islington (bot) in branch '3.8': bpo-36607: Eliminate RuntimeError raised by asyncio.all_tasks() (GH-13971) https://github.com/python/cpython/commit/83abd9658b4845299452be06c9ce92cbceee944d
Note: there's a discussion on GitHub on PR 13971 being a bad solution: https://github.com/python/cpython/pull/13971#issuecomment-500908198
They should be reverted (or fixed forward) for 3.8 and 3.9. I'm OK with this modifying the AbstractEventLoop API if need be for 3.8. Just have a PR ready in the next two weeks.
Lukasz, please don't rush. The applied PR makes asyncio more stable, not worse.
I see only one way to make it perfect: get rid of weakrefs entirely. Otherwise, there is always a chance of a GC run deleting a set element during iteration over the set. Sorry, that's how weakrefs work.
There is such a possibility: call asyncio._unregister_task() explicitly when the task is done (finished with success or failure, or cancelled). With this, we can replace the weak set with a regular set.
The only requirement is that the task should call this _unregister_task() method. No public API change is needed.
At the time of the work on 3.7, Yuri and I considered this implementation but rejected it, because there was a (very low) chance that somebody might implement their own task, register a custom task factory, and not call _unregister_task().
I have never seen code that implements an asyncio task from scratch; people always reuse the existing asyncio.Task.
So maybe the idea is not so bad. It can be implemented easily. I volunteer to make a PR demonstrating the proposal in a day or two, depending on my free time.
I would rather have made all_tasks a per-loop set of tasks instead of a global one, so there would be no race in the first place. @gvanrossum What do you think? The change would be small and would make the code more reliable.
I don't remember why it was done this way. Let me see if @1st1 remembers. I betcha there's a deep reason.
I'd be +1 for all_tasks being something on loops. That seems like a clearer/more obvious implementation to me.
In https://github.com/python/cpython/issues/91887 @bdarnell @bast0006 did some good code archaeology on all_tasks, and it sounds like the implementation may have been the way it is to help with debugging "lost" tasks. If this is correct, I would imagine keeping things separate from loop instances made this easier if the loop references were also "lost"/closed at the point of debugging.
Further, it sounds like some of the original API to facilitate this didn't survive, like the ability to get "done" tasks and I hope most of the low-level debugging is now complete. So, perhaps now is a good time to change the implementation to be more obvious.
As a next action, this would be a good issue to discuss in detail at the upcoming core dev sprint.