gh-96471: Add asyncio queue shutdown
asyncio-only changes from #102499 (which supersedes #96474), updated to match the API introduced by #104750
- Issue: gh-96471
:books: Documentation preview :books:: https://cpython-previews--104228.org.readthedocs.build/
@willingc ready for review
asyncio-only changes from #102499 (on top of #96474)
I assume the "on top of ..." part is obsolete, since that PR was closed without merging. I'll add @willingc as a reviewer per your request.
@EpicWink Could you merge main?
Done, but like #104750, we may want to modify the implementation of immediate=True to simply consume the queue rather than having a separate state.
Eh, @EpicWink, did you see the hubbub about the hanging test in your previous PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it? (At least the feature wasn't reverted.)
I did see the follow-on issue and pull-requests. Yves and I are working on fixing it (you can follow the changes here: https://github.com/python/cpython/compare/main...EpicWink:cpython:fix-thread-queue-shutdown-test)
Awesome! (An acknowledgement that you were working on it would have lessened my stress. :-)
Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: It appears that q.join() does not raise ShutDown when the queue is shutdown immediately. The tests don't seem to be testing for this either. (Come to think of it, the test never seems to take the except self.queue.ShutDown path -- if I put a breakpoint there it never gets hit during any of the tests. The test logic is pretty convoluted, which is my only excuse for not having caught this in the review of gh-104750.)
Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.
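For readers following along, here is a minimal, hypothetical sketch of the "immediate-consume" approach being discussed (a toy model, not CPython's actual implementation — `MiniQueue` and `ShutDown` here are illustrative names): `shutdown(immediate=True)` drains the queue and marks the drained items as done, so `join()` can return without the queue needing a separate "immediately shut down" state.

```python
import threading
from collections import deque


class ShutDown(Exception):
    """Raised by put()/get() after the queue is shut down."""


class MiniQueue:
    """Toy unbounded queue illustrating immediate-consume shutdown."""

    def __init__(self):
        self._items = deque()
        self._mutex = threading.Lock()
        self._not_empty = threading.Condition(self._mutex)
        self._all_done = threading.Condition(self._mutex)
        self._unfinished = 0
        self._is_shutdown = False

    def put(self, item):
        with self._mutex:
            if self._is_shutdown:
                raise ShutDown
            self._items.append(item)
            self._unfinished += 1
            self._not_empty.notify()

    def get(self):
        with self._not_empty:
            while not self._items:
                if self._is_shutdown:
                    raise ShutDown
                self._not_empty.wait()
            return self._items.popleft()

    def task_done(self):
        with self._all_done:
            if self._unfinished <= 0:
                raise ValueError("task_done() called too many times")
            self._unfinished -= 1
            if self._unfinished == 0:
                self._all_done.notify_all()

    def join(self):
        with self._all_done:
            while self._unfinished:
                self._all_done.wait()

    def shutdown(self, immediate=False):
        with self._mutex:
            self._is_shutdown = True
            if immediate:
                # Consume remaining items and count them as done, so a
                # blocked join() returns -- no extra shutdown state needed.
                while self._items:
                    self._items.popleft()
                    self._unfinished -= 1
                if self._unfinished == 0:
                    self._all_done.notify_all()
            # Wake all blocked getters so they re-check and raise ShutDown.
            self._not_empty.notify_all()
```

With this model, after `q.shutdown(immediate=True)` a subsequent `q.join()` returns immediately, while `put()` and `get()` raise `ShutDown` -- which is the behaviour the documentation question above is about.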
And what about queue.Queue.join()? Should it raise after shutdown(immediate=True)? Either way some changes are needed -- either in the implementation or in the docs. And the tests there too (what you wrote above appears to apply to the asyncio and multiprocessing versions only.)
The behaviour should be (and I intend to implement it to be) the same across the threading, multiprocessing and asyncio queues.
By the way, should `task_done()` raise an exception after shutdown or not?
Update: see https://docs.python.org/3.13/library/queue.html#queue.Queue.task_done
@YvesDup No (except the usual ValueError); see #115838
I've removed @YvesDup's tests `test_shutdown_all_methods_in_one_task`, `test_shutdown_putters_deque`, `test_shutdown_getters_deque`, `test_shutdown_get`, `test_shutdown_get_task_done_join`, and `test_shutdown_put`, and replaced them with simpler and lower-level tests. Are there any scenarios I've missed?
I agree with all the removals.
May I suggest creating a test that checks the `_getters`, `_putters` and `_unfinished_tasks` private attributes when the queue shuts down, immediately or not? Or adding those checks to the existing tests?
All tests run fine. But I am wondering whether the tests' aims wouldn't be easier to understand if they were organized by feature, like `test_shutdown_get[_task_done]`, `test_shutdown_put`, `test_shutdown_join`, etc.
With your current organization (`test_shutdown_empty`, `test_shutdown_nonempty`), it seems to me harder to follow the tests' purposes. It's up to you.
Each test is a different scenario, and I'm checking (hopefully) the full externally visible state and functionality for each scenario. I think I will add more comments in these tests.
Thanks for all the comments, this is really easier to understand. IMO an important use case is missing: the case where the queue is empty and there is a pending get.
- when shutdown is not immediate, the task still stays in `q._getters` and we have to cancel it.
- when shutdown is immediate, this task is removed in the `shutdown` method.
It could be based on the following code:

```python
async def _test_shutdown_pending_get(immediate_=False):
    async def _get(q, res):
        try:
            r = await q.get()
            res.append(r)
        except asyncio.queues.QueueShutDown:
            res.append(True)
        except asyncio.CancelledError:
            res.append(False)

    res = []
    q = asyncio.Queue(1)
    loop = asyncio.get_running_loop()
    get_task = loop.create_task(_get(q, res))
    await _ensure_started(get_task)
    q.shutdown(immediate=immediate_)
    if not immediate_:
        get_task.cancel()
    await get_task
    assert res[0] == immediate_
```
Hey guys, great progress here. Can you at-mention me on this PR when you're ready for review? I recommend merging main just before you do that (don't rebase please).
IMO an important use case is missing: the case where the queue is empty and there is a pending get.
* when shutdown is not immediate, the task still stays in `q._getters` and we have to cancel it.
* when shutdown is immediate, this task is removed in the `shutdown` method.
@YvesDup No, in both cases the task should raise ShutDown: when the queue is empty, there should be no distinction between immediate and non-immediate shutdown (ignoring unfinished tasks). ~I'm fixing~ I have fixed this bug and updated the test.
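A tiny asyncio sketch of the agreed-upon semantics (a hypothetical model for illustration, not this PR's code — `MiniAsyncQueue`, `ShutDown` and `demo` are made-up names): shutting down an empty queue wakes every pending getter, and each one raises `ShutDown`, whether or not `immediate` was passed.

```python
import asyncio
from collections import deque


class ShutDown(Exception):
    pass


class MiniAsyncQueue:
    """Toy asyncio queue: only the getter/shutdown interaction."""

    def __init__(self):
        self._items = deque()
        self._getters = deque()  # futures of tasks blocked in get()
        self._is_shutdown = False

    async def get(self):
        while not self._items:
            if self._is_shutdown:
                raise ShutDown
            getter = asyncio.get_running_loop().create_future()
            self._getters.append(getter)
            await getter  # woken by put_nowait() or shutdown()
        return self._items.popleft()

    def put_nowait(self, item):
        if self._is_shutdown:
            raise ShutDown
        self._items.append(item)
        if self._getters:
            getter = self._getters.popleft()
            if not getter.done():
                getter.set_result(None)

    def shutdown(self, immediate=False):
        self._is_shutdown = True
        if immediate:
            self._items.clear()
        # Wake *all* pending getters; each re-checks state in its loop
        # and, finding the queue empty and shut down, raises ShutDown.
        while self._getters:
            getter = self._getters.popleft()
            if not getter.done():
                getter.set_result(None)


async def demo(immediate):
    q = MiniAsyncQueue()
    task = asyncio.ensure_future(q.get())
    await asyncio.sleep(0)  # let the getter block
    q.shutdown(immediate=immediate)
    try:
        await task
    except ShutDown:
        return "ShutDown"
```

Both `asyncio.run(demo(False))` and `asyncio.run(demo(True))` reach the `ShutDown` branch: with the queue empty, immediate and non-immediate shutdown look the same to a blocked getter.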
The failing multiprocessing WithProcessesTestBarrier tests in the Windows 64-bit free-threading builds seem unrelated.
@gvanrossum ready for review
in both cases the task should raise `ShutDown`: when the queue is empty, there should be no distinction between immediate and non-immediate shutdown (ignoring unfinished tasks).
@EpicWink , there is the same bug on threading queue shutdown. I will open an issue.
:robot: New build scheduled with the buildbot fleet by @gvanrossum for commit 2fa1bd9aa04e286a36cb321411e1bfff32fc2b81 :robot:
If you want to schedule another build, you need to add the :hammer: test-with-buildbots label again.
Also, why is the DO-NOT-MERGE label set? Is it because of this?
EDIT: But that is fixed (I actually stumbled over this, wondering why you always wake up all getters -- and came to the conclusion that it is needed).
I want to trust that you two have done the right thing there
@gvanrossum please don't trust; have you seen the xz-utils backdoor?
but not push until the big test run completes? (Buildbot tests can take many hours, sorry.)
Oops, sorry. I forgot to hold off on my most recent push.
Also, why is the DO-NOT-MERGE label set? Is it because of https://github.com/python/cpython/pull/104228/#issuecomment-2035201238?
No, I think it was because I hadn't updated the implementation to match the new agreed-upon behaviour (this PR was in draft at that time anyway, so I don't think the label was needed).
wondering why you always wake up all getters -- and came to the conclusion that it is needed
I should probably add a comment
there is the same bug on threading queue shutdown. I will open an issue.
@YvesDup I've created issue #117531, and fix in PR #117532
I have seen and commented
I think it's good to go
wondering why you always wake up all getters -- and came to the conclusion that it is needed
I should probably add a comment
I could do this, but I don't think it's worth revoking the PR's approval
Sorry for merging prematurely. @EpicWink Can you prepare a follow-up PR? I will merge it as soon as @YvesDup approves it. (Or, alternatively, @YvesDup can make a PR and I will merge when @EpicWink approves.)
@gvanrossum see #117621
I'd like to add that the shutdown() method and task groups make certain patterns really easy to express. I was struggling with how to properly make a generic async map function with a fixed number of workers (mostly as an exercise), but when I saw the new shutdown method, it came together really easily:
```python
import asyncio
from asyncio import Queue, QueueShutDown
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable


async def async_map[T, R](
    func: Callable[[T], Awaitable[R]],
    iterable: AsyncIterable[T],
    *,
    limit: int,
    maxsize: int = -1,
) -> AsyncIterator[R]:
    if maxsize < 0:
        maxsize = limit
    arguments_queue = Queue[T](maxsize=maxsize)
    results_queue = Queue[R](maxsize=maxsize)

    async def drain():
        async for argument in iterable:
            await arguments_queue.put(argument)
        arguments_queue.shutdown(immediate=False)

    async def worker():
        while True:
            try:
                argument = await arguments_queue.get()
            except QueueShutDown:
                break
            await results_queue.put(await func(argument))

    async def background():
        async with asyncio.TaskGroup() as background_task_group:
            background_task_group.create_task(drain())
            for _ in range(limit):
                background_task_group.create_task(worker())
        results_queue.shutdown(immediate=False)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(background())
        while True:
            try:
                yield await results_queue.get()
            except QueueShutDown:
                break
```
I believe this has a common issue that a lot of async generators do, in that if you don't consume the entire generator, it will still continue processing and end up with a lot of futures never awaited.
I know there are some backports. Is there a backports package for asyncio stuff already?