
gh-96471: Add asyncio queue shutdown

Open EpicWink opened this issue 2 years ago • 13 comments

asyncio-only changes from #102499 (which supersedes #96474), updated to match the API introduced by #104750

  • Issue: gh-96471

:books: Documentation preview :books:: https://cpython-previews--104228.org.readthedocs.build/

EpicWink avatar May 06 '23 04:05 EpicWink

@willingc ready for review

EpicWink avatar May 06 '23 04:05 EpicWink

asyncio-only changes from #102499 (on top of #96474)

I assume the "on top of ..." part is obsolete, since that PR was closed without merging. I'll add @willingc as a reviewer per your request.

gvanrossum avatar May 09 '23 17:05 gvanrossum

@EpicWink Could you merge main?

gvanrossum avatar Feb 10 '24 05:02 gvanrossum

Could you merge main?

Done, but like #104750, we may want to modify the implementation of immediate=True to simply consume the queue rather than having a separate state.
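For reference, the observable effect of a consume-on-shutdown implementation can be sketched as below. This assumes Python 3.13 or later, where `asyncio.Queue.shutdown()` and `asyncio.QueueShutDown` are available; the function name `immediate_shutdown_state` is purely illustrative.

```python
import asyncio


async def immediate_shutdown_state():
    q = asyncio.Queue()
    for item in ("a", "b", "c"):
        q.put_nowait(item)

    # Requires Python 3.13+. Immediate shutdown discards the queued items
    # together with the unfinished-task count that belonged to them.
    q.shutdown(immediate=True)

    # With nothing left unfinished, join() returns instead of blocking.
    await asyncio.wait_for(q.join(), timeout=1)

    try:
        q.get_nowait()
        outcome = "item"
    except asyncio.QueueShutDown:
        outcome = "QueueShutDown"
    return q.qsize(), outcome
```

Running it with `asyncio.run(immediate_shutdown_state())` reports an empty queue whose `get_nowait()` raises `QueueShutDown`, without the queue needing a separate "shut down immediately" state.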

EpicWink avatar Feb 20 '24 11:02 EpicWink

Eh, @EpicWink, did you see the hubbub about the hanging test in your previous PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it? (At least the feature wasn't reverted.)

gvanrossum avatar Feb 20 '24 15:02 gvanrossum

Eh, @EpicWink, did you see the hubbub about the hanging test in your previous PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it?

I did see the follow-on issue and pull-requests. Yves and I are working on fixing it (you can follow the changes here: https://github.com/python/cpython/compare/main...EpicWink:cpython:fix-thread-queue-shutdown-test)

EpicWink avatar Feb 20 '24 23:02 EpicWink

Awesome! (An acknowledgement that you were working on it would have lessened my stress. :-)

gvanrossum avatar Feb 21 '24 00:02 gvanrossum

Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: it appears that q.join() does not raise ShutDown when the queue is shut down immediately. The tests don't seem to be testing for this either. (Come to think of it, the test never seems to take the except self.queue.ShutDown path -- if I put a breakpoint there it never gets hit during any of the tests. The test logic is pretty convoluted, which is my only excuse for not having caught this in the review of gh-104750.)

gvanrossum avatar Feb 22 '24 00:02 gvanrossum

Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.

Laurie



EpicWink avatar Feb 22 '24 01:02 EpicWink

Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.

And what about queue.Queue.join()? Should it raise after shutdown(immediate=True)? Either way some changes are needed -- either in the implementation or in the docs. And the tests there too (what you wrote above appears to apply to the asyncio and multiprocessing versions only.)

gvanrossum avatar Feb 22 '24 01:02 gvanrossum

And what about queue.Queue.join()? Should it raise after shutdown(immediate=True)? Either way some changes are needed -- either in the implementation or in the docs. And the tests there too (what you wrote above appears to apply to the asyncio and multiprocessing versions only.)

The behaviour should (and I intend to implement it to) be the same between the threading, multiprocessing and asyncio queues
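A quick way to probe the threading side of this question is sketched below. It assumes Python 3.13 or later, where `queue.Queue.shutdown()` exists; the helper name is hypothetical. In that release, `join()` returns rather than raising after an immediate shutdown, because the unfinished-task count is reduced as queued items are discarded.

```python
import queue
import threading


def join_returns_after_immediate_shutdown():
    q = queue.Queue()
    q.put("x")  # one queued item, hence one unfinished task

    # Requires Python 3.13+. The queued item is dropped along with its
    # unfinished-task count. Items that were already fetched with get()
    # but never marked task_done() would still count as unfinished.
    q.shutdown(immediate=True)

    # Run join() in a helper thread so a regression cannot hang the caller.
    joiner = threading.Thread(target=q.join, daemon=True)
    joiner.start()
    joiner.join(timeout=1)
    return not joiner.is_alive()
```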

EpicWink avatar Feb 22 '24 03:02 EpicWink

By the way, should task_done() raise an exception after shutdown or not? Update: see https://docs.python.org/3.13/library/queue.html#queue.Queue.task_done

YvesDup avatar Feb 22 '24 09:02 YvesDup

By the way, should task_done() raise an exception after shutdown or not ?

@YvesDup No (except the usual ValueError); see #115838
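The "usual ValueError" can be illustrated with a small sketch. The shutdown call is guarded because `asyncio.Queue.shutdown()` only exists on Python 3.13+; the function name is an assumption made for illustration.

```python
import asyncio


async def task_done_outcome():
    q = asyncio.Queue()
    q.put_nowait("job")
    q.get_nowait()

    # shutdown() is only available on Python 3.13+, so guard the call.
    if hasattr(q, "shutdown"):
        q.shutdown()

    q.task_done()  # matches the single get above, so no error
    try:
        q.task_done()  # one call too many
    except ValueError:
        return "ValueError"
    return "no error"
```

The first `task_done()` succeeds even though the queue has been shut down; only the surplus call raises.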

EpicWink avatar Feb 25 '24 12:02 EpicWink

I've removed @YvesDup's tests test_shutdown_all_methods_in_one_task, test_shutdown_putters_deque, test_shutdown_getters_deque, test_shutdown_get, test_shutdown_get_task_done_join, and test_shutdown_put, and replaced them with simpler and lower-level tests. Are there any scenarios I've missed?

I agree with all the removals. May I suggest creating a test that checks the _getters, _putters and _unfinished_tasks private attributes when the queue shuts down, immediately or not? Or adding those checks to the existing tests?
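A rough sketch of what such a check might look like, assuming Python 3.13+ and relying on private attributes of `asyncio.Queue` that are implementation details and may change. The function name is hypothetical and this is not the test that ended up in the PR.

```python
import asyncio


async def private_state_after_shutdown(immediate):
    q = asyncio.Queue(maxsize=1)
    q.put_nowait("a")  # the queue is now full, with one unfinished task

    # A second put() has to wait, so it registers itself in q._putters.
    putter = asyncio.create_task(q.put("b"))
    await asyncio.sleep(0)
    assert len(q._putters) == 1

    q.shutdown(immediate=immediate)  # requires Python 3.13+
    results = await asyncio.gather(putter, return_exceptions=True)
    assert isinstance(results[0], asyncio.QueueShutDown)

    # Private attributes: implementation details that may change.
    return len(q._getters), len(q._putters), q._unfinished_tasks
```

With `immediate=True` the queued item and its unfinished-task count are dropped; with `immediate=False` the item stays queued, so one unfinished task remains.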

YvesDup avatar Mar 22 '24 13:03 YvesDup

All tests pass. But I wonder whether the aims of the tests would be easier to understand if they were organized by feature, like:

  • test_shutdown_get[_task_done],
  • test_shutdown_put,
  • test_shutdown_join,
  • etc ...

With your current organization (test_shutdown_empty, test_shutdown_nonempty), it seems harder to me to follow the purpose of each test. It's up to you.

YvesDup avatar Mar 26 '24 18:03 YvesDup

With your current organization (test_shutdown_empty, test_shutdown_nonempty), it seems harder to me to follow the purpose of each test. It's up to you.

Each test is a different scenario, and I'm checking (hopefully) the full externally-visible state and functionality for each scenario. I think I will add more comments in these tests.

EpicWink avatar Mar 27 '24 01:03 EpicWink

With your current organization (test_shutdown_empty, test_shutdown_nonempty), it seems harder to me to follow the purpose of each test. It's up to you.

Each test is a different scenario, and I'm checking (hopefully) the full externally-visible state and functionality for each scenario. I think I will add more comments in these tests.

Thanks for all the comments, this is much easier to understand. IMO an important use case is missing: an empty queue with a pending get.

  • when shutdown is not immediate, the task still stays in q._getters and we have to cancel it.
  • when shutdown is immediate, this task is removed in the shutdown method.

It could be based on following code:

import asyncio


async def _test_shutdown_pending_get(immediate_=False):
    async def _get(q, res):
        # Record how the pending get() ended: an item, a shutdown, or a cancel.
        try:
            r = await q.get()
            res.append(r)
        except asyncio.queues.QueueShutDown:
            res.append(True)
        except asyncio.CancelledError:
            res.append(False)

    res = []
    q = asyncio.Queue(1)
    loop = asyncio.get_running_loop()
    get_task = loop.create_task(_get(q, res))
    # Stand-in for the _ensure_started() test helper, which is not shown here:
    # yield once so the getter starts and blocks on the empty queue.
    await asyncio.sleep(0)

    q.shutdown(immediate=immediate_)
    if not immediate_:
        get_task.cancel()

    await get_task
    assert res[0] == immediate_

YvesDup avatar Mar 27 '24 15:03 YvesDup

Hey guys, great progress here. Can you at-mention me on this PR when you're ready for review? I recommend merging main just before you do that (don't rebase please).

gvanrossum avatar Mar 27 '24 17:03 gvanrossum

IMO an important use case is missing: the context is when there are an empty queue and a pending get.

* when shutdown is not immediate, the task still stays in `q._getters` and we have to cancel it.

* when shutdown is immediate, this task is removed in the `shutdown` method.

@YvesDup No, in both cases the task should raise ShutDown: when the queue is empty, there should be no distinction between immediate and non-immediate shutdown (ignoring unfinished-tasks). I have fixed this bug and am updating the test.
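The intended behaviour can be sketched as follows, assuming Python 3.13+ where `asyncio.Queue.shutdown()` and `asyncio.QueueShutDown` exist. The function name `pending_get_outcome` is illustrative only.

```python
import asyncio


async def pending_get_outcome(immediate):
    q = asyncio.Queue()
    getter = asyncio.create_task(q.get())
    await asyncio.sleep(0)  # let get() start and block on the empty queue

    # Requires Python 3.13+. Shutdown wakes every waiting getter so that it
    # re-checks the queue, finds it empty and shut down, and raises.
    q.shutdown(immediate=immediate)
    try:
        await asyncio.wait_for(getter, timeout=1)
    except asyncio.QueueShutDown:
        return "QueueShutDown"
    return "item"
```

Both `pending_get_outcome(True)` and `pending_get_outcome(False)` end with `QueueShutDown`, with no cancellation needed in the non-immediate case.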


The failing multi-processing WithProcessesTestBarrier tests in the Windows 64-bit free-threading builds seem unrelated

EpicWink avatar Mar 28 '24 04:03 EpicWink

@gvanrossum ready for review

EpicWink avatar Mar 28 '24 06:03 EpicWink

in both cases the task should raise ShutDown: when the queue is empty, there should be no distinction between immediate and non-immediate shutdown (ignoring unfinished-tasks).

@EpicWink , there is the same bug on threading queue shutdown. I will open an issue.

YvesDup avatar Apr 03 '24 17:04 YvesDup

:robot: New build scheduled with the buildbot fleet by @gvanrossum for commit 2fa1bd9aa04e286a36cb321411e1bfff32fc2b81 :robot:

If you want to schedule another build, you need to add the :hammer: test-with-buildbots label again.

bedevere-bot avatar Apr 03 '24 23:04 bedevere-bot

Also, why is the DO-NOT-MERGE label set? Is it because of this?

EDIT: But that is fixed (I actually stumbled over this, wondering why you always wake up all getters -- and came to the conclusion that it is needed).

gvanrossum avatar Apr 03 '24 23:04 gvanrossum

I want to trust that you two have done the right thing there

@gvanrossum please don't trust, have you seen xz-utils's backdoor?

but not push until the big test run completes? (Buildbot tests can take many hours, sorry.)

Oops, sorry. I forgot to hold off my most recent push

Also, why is the DO-NOT-MERGE label set? Is it because of https://github.com/python/cpython/pull/104228/#issuecomment-2035201238?

No, I think it was because I hadn't updated the implementation to match the new agreed-upon behaviour (this PR was in draft at that time anyway, so I don't think the label was needed).

wondering why you always wake up all getters -- and came to the conclusion that it is needed

I should probably add a comment

EpicWink avatar Apr 04 '24 01:04 EpicWink

there is the same bug on threading queue shutdown. I will open an issue.

@YvesDup I've created issue #117531, and fix in PR #117532

EpicWink avatar Apr 04 '24 02:04 EpicWink

there is the same bug on threading queue shutdown. I will open an issue.

@YvesDup I've created issue #117531, and fix in PR #117532

I have seen and commented

YvesDup avatar Apr 04 '24 06:04 YvesDup

I think it's good to go

wondering why you always wake up all getters -- and came to the conclusion that it is needed

I should probably add a comment

I could do this, but I don't think it's worth revoking the PR's approval

EpicWink avatar Apr 06 '24 06:04 EpicWink

Sorry for merging prematurely. @EpicWink Can you prepare a follow-up PR? I will merge it as soon as @YvesDup approves it. (Or, alternatively, @YvesDup can make a PR and I will merge when @EpicWink approves.)

gvanrossum avatar Apr 06 '24 18:04 gvanrossum

@gvanrossum see #117621

EpicWink avatar Apr 08 '24 05:04 EpicWink

I'd like to add that the shutdown() method and task groups make certain patterns really easy to express. I was struggling with how to write a generic async map function with a fixed number of workers (mostly as an exercise), but when I saw the new shutdown method, it came together really easily:

import asyncio
from asyncio import Queue, QueueShutDown
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable


async def async_map[T, R](
    func: Callable[[T], Awaitable[R]],
    iterable: AsyncIterable[T],
    *,
    limit: int,
    maxsize: int = -1,
) -> AsyncIterator[R]:
    if maxsize < 0:
        maxsize = limit

    arguments_queue = Queue[T](maxsize=maxsize)
    results_queue = Queue[R](maxsize=maxsize)

    async def drain():
        # Feed every argument to the workers, then signal that no more will come.
        async for argument in iterable:
            await arguments_queue.put(argument)
        arguments_queue.shutdown(immediate=False)

    async def worker():
        # Process arguments until the queue is both shut down and empty.
        while True:
            try:
                argument = await arguments_queue.get()
            except QueueShutDown:
                break
            await results_queue.put(await func(argument))

    async def background():
        async with asyncio.TaskGroup() as background_task_group:
            background_task_group.create_task(drain())
            for _ in range(limit):
                background_task_group.create_task(worker())
        # All workers have finished, so no more results will be produced.
        results_queue.shutdown(immediate=False)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(background())

        # Yield results in completion order until the results queue is drained.
        while True:
            try:
                yield await results_queue.get()
            except QueueShutDown:
                break

I believe this shares a problem common to many async generators: if you don't consume the entire generator, the background tasks keep processing and you end up with a lot of futures that are never awaited.

I know there are some backports. Is there already a backports package for asyncio?

john-parton avatar Aug 27 '24 04:08 john-parton