httpx
httpx copied to clipboard
Stream response body in ASGITransport
Summary
As part of my job, we needed a variant of ASGITransport that supports streaming (as in #2186), and this is my PR to implement that.
Something that I am particularly proud of is that this PR was written without having to spawn a new task, with the consequence that it avoids issues related to task groups and context variables.
Checklist
- [x] I understand that this PR may be closed in case there was no previous discussion. (This doesn't apply to typos!)
- [x] I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
- [x] I've updated the documentation accordingly.
Fixes #2186
The PR was updated, with a new test_asgi_stream_allows_iterative_streaming test, and the added tests do not rely on the app argument in AsyncClient.
After working with the version of ASGITransport in this PR, it has come to my attention that this version does not work when streaming a starlette.responses.StreamingResponse object. The reason for that being, that starlette.responses.StreamingResponse puts the send calls in a spawned sub task, and that spawned sub task does not go through the generator in _AwaitableRunner.__call__.
I have an idea in order to fix that, here is how I think it could work:
- Replace
response_startedboolwith anEventobject; - Replace
body_chunkslistwith a trio object Channel / anyio memory stream ; - Replace the
_AwaitableRunner.__call__untilCallableargument with anAwaitable; - In
_AwaitableRunner.__call__, instead of yielding the item from the generator directly, theyield self._next_itemwill "race" with theuntilAwaitableobject, using structured concurrency to run that race;
I would like your opinion on whether the idea mentioned above should be added to the current PR, or be the object of a new PR.
I would like your opinion on whether the idea mentioned above should be added to the current PR, or be the object of a new PR.
Either would be okay with me.
Hi @jhominal,
[...] this version does not work when streaming a
starlette.responses.StreamingResponseobject. The reason for that being, thatstarlette.responses.StreamingResponseputs thesendcalls in a spawned sub task [...]
Could you please confirm that your current patch will also not work with sse-starlette, for the same reason, because of this:
task_group.start_soon(wrap, partial(self.stream_response, send))
?
Do you know about some working alternatives? I have seen https://pypi.org/project/async-asgi-testclient/ and https://gist.github.com/richardhundt/17dfccb5c1e253f798999fc2b2417d7e, not sure what to think about it.
Thanks.
Hello @souliane
I had been working on and off on this issue for a while, in short:
- I had hoped that I would be able to avoid spawning a new task for running the ASGI app, because:
- a new task means that context variables set in the app cannot be seen by the caller;
- under structured concurrency I have to think about when and how the task group is shutdown (with the breakages that happen when the starting task is not the same as the ending task);
- I was able to avoid spawning a new task when running on
asyncio(which is my production environment), but not when running ontrio(anything I can imagine writing seems to have race conditions due to violating what I understand oftrioinvariants); - I think I should just bite that bullet and spawn a task for running the ASGI app, with an option so that people can choose whether to spawn a task (and benefit from streaming response) or not;