
Stream response body in ASGITransport

Open jhominal opened this issue 1 year ago • 14 comments

Summary

As part of my job, we needed a variant of ASGITransport that supports streaming (as in #2186), and this is my PR to implement that.

Something that I am particularly proud of is that this PR was written without spawning a new task, which means it avoids issues related to task groups and context variables.
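To illustrate the context-variable issue being avoided here: running a coroutine inline keeps the caller's context, whereas `asyncio.create_task` runs it in a copy of the context, so values set by the app are not visible afterwards. A minimal asyncio sketch (not code from this PR):

```python
import asyncio
import contextvars

var = contextvars.ContextVar("var", default="unset")

async def app():
    # Stand-in for an ASGI app that sets a context variable.
    var.set("set-by-app")

async def main():
    # Inline await: same context, so the change is visible to the caller.
    await app()
    inline_value = var.get()  # "set-by-app"

    var.set("unset")
    # New task: the task runs in a copy of the context, so the change
    # made inside it is NOT visible to the caller.
    await asyncio.create_task(app())
    task_value = var.get()  # "unset"
    return inline_value, task_value

print(asyncio.run(main()))  # ('set-by-app', 'unset')
```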

Checklist

  • [x] I understand that this PR may be closed in case there was no previous discussion. (This doesn't apply to typos!)
  • [x] I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
  • [x] I've updated the documentation accordingly.

Fixes #2186

jhominal avatar Jan 16 '24 00:01 jhominal

The PR was updated, with a new test_asgi_stream_allows_iterative_streaming test, and the added tests do not rely on the app argument in AsyncClient.

jhominal avatar Jan 17 '24 17:01 jhominal

After working with the version of ASGITransport in this PR, I have realized that it does not work when streaming a starlette.responses.StreamingResponse object. The reason is that starlette.responses.StreamingResponse issues its send calls from a spawned sub-task, and that sub-task does not go through the generator in _AwaitableRunner.__call__.

I have an idea for fixing that; here is how I think it could work:

  • Replace the response_started bool with an Event object;
  • Replace the body_chunks list with a trio Channel / anyio memory object stream;
  • Replace the until Callable argument of _AwaitableRunner.__call__ with an Awaitable;
  • In _AwaitableRunner.__call__, instead of yielding the item from the generator directly, "race" the yield self._next_item against the until Awaitable, using structured concurrency to run that race;
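The race described above can be roughly sketched with plain asyncio stand-ins (an asyncio.Queue in place of the trio Channel / anyio memory stream, an asyncio.Event for completion); the names here, such as stream_body, are hypothetical and not the PR's API:

```python
import asyncio

async def stream_body(chunks: asyncio.Queue, app_done: asyncio.Event):
    """Yield body chunks, racing 'next chunk' against 'app finished'."""
    while True:
        get = asyncio.ensure_future(chunks.get())
        done_wait = asyncio.ensure_future(app_done.wait())
        done, pending = await asyncio.wait(
            {get, done_wait}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if get in done:
            yield get.result()
        elif chunks.empty():
            # The app finished and no further chunks are queued.
            return

async def main():
    chunks: asyncio.Queue = asyncio.Queue()
    app_done = asyncio.Event()

    async def fake_app():
        # Stand-in for the ASGI app's send() calls from a sub-task.
        for part in (b"hello, ", b"world"):
            await chunks.put(part)
        app_done.set()

    producer = asyncio.create_task(fake_app())
    body = [chunk async for chunk in stream_body(chunks, app_done)]
    await producer
    return body

print(asyncio.run(main()))  # [b'hello, ', b'world']
```

Because the consumer waits on both futures at once, chunks sent from a spawned sub-task are observed even though they never pass through the runner's generator.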

I would like your opinion on whether the idea mentioned above should be added to the current PR, or be the object of a new PR.

jhominal avatar Jan 18 '24 18:01 jhominal

I would like your opinion on whether the idea mentioned above should be added to the current PR, or be the object of a new PR.

Either would be okay with me.

lovelydinosaur avatar Jan 23 '24 13:01 lovelydinosaur

Hi @jhominal,

[...] this version does not work when streaming a starlette.responses.StreamingResponse object. The reason for that being, that starlette.responses.StreamingResponse puts the send calls in a spawned sub task [...]

Could you please confirm that your current patch will also not work with sse-starlette, for the same reason, because of this:

            task_group.start_soon(wrap, partial(self.stream_response, send))

?

Do you know about some working alternatives? I have seen https://pypi.org/project/async-asgi-testclient/ and https://gist.github.com/richardhundt/17dfccb5c1e253f798999fc2b2417d7e, but I am not sure what to think of them.

Thanks.

souliane avatar Jun 06 '24 08:06 souliane

Hello @souliane

I have been working on and off on this issue for a while. In short:

  1. I had hoped to avoid spawning a new task for running the ASGI app, because:
    • a new task means that context variables set in the app cannot be seen by the caller;
    • under structured concurrency, I have to think about when and how the task group is shut down (with the breakages that happen when the starting task is not the same as the ending task);
  2. I was able to avoid spawning a new task when running on asyncio (which is my production environment), but not when running on trio (anything I can imagine writing seems to have race conditions due to violating what I understand of trio's invariants);
  3. I think I should just bite the bullet and spawn a task for running the ASGI app, with an option so that people can choose whether to spawn a task (and benefit from streaming responses) or not.
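That last option could look roughly like the following sketch, where a spawn_task flag decides whether the ASGI app runs inline (context variables propagate) or in a spawned task (streaming works while the app runs); run_app and demo_app are illustrative names, not the PR's actual API:

```python
import asyncio

async def run_app(app, scope, receive, send, *, spawn_task: bool):
    if spawn_task:
        # Spawned task: the caller could consume streamed chunks while
        # the app runs, but contextvars set in the app are not visible.
        task = asyncio.create_task(app(scope, receive, send))
        await task
    else:
        # Inline: runs in the caller's context, no streaming while running.
        await app(scope, receive, send)

async def demo_app(scope, receive, send):
    # Minimal ASGI-style app: start the response, then send one body chunk.
    await send({"type": "http.response.start", "status": 200})
    await send({"type": "http.response.body", "body": b"ok"})

async def main(spawn_task: bool):
    sent = []

    async def send(message):
        sent.append(message)

    async def receive():
        return {"type": "http.request"}

    await run_app(demo_app, {"type": "http"}, receive, send, spawn_task=spawn_task)
    return sent

print(asyncio.run(main(spawn_task=True)))
```

Either way the same messages are delivered; the flag only trades context-variable visibility against the ability to stream the body concurrently.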

jhominal avatar Jun 06 '24 09:06 jhominal