httpaf

Lack of backpressure when client code does not schedule read (httpaf-lwt-unix)

Status: Open. Lupus opened this issue 6 years ago • 4 comments

/cc @aantron

I've modified the lwt_echo_post example as follows:

open Base
open Lwt.Infix
module Arg = Caml.Arg

open Httpaf_lwt_unix
module Reqd = Httpaf.Reqd
module Request = Httpaf.Request
module Headers = Httpaf.Headers
module Response = Httpaf.Response
module Body = Httpaf.Body

let slow_echo_post reqd =
  match Reqd.request reqd  with
  | { Request.meth = `POST; headers; _ } ->
    let response =
      let content_type =
        match Headers.get headers "content-type" with
        | None   -> "application/octet-stream"
        | Some x -> x
      in
      Response.create ~headers:(Headers.of_list ["content-type", content_type; "connection", "close"]) `OK
    in
    let request_body  = Reqd.request_body reqd in
    let response_body = Reqd.respond_with_streaming reqd response in
    let rec on_read buffer ~off ~len =
      (* Simulate a slow consumer: wait one second before echoing each chunk,
         and only schedule the next read once the echoed chunk has been flushed. *)
      Lwt.async (fun () ->
        Lwt_unix.sleep 1.0 >>= fun () ->
        Body.schedule_bigstring response_body buffer ~off ~len;
        Body.flush response_body (fun () ->
          Body.schedule_read request_body ~on_eof ~on_read);
        Lwt.return ())
    and on_eof () =
      Body.close_writer response_body
    in
    Body.schedule_read request_body ~on_eof ~on_read
  | _ ->
    let headers = Headers.of_list [ "connection", "close" ] in
    Reqd.respond_with_string reqd (Response.create ~headers `Method_not_allowed) ""
;;

let request_handler (_ : Unix.sockaddr) = slow_echo_post (* Httpaf_examples.Server.echo_post *)
let error_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.error_handler

let main port =
  let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in
  Lwt.async (fun () ->
    Lwt_io.establish_server_with_client_socket
      listen_address
      (Server.create_connection_handler ~request_handler ~error_handler)
    >|= fun _server ->
      Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
      Stdio.printf "To send a POST request, try one of the following\n\n";
      Stdio.printf "  echo \"Testing echo POST\" | dune exec examples/async/async_post.exe\n";
      Stdio.printf "  echo \"Testing echo POST\" | dune exec examples/lwt/lwt_post.exe\n";
      Stdio.printf "  echo \"Testing echo POST\" | curl -XPOST --data @- http://localhost:%d\n\n%!" port);
  let forever, _ = Lwt.wait () in
  Lwt_main.run forever
;;

let () =
  let port = ref 8080 in
  Arg.parse
    ["-p", Arg.Set_int port, " Listening port number (8080 by default)"]
    ignore
    "Echoes POST requests. Runs forever.";
  main !port
;;

Upload a large file to this app:

curl -H"Expect:" -XPOST -d @very_big_file -o/dev/null  http://0.0.0.0:8080/

The expected behavior would be for the server to stop consuming data from the socket, since there is nowhere to feed it, but that's not what happens. I've instrumented the read function in httpaf_lwt_unix.ml with a print like this:

(Screenshot of the instrumented read function, not reproduced here.)

During the upload, the print statement executes constantly, and the memory footprint grows until the full request is buffered in memory. The response is sent back slowly because of the delay, as expected.

Lupus, Jul 31 '19 19:07

It looks like this issue is not Lwt-specific, though. The buffer gets pushed to Httpaf.Server_connection.read, which (probably) invokes the on_read callback passed to Httpaf.Body.schedule_read, but there is no way for on_read to signal back that the buffer has not been consumed yet. The interface could perhaps be extended with something like Httpaf.Body.schedule_async_read, which would also take a continuation to call once the buffer has actually been consumed (one more closure allocated per I/O operation, and it is likely to get promoted out of the minor heap, sigh).
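A toy model of the proposed continuation-based read may make the idea concrete. It is a sketch under stated assumptions, not httpaf code: `schedule_async_read` is the hypothetical function named above, the request body is simulated as a queue of strings, and `body`, `create`, and `demo` are illustrative names. The next chunk is delivered only after the consumer calls the continuation `k`, which is what would provide backpressure.

```ocaml
(* Toy model of the proposed continuation-based read; NOT httpaf's API.
   The request body is simulated as a queue of chunks already received. *)
type body = {
  pending : string Queue.t;  (* chunks waiting to be handed to on_read *)
  mutable delivered : int;   (* how many chunks on_read has received *)
}

let create chunks =
  let pending = Queue.create () in
  List.iter (fun c -> Queue.add c pending) chunks;
  { pending; delivered = 0 }

(* Hypothetical [schedule_async_read]: hands one chunk to [on_read] and
   delivers the next one only when [on_read] calls its continuation [k]. *)
let rec schedule_async_read body ~on_eof ~on_read =
  match Queue.take_opt body.pending with
  | None -> on_eof ()
  | Some chunk ->
    body.delivered <- body.delivered + 1;
    on_read chunk ~k:(fun () -> schedule_async_read body ~on_eof ~on_read)

(* A slow consumer keeps [k] instead of calling it right away.
   Returns the number of chunks delivered before and after resuming once. *)
let demo () =
  let body = create [ "a"; "b"; "c" ] in
  let saved_k = ref None in
  schedule_async_read body ~on_eof:ignore
    ~on_read:(fun _chunk ~k -> saved_k := Some k);
  let before = body.delivered in
  (match !saved_k with Some k -> k () | None -> ());
  (before, body.delivered)
```

Each delivery allocates the closure passed as `k`, which is the per-I/O allocation cost mentioned in the comment above.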

Lupus, Jul 31 '19 20:07

Part of the contract of schedule_read, which I now realize isn't documented, is that when on_read is called, the application needs to consume all the bytes provided before returning.
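In practice, consuming the bytes means the callback has to be done with the `buffer`/`off`/`len` slice before it returns, for example by copying it into memory it owns. A minimal sketch, assuming the bigstring type is a `char` `Bigarray.Array1` (as Bigstringaf's is); `copy_out`, `received`, and this `on_read` are illustrative names, not part of httpaf.

```ocaml
(* Bigstring type, assumed to match Bigstringaf.t. *)
type bigstring =
  (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

(* Copy the [off, off + len) slice into fresh bytes owned by the
   application, so the server may reuse [buf] after we return. *)
let copy_out (buf : bigstring) ~off ~len =
  Bytes.init len (fun i -> Bigarray.Array1.get buf (off + i))

(* Chunks kept for later (possibly slow) processing. *)
let received : Bytes.t Queue.t = Queue.create ()

(* An on_read that honours the contract: the data is copied before
   returning, and any later processing works on the copy. *)
let on_read buf ~off ~len = Queue.add (copy_out buf ~off ~len) received
```

Under this contract, slow_echo_post above would need to copy the slice as well, because it passes `buffer` to `Body.schedule_bigstring` only after the one-second sleep, long after `on_read` has returned.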

I think the way to handle backpressure (which never happens right now, kind of by design) is to have the reader be active only when there's a read scheduled. If there isn't one scheduled, then next_read_operation will return Yield until schedule_read is called.

Or something along those lines.
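The reader-activation idea can be sketched as a small state machine. This is a hypothetical model, not httpaf's implementation: only the `` `Read ``/`` `Yield ``/`` `Close `` results are modeled on `next_read_operation`, while the `reader` record, `schedule_read`, `deliver`, and `close` are illustrative stand-ins. The reader asks for socket reads only while a read is scheduled and yields otherwise.

```ocaml
(* Toy reader state machine; NOT httpaf's code. *)
type read_op = [ `Read | `Yield | `Close ]

type reader = {
  mutable read_scheduled : bool;  (* set by schedule_read, cleared on delivery *)
  mutable closed : bool;
}

let next_read_operation r : read_op =
  if r.closed then `Close
  else if r.read_scheduled then `Read  (* someone wants bytes: read the socket *)
  else `Yield                          (* nobody is waiting: stop reading *)

let schedule_read r = r.read_scheduled <- true

(* Bytes were handed to on_read: go back to yielding until the
   application calls schedule_read again. *)
let deliver r = r.read_scheduled <- false

let close r = r.closed <- true

let demo () =
  let r = { read_scheduled = false; closed = false } in
  let a = next_read_operation r in  (* nothing scheduled yet *)
  schedule_read r;
  let b = next_read_operation r in  (* a read is pending *)
  deliver r;
  let c = next_read_operation r in  (* back to yielding *)
  close r;
  let d = next_read_operation r in
  [ a; b; c; d ]
```

With this shape, a slow handler that delays its next schedule_read automatically keeps the reader in `` `Yield ``, so unread data stays in the kernel's socket buffer instead of in the process.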

seliopou, Aug 01 '19 00:08

After spending some time looking at the code, I'm still unable to find the relevant places for such a change :(

So far I've worked around this with an Lwt_mutex that is locked in the request handler while it waits for data to be processed. The Lwt adapter layer locks the same mutex when it tries to push the next buffer to httpaf. Although slow, it works as flow control.
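A rough sketch of that mutex hand-off, assuming the lwt and lwt.unix libraries: `produce` stands in for the adapter layer pushing buffers and `consume` for the slow request handler; both names, the chunk list, and the 10 ms delay are illustrative. Because `Lwt_mutex.unlock` passes the lock directly to a pending `Lwt_mutex.lock` caller, the adapter cannot push the next chunk until the handler has finished with the previous one.

```ocaml
(* Sketch of the Lwt_mutex workaround; requires lwt and lwt.unix.
   Not the actual adapter code. *)
open Lwt.Infix

(* Request-handler side: processes a chunk slowly, then unlocks the
   mutex so the adapter may push the next buffer. *)
let consume m log chunk =
  Lwt.async (fun () ->
    Lwt_unix.sleep 0.01 >|= fun () ->
    log := ("consumed " ^ chunk) :: !log;
    Lwt_mutex.unlock m)

(* Adapter side: locks the mutex before pushing each buffer, so it
   blocks while the previous buffer is still being processed. *)
let rec produce m log = function
  | [] -> Lwt_mutex.lock m  (* wait until the last chunk is consumed *)
  | chunk :: rest ->
    Lwt_mutex.lock m >>= fun () ->
    log := ("pushed " ^ chunk) :: !log;
    consume m log chunk;
    produce m log rest

let run_demo () =
  let m = Lwt_mutex.create () and log = ref [] in
  Lwt_main.run (produce m log [ "a"; "b"; "c" ]);
  List.rev !log
```

The log alternates strictly between pushes and consumptions; the price is one lock round trip per buffer, which matches the slowness mentioned above.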

Lupus, Aug 02 '19 14:08

As I understand it, the reader currently never yields while the request is not finished (see https://github.com/inhabitedtype/httpaf/blob/master/lib/server_connection.ml#L224). Maybe we need to add a size limit for the parser buffer and yield while nobody consumes the data.
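A minimal sketch of the suggested limit, assuming a hypothetical buffer type rather than httpaf's actual parser state: `create`, `read`, `consume`, and the `limit` field are illustrative names. Input is accepted only up to the limit, and `next_read_operation` returns `` `Yield `` while the buffer is full, so the socket stops being drained until the consumer catches up.

```ocaml
(* Hypothetical bounded input buffer; NOT httpaf's parser state. *)
type t = {
  buf : Buffer.t;  (* bytes read from the socket but not yet consumed *)
  limit : int;     (* maximum number of unconsumed bytes to hold *)
}

let create ~limit = { buf = Buffer.create limit; limit }

(* Keep reading only while there is room below the limit. *)
let next_read_operation t =
  if Buffer.length t.buf >= t.limit then `Yield else `Read

(* Accept at most the free space and return how many bytes were taken,
   so the caller can offer the rest again later. *)
let read t s =
  let n = min (String.length s) (t.limit - Buffer.length t.buf) in
  Buffer.add_string t.buf (String.sub s 0 n);
  n

(* Consumer side: drain everything, which re-enables reading. *)
let consume t =
  let s = Buffer.contents t.buf in
  Buffer.clear t.buf;
  s

let demo () =
  let t = create ~limit:4 in
  let taken = read t "abcdef" in
  let op_full = next_read_operation t in
  let data = consume t in
  let op_drained = next_read_operation t in
  (taken, op_full, data, op_drained)
```

Memory use is then bounded by `limit` per connection instead of by the request size.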

sazarkin, Oct 16 '19 11:10