Feature request: Implement `body_producer` for `curl_httpclient`
This line seems to imply that the body is always text, which is not always true: https://github.com/tornadoweb/tornado/blob/v4.5.1/tornado/curl_httpclient.py#L424
Can we unify the API, e.g. wrap `body_producer` into a `pycurl.READFUNCTION`?
The `utf8` function does not require that its input be text. It accepts either text strings or byte strings, and if given a text string, uses UTF-8 to convert it to bytes. So you can simply pass bytes directly here (and in most places in tornado).
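For example, with `tornado.escape.utf8`:

```python
from tornado.escape import utf8

assert utf8("héllo") == "héllo".encode("utf-8")  # text strings are encoded to UTF-8 bytes
assert utf8(b"raw bytes") == b"raw bytes"        # byte strings pass through unchanged
```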
I should clarify my intention: I need to "stream" a large body (via the PUT method) with `CurlAsyncHTTPClient`. Since `body_producer` is somehow not supported in `CurlAsyncHTTPClient`, I have to use `prepare_curl_callback` to register a read function. In this case I have to set `allow_nonstandard_methods`, and it's still not enough, because `pycurl.READFUNCTION` will be overwritten anyway:
https://github.com/tornadoweb/tornado/blob/v4.5.1/tornado/curl_httpclient.py#L429
So I'm not able to use `CurlAsyncHTTPClient` to stream a body with PUT.
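For reference, a minimal sketch of this kind of workaround, assuming a chunk iterator as the body source (the helper names here are illustrative, not Tornado API); it fails because `curl_httpclient` re-sets `pycurl.READFUNCTION` itself at the line linked above:

```python
import pycurl
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.httpclient import HTTPRequest


def make_read_callback(chunks):
    # chunks: an iterator of byte strings to stream as the request body
    buf = bytearray()

    def read(size):
        # libcurl asks for up to `size` bytes; returning b"" signals end of body
        while len(buf) < size:
            try:
                buf.extend(next(chunks))
            except StopIteration:
                break
        data = bytes(buf[:size])
        del buf[:size]
        return data

    return read


def prepare(curl, chunks):
    curl.setopt(pycurl.UPLOAD, 1)
    curl.setopt(pycurl.READFUNCTION, make_read_callback(chunks))


client = CurlAsyncHTTPClient()
request = HTTPRequest(
    "http://example.com/upload",
    method="PUT",
    allow_nonstandard_methods=True,
    prepare_curl_callback=lambda c: prepare(c, iter([b"part1", b"part2"])),
)
# client.fetch(request)  # the READFUNCTION registered above is overwritten
#                        # by curl_httpclient before the transfer starts
```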
I see. I've retitled the issue to reflect that.
Hi @bdarnell, curious if you have any plans to address this limitation.
We have been long-time users of `simple_httpclient`, but its performance and lack of `Keep-Alive` support have become an issue. We started migrating to `curl_httpclient` but quickly realized that support for request streaming is missing...
We're interested in making this work, so we may be able to help with some guidance from you.
Here are some thoughts on how this could be approached:
- The `pycurl.READFUNCTION` currently reads from a static buffer.
- In case of a `body_producer` we might exhaust the data available to be sent before the producer generates more, so we want to somehow suspend the request until more data arrives.
- We cannot `await` the `body_producer` from the `READFUNCTION`, of course, because `libcurl` is on the call stack.
- Luckily, `libcurl` provides a special `CURL_READFUNC_PAUSE` return code that will pause the handle until it's un-paused explicitly.
- My suggestion is to spawn an additional async task (see the sketch after this list) that:
  - fetches data from the `body_producer`, one chunk at a time, until it's exhausted
  - puts each new chunk into a `tornado.queues.Queue(maxsize=1)`
  - un-pauses the `Curl` handle if it's currently paused
- On the `READFUNCTION` side:
  - the function will read available data from the intermediate buffer
  - once the buffer is empty, it will do a `queue.get_nowait()` to see if more is available
    - if yes, return N bytes to curl and store the rest in the buffer
    - if no, pause the handle by returning `pycurl.READFUNC_PAUSE`
- Using a queue in addition to the buffer ensures that we won't OOM if the `body_producer` produces data much faster than curl is able to send it. Similar backpressure could actually be useful in the current streaming `WRITEFUNCTION` as well.
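Roughly what I have in mind - an untested sketch using `tornado.queues` and pycurl's pause support; the `_StreamingBody` class and its attributes are placeholders, not existing Tornado API:

```python
import pycurl
from tornado import queues


class _StreamingBody:
    """Feeds chunks from an async body_producer into a pausable
    libcurl READFUNCTION via a bounded queue."""

    def __init__(self, curl):
        self.curl = curl
        self.queue = queues.Queue(maxsize=1)  # backpressure: one chunk in flight
        self.buffer = bytearray()             # leftover bytes curl hasn't consumed yet
        self.paused = False
        self.finished = False

    async def feed(self, body_producer):
        # body_producer(write) is expected to call `write(chunk)` for each chunk
        async def write(chunk):
            await self.queue.put(chunk)       # blocks while curl is behind
            self._unpause()
        await body_producer(write)
        self.finished = True
        self._unpause()

    def _unpause(self):
        if self.paused:
            self.paused = False
            self.curl.pause(pycurl.PAUSE_CONT)

    def read(self, size):
        # registered via curl.setopt(pycurl.READFUNCTION, self.read)
        if not self.buffer:
            try:
                self.buffer += self.queue.get_nowait()
            except queues.QueueEmpty:
                if self.finished:
                    return b""                 # end of body
                self.paused = True
                return pycurl.READFUNC_PAUSE   # suspend until feed() un-pauses us
        data = bytes(self.buffer[:size])
        del self.buffer[:size]
        return data
```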
Let me know what you think!
Hi @sergiimk,
Thanks for the interest in this feature and the detailed writeup. This approach sounds good to me.
One alternative you may want to consider is using a `tornado.iostream._StreamBuffer` combined with a `tornado.locks.BoundedSemaphore` (or maybe a `Condition`?) instead of a buffer (memoryview?) and a `Queue`. The Semaphore would let you limit the memory used in bytes instead of in number of chunks. And the `_StreamBuffer` class is carefully optimized to minimize copies while also collecting small writes into larger batches. (How much this matters depends on how the `body_producer` works: if it produces, say, 4KB at a time consistently, then the queue is fine; if it sometimes produces small chunks and sometimes larger ones, the `_StreamBuffer` would be more beneficial.)
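A very rough sketch of that byte-bounded variant, assuming `_StreamBuffer` exposes `append`/`peek`/`advance` (it is a private class, so this may change) and using a `Condition` to wake the producer when curl drains the buffer; all names here are illustrative:

```python
from tornado.iostream import _StreamBuffer
from tornado.locks import Condition

MAX_BUFFERED_BYTES = 64 * 1024  # illustrative limit


class _BoundedBody:
    def __init__(self):
        self.buffer = _StreamBuffer()   # batches small writes, minimizes copies
        self.drained = Condition()      # signalled when curl consumes data

    async def write(self, chunk):
        # called by the body_producer; blocks while too many bytes are buffered
        while len(self.buffer) >= MAX_BUFFERED_BYTES:
            await self.drained.wait()
        self.buffer.append(chunk)

    def read(self, size):
        # called from the pycurl READFUNCTION; pause handling (as in the
        # queue-based sketch above) is omitted for brevity
        data = bytes(self.buffer.peek(size))
        if data:
            self.buffer.advance(len(data))
            self.drained.notify()       # let the producer top the buffer back up
        return data
```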
I gave this a shot today, but it didn't quite work out.
Halfway through the implementation I realized that the streamed requests take a lot longer to complete than expected. I traced the problem to the `curl.pause(pycurl.PAUSE_CONT)` call not actually "waking up" the connection in `libcurl` - the library doesn't schedule a timer callback or any socket actions after the un-pause.
The only reason the request was succeeding at all is the existing force_timeout hack that calls `multi.socket_all()` every second, forcing `libcurl` to re-check the state of all connections.
This bug is called out in the docs (see "USAGE WITH THE MULTI-SOCKET INTERFACE") as fixed, but I reproduced it even on `7.72.0`. libcurl's changelog has lots of attempts to fix pause/unpause, so it looks like a long-standing issue...
I tried several workarounds (e.g. scheduling the timeout for `libcurl` myself), but they all behaved differently under my host's `libcurl` version and the version we use in our deployments - so it seemed extremely brittle. The only reliable solution was to call `multi.socket_all()` immediately after un-pausing the connection, but the docs say it's a performance killer - so it's a no-go for our highly concurrent scenario of streaming lots of small chunks.
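In other words, the only variant that behaved consistently was roughly this (a hypothetical helper, not part of Tornado):

```python
import pycurl


def unpause(curl: pycurl.Curl, multi: pycurl.CurlMulti) -> None:
    """Un-pause a paused transfer and force libcurl to act on it immediately.

    Calling socket_all() right after PAUSE_CONT works around libcurl not
    scheduling any timer/socket callbacks on un-pause, but the docs warn
    it is a performance killer when many handles are active.
    """
    curl.pause(pycurl.PAUSE_CONT)
    multi.socket_all()
```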
Leaving this info for posterity. A repro case and an issue with `curl` would be the proper next step, but given all the problems we already have with distributing `pycurl`, I think I'll take a step back for now and re-assess the options.