Lack of backpressure when client code does not schedule read (httpaf-lwt-unix)
/cc @aantron
I've modified the lwt_echo_post example as follows:
```ocaml
open Base
open Lwt.Infix
module Arg = Caml.Arg

open Httpaf_lwt_unix
module Reqd = Httpaf.Reqd
module Request = Httpaf.Request
module Headers = Httpaf.Headers
module Response = Httpaf.Response
module Body = Httpaf.Body

let slow_echo_post reqd =
  match Reqd.request reqd with
  | { Request.meth = `POST; headers; _ } ->
    let response =
      let content_type =
        match Headers.get headers "content-type" with
        | None -> "application/octet-stream"
        | Some x -> x
      in
      Response.create
        ~headers:(Headers.of_list ["content-type", content_type; "connection", "close"])
        `OK
    in
    let request_body = Reqd.request_body reqd in
    let response_body = Reqd.respond_with_streaming reqd response in
    let rec on_read buffer ~off ~len =
      Lwt.async @@ fun () ->
      Lwt.Infix.(
        Lwt_unix.sleep 1.0 >>= fun () ->
        Body.schedule_bigstring response_body buffer ~off ~len;
        Body.flush response_body (fun () ->
          Body.schedule_read request_body ~on_eof ~on_read);
        Lwt.return ())
    and on_eof () =
      Body.close_writer response_body
    in
    Body.schedule_read (Reqd.request_body reqd) ~on_eof ~on_read
  | _ ->
    let headers = Headers.of_list [ "connection", "close" ] in
    Reqd.respond_with_string reqd (Response.create ~headers `Method_not_allowed) ""
;;

let request_handler (_ : Unix.sockaddr) = slow_echo_post (* Httpaf_examples.Server.echo_post *)
let error_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.error_handler

let main port =
  let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in
  Lwt.async (fun () ->
    Lwt_io.establish_server_with_client_socket
      listen_address
      (Server.create_connection_handler ~request_handler ~error_handler)
    >|= fun _server ->
      Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
      Stdio.printf "To send a POST request, try one of the following\n\n";
      Stdio.printf " echo \"Testing echo POST\" | dune exec examples/async/async_post.exe\n";
      Stdio.printf " echo \"Testing echo POST\" | dune exec examples/lwt/lwt_post.exe\n";
      Stdio.printf " echo \"Testing echo POST\" | curl -XPOST --data @- http://localhost:%d\n\n%!" port);
  let forever, _ = Lwt.wait () in
  Lwt_main.run forever
;;

let () =
  let port = ref 8080 in
  Arg.parse
    ["-p", Arg.Set_int port, " Listening port number (8080 by default)"]
    ignore
    "Echoes POST requests. Runs forever.";
  main !port
;;
```
Upload a large file to this app:

```shell
curl -H"Expect:" -XPOST -d @very_big_file -o/dev/null http://0.0.0.0:8080/
```
The expected behavior would be for the server to stop consuming data from the socket, since there's nowhere to feed it, but that's not the case. I've instrumented the read function in httpaf_lwt_unix.ml with a print like this:

During the upload the print statement executes constantly, and the memory footprint grows until the full request is buffered in memory. The response is sent back slowly because of the delay, as expected.
It looks like this issue is not Lwt-specific, though. The buffer gets pushed to Httpaf.Server_connection.read, which (probably) invokes the on_read callback passed to Httpaf.Body.schedule_read, but there is no way for on_read to signal back that the buffer has not been consumed yet. Perhaps the interface could be extended with something like Httpaf.Body.schedule_async_read, which would also take a continuation to call once the buffer has really been consumed (one more closure allocated per I/O, which is likely to get promoted from the minor heap, sigh).
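A minimal sketch of what such an interface might look like, assuming a hypothetical `schedule_async_read` whose `on_read` also receives a `consumed` continuation. The `conn` type, `create`, `read`, and `next_read_operation` below form a standalone toy model, not httpaf's actual types, and strings stand in for bigstrings. The connection hands out one chunk at a time and reports `Yield until the application calls `consumed`.

```ocaml
(* Toy model of a read API with an explicit "consumed" continuation.
   Hypothetical: httpaf has no schedule_async_read. *)
type conn = {
  mutable outstanding : bool;  (* a chunk was delivered but not yet consumed *)
  mutable on_read : (string -> consumed:(unit -> unit) -> unit) option;
}

let create () = { outstanding = false; on_read = None }

(* The I/O layer asks this before pulling more bytes from the socket. *)
let next_read_operation c =
  match c.on_read with
  | Some _ when not c.outstanding -> `Read
  | _ -> `Yield  (* no reader yet, or the last chunk is still being processed *)

let schedule_async_read c f = c.on_read <- Some f

(* Deliver a chunk; only legal when next_read_operation returned `Read. *)
let read c chunk =
  match c.on_read with
  | Some f when not c.outstanding ->
    c.outstanding <- true;
    (* This continuation is the extra per-read closure mentioned in the issue. *)
    f chunk ~consumed:(fun () -> c.outstanding <- false)
  | _ -> invalid_arg "read: connection is not ready for input"
```

In this model the slow handler from the example would call `consumed` only from its `Body.flush` callback, so the socket is not read again until the echoed bytes have been written out.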
Part of the contract of schedule_read, which I realize now isn't documented, is that when on_read is called, the application needs to consume all the bytes provided before returning.
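Under that contract, an `on_read` that finishes its work asynchronously has to copy the slice it was given before returning, since the buffer may be reused once `on_read` returns. A minimal sketch, using the stdlib `Bigarray` with the same element type and layout as `Bigstringaf.t`, and a plain queue of thunks as a hypothetical stand-in for `Lwt.async`; `copy_slice`, `deferred`, and `echoed` are illustrative names.

```ocaml
open Bigarray

(* Same element type and layout as Bigstringaf.t. *)
type bigstring = (char, int8_unsigned_elt, c_layout) Array1.t

(* Copy [len] bytes starting at [off] into a fresh buffer we own. *)
let copy_slice (buf : bigstring) ~off ~len : bigstring =
  let dst = Array1.create char c_layout len in
  Array1.blit (Array1.sub buf off len) dst;
  dst

(* Hypothetical stand-in for Lwt.async: thunks that are run later. *)
let deferred : (unit -> unit) Queue.t = Queue.create ()
let echoed = Buffer.create 16

(* Honors the contract: nothing from [buf] is touched after we return. *)
let on_read (buf : bigstring) ~off ~len =
  let mine = copy_slice buf ~off ~len in
  Queue.push
    (fun () ->
      for i = 0 to Array1.dim mine - 1 do
        Buffer.add_char echoed mine.{i}
      done)
    deferred
```

Copying satisfies the contract but does not by itself create backpressure: each copy stays alive until its deferred work runs, so memory still grows with the amount of unconsumed input.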
I think the way to handle backpressure (which never happens right now, kind of by design) is to have the reader be active only when there's a read scheduled. If there isn't one scheduled, then next_read_operation will return Yield until schedule_read is called.
Or something along those lines.
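A minimal sketch of that rule as a standalone toy model. The names mirror httpaf's `schedule_read` and `next_read_operation`, but the `reader` type and its behavior here are assumptions for illustration, not httpaf's implementation; strings stand in for bigstrings.

```ocaml
(* Toy reader that is active only while a read is scheduled.
   Names mirror httpaf; the implementation is an illustrative assumption. *)
type reader = {
  mutable scheduled : (string -> unit) option;  (* one-shot, like schedule_read *)
}

let create () = { scheduled = None }

let next_read_operation r =
  match r.scheduled with
  | Some _ -> `Read
  | None -> `Yield  (* nobody wants data: stop pulling from the socket *)

let schedule_read r on_read = r.scheduled <- Some on_read

(* Deliver a chunk to the scheduled callback, clearing it first so the
   callback can schedule the next read itself. *)
let read r chunk =
  match r.scheduled with
  | None -> invalid_arg "read: no read scheduled"
  | Some on_read ->
    r.scheduled <- None;
    on_read chunk
```

A real runtime would also need to be woken up when `schedule_read` is called after a Yield; that wake-up callback is left out of this sketch.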
After spending some time looking at the code, I still can't find the relevant places for such a change :(
So far I've worked around this with an Lwt_mutex that is locked in the request handler while it waits for data to be processed. The Lwt adapter layer takes the same mutex when it tries to push the next buffer to httpaf. Although slow, it works as flow control.
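The workaround can be modeled in a few lines. This is a single-threaded sketch, not the actual Lwt code: a boolean flag stands in for the `Lwt_mutex`, and a queue of thunks stands in for work started with `Lwt.async`. The `model` type, `on_read`, and `run` are hypothetical names. The adapter only pushes the next chunk while the flag is clear.

```ocaml
(* Single-threaded model of the Lwt_mutex workaround.
   [locked] plays the mutex; [deferred] plays Lwt's scheduler. *)
type model = {
  mutable locked : bool;
  deferred : (unit -> unit) Queue.t;  (* work started with "Lwt.async" *)
  out : Buffer.t;                     (* echoed bytes *)
}

(* Request handler: lock, defer the slow processing, unlock when done. *)
let on_read m chunk =
  m.locked <- true;
  Queue.push (fun () -> Buffer.add_string m.out chunk; m.locked <- false) m.deferred

(* Adapter loop: push the next chunk only while the lock is free; otherwise
   let deferred work run. Returns the largest number of chunks ever queued. *)
let run m chunks =
  let rec loop chunks peak =
    let peak = max peak (Queue.length m.deferred) in
    match chunks with
    | c :: rest when not m.locked -> on_read m c; loop rest peak
    | _ when not (Queue.is_empty m.deferred) -> (Queue.pop m.deferred) (); loop chunks peak
    | _ -> peak
  in
  loop chunks 0
```

Dropping the `when not m.locked` guard makes the model queue all three test chunks before processing any of them, which is the unbounded buffering described in the issue.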
So, as I understand it, the reader currently never yields while a request is unfinished (https://github.com/inhabitedtype/httpaf/blob/master/lib/server_connection.ml#L224). Maybe we need to add a limit on the parser buffer and yield while nobody consumes this data.
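One way to read that suggestion, as a hedged sketch: cap the number of bytes that have been read from the socket but not yet consumed, and report Yield while the cap is reached. The `conn` type, its `limit` field, and `consume` are hypothetical and not httpaf settings; strings stand in for bigstrings, and `read` returns how many bytes it accepted so the caller can keep the remainder in its own buffer.

```ocaml
(* Hypothetical bounded input buffer; [limit] is illustrative only. *)
type conn = {
  limit : int;
  pending : Buffer.t;  (* bytes read from the socket, not yet consumed *)
}

let next_read_operation c =
  if Buffer.length c.pending >= c.limit then `Yield else `Read

(* Accept at most the remaining room; return the number of bytes taken. *)
let read c (chunk : string) =
  let room = max 0 (c.limit - Buffer.length c.pending) in
  let n = min room (String.length chunk) in
  Buffer.add_string c.pending (String.sub chunk 0 n);
  n

(* The application drains up to [len] bytes, freeing room for more input. *)
let consume c len =
  let len = min len (Buffer.length c.pending) in
  let taken = Buffer.sub c.pending 0 len in
  let rest = Buffer.sub c.pending len (Buffer.length c.pending - len) in
  Buffer.clear c.pending;
  Buffer.add_string c.pending rest;
  taken
```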