
Feature request: Implement `body_producer` for `curl_httpclient`

Open legnaleurc opened this issue 7 years ago • 6 comments

This line seems to imply that the body is always text, which is not always true: https://github.com/tornadoweb/tornado/blob/v4.5.1/tornado/curl_httpclient.py#L424

Can we unify the API, e.g. by wrapping body_producer into a pycurl.READFUNCTION?

legnaleurc avatar Jun 25 '17 19:06 legnaleurc

The utf8 function does not require that its input be text. It accepts either text strings or byte strings, and if given a text string, encodes it to bytes as UTF-8. So you can simply pass bytes directly here (and in most places in tornado).
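
For example, a quick illustration of that behavior:

```python
from tornado.escape import utf8

utf8("héllo")      # b'h\xc3\xa9llo' -- a str is encoded as UTF-8
utf8(b"\x00\xff")  # b'\x00\xff'     -- bytes pass through unchanged
```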

bdarnell avatar Jun 30 '17 19:06 bdarnell

I should clarify my intention: I need to stream a large body (via the PUT method) with CurlAsyncHTTPClient. Since body_producer is not supported in CurlAsyncHTTPClient, I have to use prepare_curl_callback to register a read function.

In this case I have to set allow_nonstandard_methods, but even that is not enough, because pycurl.READFUNCTION gets overwritten anyway: https://github.com/tornadoweb/tornado/blob/v4.5.1/tornado/curl_httpclient.py#L429

So I'm not able to use CurlAsyncHTTPClient to stream a body with PUT.
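
Roughly, what I'm attempting looks like this (a minimal sketch; the URL and file are placeholders):

```python
import pycurl
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.httpclient import HTTPRequest

source = open("large_file.bin", "rb")  # the data I actually want to stream

def prepare(curl):
    curl.setopt(pycurl.UPLOAD, 1)
    # Conflicts with the READFUNCTION that curl_httpclient sets up from
    # its own in-memory body buffer (the line linked above).
    curl.setopt(pycurl.READFUNCTION, source.read)

request = HTTPRequest(
    "https://example.com/upload",
    method="PUT",
    allow_nonstandard_methods=True,
    prepare_curl_callback=prepare,
)
client = CurlAsyncHTTPClient()
future = client.fetch(request)
```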

legnaleurc avatar Jun 30 '17 21:06 legnaleurc

I see. I've retitled the issue to reflect that.

bdarnell avatar Jul 05 '17 02:07 bdarnell

Hi @bdarnell, curious if you have any plans for addressing this limitation.

We have been long-time users of simple_httpclient, but its performance and lack of keep-alive support have become an issue. We started migrating to curl_httpclient but quickly realized that support for request streaming is missing...

We're interested in making this work, so we may be able to help with some guidance from you.

Here are some thoughts on how this could be approached:

  • The pycurl.READFUNCTION currently reads from a static buffer.
  • With a body_producer we might exhaust the data available to be sent before the producer generates more, so we need a way to suspend the request until more data arrives.
  • We cannot await the body_producer from the READFUNCTION, of course, because libcurl is on the call stack.
  • Luckily, libcurl provides a special CURL_READFUNC_PAUSE return code that pauses the handle until it is explicitly un-paused.
  • My suggestion is to spawn an additional async task (see the sketch after this list) that:
    • fetches data from the body_producer until it's exhausted, one chunk at a time
    • puts each new chunk into a tornado.queues.Queue(maxsize=1)
    • un-pauses the curl handle if it's currently paused
  • On the READFUNCTION side:
    • the function reads available data from an intermediate buffer
    • once the buffer is empty, it calls queue.get_nowait() to see if more is available
    • if yes - it returns N bytes to curl and stores the rest in the buffer
    • if no - it pauses the handle by returning pycurl.READFUNC_PAUSE
  • Using a queue in addition to the buffer ensures that we won't OOM if the body_producer produces data much faster than curl is able to send. A similar kind of backpressure could actually be useful in the current streaming WRITEFUNCTION as well.
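
A rough sketch of what I have in mind (names are made up; error handling and EOF signaling are simplified, with an empty chunk used as the end-of-body sentinel):

```python
import pycurl
from tornado.queues import Queue, QueueEmpty


class _StreamingRequestBody:
    # Bridges an async body_producer and pycurl's pull-based READFUNCTION.

    def __init__(self, body_producer):
        self._producer = body_producer
        self._queue = Queue(maxsize=1)  # at most one pending chunk -> backpressure
        self._buffer = b""
        self._paused = False
        self._finished = False

    async def run_producer(self, curl):
        # The additional async task: pull chunks from body_producer one at a
        # time, hand them to the READFUNCTION via the queue, and wake curl up.
        async def write(chunk):
            await self._queue.put(chunk)
            if self._paused:
                self._paused = False
                curl.pause(pycurl.PAUSE_CONT)

        await self._producer(write)
        await write(b"")  # sentinel: the producer is exhausted

    def read(self, size):
        # Registered as pycurl.READFUNCTION; libcurl asks for up to `size` bytes.
        if not self._buffer and not self._finished:
            try:
                chunk = self._queue.get_nowait()
            except QueueEmpty:
                self._paused = True
                return pycurl.READFUNC_PAUSE
            if chunk == b"":
                self._finished = True
            else:
                self._buffer = chunk
        if not self._buffer:
            return b""  # end of body
        data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data
```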

Let me know what you think!

sergiimk avatar Nov 05 '20 23:11 sergiimk

Hi @sergiimk ,

Thanks for the interest in this feature and the detailed writeup. This approach sounds good to me.

One alternative you may want to consider is using a tornado.iostream._StreamBuffer combined with a tornado.locks.BoundedSemaphore (or maybe a Condition?) instead of a buffer (memoryview?) and a Queue. The Semaphore would let you limit the memory used in bytes, instead of in number of chunks. And the _StreamBuffer class is carefully optimized to minimize copies while also collecting small writes into larger batches (How much this matters depends on how the body_producer works - if it produces, say, 4KB at a time consistently, then the queue is fine. If it sometimes produces small chunks and sometimes larger ones, the _StreamBuffer would be more beneficial).
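
Very roughly, and assuming _StreamBuffer's append/peek/advance interface (it's a private class, so treat this as a sketch rather than a recommendation; pause/un-pause handling is omitted), the byte-based backpressure could look like:

```python
from tornado.iostream import _StreamBuffer  # private API
from tornado.locks import Condition


class _BodyBuffer:
    def __init__(self, max_buffered_bytes=64 * 1024):
        self._buffer = _StreamBuffer()
        self._drained = Condition()
        self._max_buffered_bytes = max_buffered_bytes

    async def write(self, chunk):
        # Called from the body_producer task; waits while too many bytes
        # are already buffered (backpressure measured in bytes, not chunks).
        while len(self._buffer) >= self._max_buffered_bytes:
            await self._drained.wait()
        self._buffer.append(chunk)

    def read(self, size):
        # Called from the READFUNCTION path; drains up to `size` bytes.
        data = bytes(self._buffer.peek(size))
        if data:
            self._buffer.advance(len(data))
            self._drained.notify_all()
        return data
```

The READFUNCTION would still return pycurl.READFUNC_PAUSE when read() comes back empty and the producer isn't finished, just like in your queue-based plan.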

bdarnell avatar Nov 06 '20 01:11 bdarnell

I gave this a shot today, but it didn't quite work out.

Halfway through the implementation I realized that the streamed requests were taking a lot longer to complete than expected. I traced the problem to the curl.pause(pycurl.PAUSE_CONT) call not actually "waking up" the connection in libcurl - the library doesn't schedule a timer callback or any socket actions after the un-pause.

The only reason the request was succeeding at all is thanks to the existing force_timeout hack that calls multi.socket_all() every second, forcing libcurl to re-check the state of all connections.

This bug is called out in the docs (see "USAGE WITH THE MULTI-SOCKET INTERFACE") as fixed, but I reproduced it even on 7.72.0. libcurl's changelog shows lots of attempts to fix pause/unpause, so it looks like a long-dragging issue...

I tried several workarounds (e.g. scheduling the timeout for libcurl myself), but they all behaved differently under my host's libcurl version and the version we use in our deployments - so this seemed extremely brittle. The only reliable solution was to call multi.socket_all() immediately after un-pausing the connection, but the docs say it's a performance killer - so it's a no-go for our highly concurrent scenario of streaming lots of small chunks.
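
For reference, the reliable-but-expensive variant was essentially this (`curl` and `multi` being the pycurl easy and multi handles that CurlAsyncHTTPClient already manages):

```python
import pycurl


def resume_transfer(curl, multi):
    # Un-pause the transfer, then force libcurl to re-evaluate every
    # connection immediately. Without socket_all() the transfer only
    # resumes on the next force_timeout tick, up to a second later.
    curl.pause(pycurl.PAUSE_CONT)
    multi.socket_all()  # documented as expensive with many concurrent handles
```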

Leaving this info for posterity. A repro case and an issue filed with curl would be the proper next step, but given all the problems we already have with distributing pycurl, I think I'll take a step back for now and reassess the options.

sergiimk avatar Nov 10 '20 01:11 sergiimk