Feature request: Implement `body_producer` for `curl_httpclient`
This line seems to imply the body is always text, which is not always true: https://github.com/tornadoweb/tornado/blob/v4.5.1/tornado/curl_httpclient.py#L424
Can we unify the API, e.g. wrap `body_producer` into a `pycurl.READFUNCTION`?
The `utf8` function does not require that its input be text. It accepts either text strings or byte strings; if given a text string, it encodes it as UTF-8 to produce bytes. So you can simply pass bytes directly here (and in most places in tornado).
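For example:

```python
from tornado.escape import utf8

utf8("body")       # text: encoded to b"body"
utf8(b"\x00\xff")  # bytes: passed through unchanged
```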
I should clarify my intention: I need to "stream" a large body (via the PUT method) with `CurlAsyncHTTPClient`. Since `body_producer` is not supported by `CurlAsyncHTTPClient`, I have to use `prepare_curl_callback` to register a read function.
In that case I also have to set `allow_nonstandard_methods`, and even that is not enough, because `pycurl.READFUNCTION` gets overwritten anyway:
https://github.com/tornadoweb/tornado/blob/v4.5.1/tornado/curl_httpclient.py#L429
So I'm not able to use `CurlAsyncHTTPClient` to stream a body with PUT.
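For illustration, roughly the workaround I'm describing (a sketch only; the read callback registered here is exactly what gets clobbered at the line linked above):

```python
import pycurl
from tornado.httpclient import AsyncHTTPClient, HTTPRequest

AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")

def streaming_put(url, fileobj, size):
    def prepare(curl):
        # Register our own read callback so libcurl pulls the body in chunks.
        curl.setopt(pycurl.UPLOAD, 1)
        curl.setopt(pycurl.INFILESIZE_LARGE, size)
        curl.setopt(pycurl.READFUNCTION, fileobj.read)

    # allow_nonstandard_methods=True lets us send a PUT without a `body`
    # argument, but curl_httpclient later installs its own READFUNCTION
    # (the line linked above), overwriting the callback registered here.
    return AsyncHTTPClient().fetch(
        HTTPRequest(url, method="PUT",
                    prepare_curl_callback=prepare,
                    allow_nonstandard_methods=True))
```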
I see. I've retitled the issue to reflect that.
Hi @bdarnell, curious if you have any plans about addressing this limitation.
We have been long-time users of `simple_httpclient`, but its performance and lack of Keep-Alive support have become an issue. We started migrating to `curl_httpclient` but quickly realized that support for request streaming is missing...
We're interested in making this work, so may be able to help with some guidance from you.
Here are some thoughts on how this could be approached (a rough sketch follows the list):

- The `pycurl.READFUNCTION` currently reads from a static buffer.
- In the case of a `body_producer` we might exhaust the data available to be sent before the producer generates more, so we want to somehow suspend the request until more data arrives.
- We cannot `await` the `body_producer` from `READFUNCTION`, of course, because `libcurl` is on the call stack.
- `libcurl` luckily provides a special `CURL_READFUNC_PAUSE` return code that will pause the handler until it's un-paused explicitly.
- My suggestion is to spawn an additional async task that:
  - fetches data from `body_producer` until it's exhausted, one chunk at a time
  - puts each new chunk into a `tornado.queues.Queue(maxsize=1)`
  - un-pauses the `Curl` handler if it's currently paused
- On the `READFUNCTION` side:
  - the function will read available data from the intermediate buffer
  - once the buffer is empty it will do `queue.get_nowait()` to see if more is available
  - if yes - return N bytes to curl and store the rest in the buffer
  - if no - pause the handler by returning `pycurl.READFUNC_PAUSE`
- Using a queue in addition to the buffer ensures that we won't OOM if `body_producer` produces data much faster than curl is able to send. A similar kind of backpressure could be useful in the current streaming `WRITEFUNCTION` as well, actually.
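A rough sketch of the two halves described above (class and method names are made up for illustration; error handling and the race between pausing and un-pausing are glossed over):

```python
import pycurl
from tornado.queues import Queue, QueueEmpty

class ProducerReader:
    """Bridges body_producer chunks to libcurl's READFUNCTION (sketch)."""

    def __init__(self, curl):
        self.curl = curl
        self.queue = Queue(maxsize=1)  # backpressure: at most one chunk queued
        self.buffer = b""
        self.paused = False
        self.eof = False

    async def write(self, chunk):
        # Passed to body_producer as its `write` callable.
        await self.queue.put(bytes(chunk))
        self._unpause()

    async def finish(self):
        await self.queue.put(None)     # sentinel: producer is exhausted
        self._unpause()

    def _unpause(self):
        if self.paused:
            self.paused = False
            self.curl.pause(pycurl.PAUSE_CONT)

    def read(self, size):
        # Registered as pycurl.READFUNCTION; libcurl is on the call stack,
        # so no awaiting here.
        if not self.buffer and not self.eof:
            try:
                chunk = self.queue.get_nowait()
            except QueueEmpty:
                self.paused = True
                return pycurl.READFUNC_PAUSE
            if chunk is None:
                self.eof = True
            else:
                self.buffer = chunk
        data, self.buffer = self.buffer[:size], self.buffer[size:]
        return data                    # b"" tells libcurl the body is complete

# The additional async task would then be roughly:
#   reader = ProducerReader(curl)
#   await request.body_producer(reader.write)
#   await reader.finish()
```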
Let me know what you think!
Hi @sergiimk,
Thanks for the interest in this feature and the detailed writeup. This approach sounds good to me.
One alternative you may want to consider is using a `tornado.iostream._StreamBuffer` combined with a `tornado.locks.BoundedSemaphore` (or maybe a `Condition`?) instead of a buffer (memoryview?) and a `Queue`. The semaphore would let you limit the memory used in bytes instead of in number of chunks. And the `_StreamBuffer` class is carefully optimized to minimize copies while also collecting small writes into larger batches. (How much this matters depends on how the `body_producer` works: if it consistently produces, say, 4KB at a time, then the queue is fine; if it sometimes produces small chunks and sometimes larger ones, the `_StreamBuffer` would be more beneficial.)
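A minimal sketch of that alternative, assuming the current private `_StreamBuffer` interface (`append`, `peek`, `advance`, `len()`) and using a `Condition` for byte-based backpressure; the pause/un-pause handling from the previous comment is unchanged and omitted here:

```python
from tornado.iostream import _StreamBuffer  # private API; may change between releases
from tornado.locks import Condition

MAX_BUFFERED = 64 * 1024  # cap on buffered bytes (illustrative value)

class BufferedBody:
    def __init__(self):
        self.buffer = _StreamBuffer()
        self.drained = Condition()  # notified whenever libcurl consumes data

    async def write(self, chunk):
        # Called by body_producer; blocks while the buffer is over the cap,
        # giving backpressure measured in bytes rather than in chunks.
        while len(self.buffer) >= MAX_BUFFERED:
            await self.drained.wait()
        self.buffer.append(chunk)

    def read(self, size):
        # Called from READFUNCTION: hand libcurl up to `size` bytes.
        data = bytes(self.buffer.peek(size))
        if data:
            self.buffer.advance(len(data))
            self.drained.notify_all()  # wake a waiting body_producer
        return data
```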
I gave this a shot today, but it didn't quite work out.
Halfway through the implementation I realized that the streamed requests take a lot longer to complete than expected. I traced the problem to the `curl.pause(pycurl.PAUSE_CONT)` call not actually "waking up" the connection in libcurl - the library doesn't schedule a timer callback or any socket actions after the un-pause.
The only reason the request was succeeding at all is the existing `force_timeout` hack that calls `multi.socket_all()` every second, forcing libcurl to re-check the state of all connections.
This bug is called out in the docs (see "USAGE WITH THE MULTI-SOCKET INTERFACE") as fixed, but I reproduced it even on 7.72.0. libcurl's changelog has lots of attempts to fix pause/unpause, so it looks like a long-standing issue...
I tried several workarounds (e.g. scheduling the timeout for libcurl myself), but they all behaved differently under my host's libcurl version and the version we use in our deployments, so it seemed extremely brittle. The only reliable solution was to call `multi.socket_all()` immediately after un-pausing the connection, but the docs say it's a performance killer, so it's a no-go for our highly concurrent scenario of streaming lots of small chunks.
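For reference, the only variant that worked reliably (variable names loosely follow `curl_httpclient`'s internals: `multi` is the shared `pycurl.CurlMulti`, `curl` the paused easy handle):

```python
# Un-pause the transfer, then force libcurl to re-check every connection.
curl.pause(pycurl.PAUSE_CONT)
# Without this, libcurl schedules no timer or socket action for the
# un-paused handle until the 1-second force_timeout fires. socket_all()
# re-polls all connections in the multi handle, which is why the docs
# warn that it hurts performance.
multi.socket_all()
```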
Leaving this info here for posterity. A repro case and an issue filed against curl would be the proper next step, but given all the problems we already have with distributing pycurl, I think I'll take a step back for now and re-assess the options.