ocaml-cohttp icon indicating copy to clipboard operation
ocaml-cohttp copied to clipboard

Connection closing on client side with Cohttp_async

Open prepor opened this issue 10 years ago • 6 comments

Hello

How to properly handle connection closing with Cohttp async? My code:

open Core.Std
open Async.Std

let () =
  let callback ~body _a req =
    let (r, w) = Pipe.create () in
    let rec ticks () =
      Pipe.write w "tick!\n" >>= fun () ->
      after (Time.Span.of_ms 100.0) >>= fun () ->
      ticks () in
    ticks () |> don't_wait_for;
    Cohttp_async.Server.respond_with_pipe r in
  let where_to_listen = Tcp.on_port 3000 in
  Cohttp_async.Server.create where_to_listen callback |> Deferred.ignore |> don't_wait_for;
  never_returns (Scheduler.go ())

After I close connection I see this error:

(((pid 25600) (thread_id 0))
 ((human_readable 2015-11-01T14:49:25+0300)
  (int63_ns_since_epoch 1446378565857680000))
 "unhandled exception in Async scheduler"
 ("unhandled exception"
  ((src/monitor.ml.Error_
    ((exn
      ("writer error"
       ((src/monitor.ml.Error_
         ((exn
           ("writer fd unexpectedly closed "
            ((id 0)
             (fd
              ((file_descr 6)
               (info
                (socket
                 ((listening_on
                   ((type_
                     ((family
                       ((family PF_INET) (address_of_sockaddr_exn <fun>)
                        (sexp_of_address <fun>)))
                      (socket_type SOCK_STREAM)))
                    (fd
                     ((file_descr 3)
                      (info
                       (replaced
                        (listening
                         (previously_was
                          (replaced
                           ((socket (bound_on (Inet (0.0.0.0 3000))))
                            (previously_was
                             (socket
                              ((family
                                ((family PF_INET)
                                 (address_of_sockaddr_exn <fun>)
                                 (sexp_of_address <fun>)))
                               (socket_type SOCK_STREAM))))))))))
                      (kind (Socket Passive)) (supports_nonblock true)
                      (have_set_nonblock true)
                      (state
                       (Close_requested
                        ((monitor
                          (((name main) (here ()) (id 1)
                            (has_seen_error true) (is_detached false)
                            (kill_index 0))))
                         (priority Normal) (local_storage ())
                         (backtrace_history ()) (kill_index 0))
                        <fun>))
                      (watching ((read Stop_requested) (write Not_watching)))
                      (watching_has_changed true) (num_active_syscalls 1)
                      (close_finished Empty)))))
                  (client (Inet (127.0.0.1 65442))))))
               (kind (Socket Active)) (supports_nonblock true)
               (have_set_nonblock true) (state Closed)
               (watching ((read Not_watching) (write Not_watching)))
               (watching_has_changed false) (num_active_syscalls 0)
               (close_finished (Full ()))))
             (monitor
              (((name (id 10)) (here ()) (id 10) (has_seen_error true)
                (is_detached true) (kill_index 0))
               ((name main) (here ()) (id 1) (has_seen_error true)
                (is_detached false) (kill_index 0))))
             (inner_monitor
              (((name (id 11)) (here ()) (id 11) (has_seen_error true)
                (is_detached true) (kill_index 0))
               ((name main) (here ()) (id 1) (has_seen_error true)
                (is_detached false) (kill_index 0))))
             (background_writer_state Running) (syscall Per_cycle)
             (bytes_received 236) (bytes_written 225) (scheduled <opaque>)
             (scheduled_bytes 11) (buf <opaque>) (scheduled_back 11)
             (back 11) (flushes <opaque>) (close_state Closed)
             (close_finished (Full ())) (close_started (Full ()))
             (producers_to_flush_at_close ())
             (flush_at_shutdown_elt (<opaque>))
             (check_buffer_age
              (((writer <opaque>)
                (queue
                 ((236 (2015-11-01 14:48:55.857421+03:00))
                  (236 (2015-11-01 14:48:56.860415+03:00))
                  (236 (2015-11-01 14:48:57.865701+03:00))
                  (236 (2015-11-01 14:48:58.866323+03:00))
                  (236 (2015-11-01 14:48:59.866576+03:00))
                  (236 (2015-11-01 14:49:00.870461+03:00))
                  (236 (2015-11-01 14:49:01.873933+03:00))
                  (236 (2015-11-01 14:49:02.875371+03:00))
                  (236 (2015-11-01 14:49:03.880655+03:00))
                  (236 (2015-11-01 14:49:04.885355+03:00))
                  (236 (2015-11-01 14:49:05.890174+03:00))
                  (236 (2015-11-01 14:49:06.892943+03:00))
                  (236 (2015-11-01 14:49:07.893824+03:00))
                  (236 (2015-11-01 14:49:08.894034+03:00))
                  (236 (2015-11-01 14:49:09.897577+03:00))
                  (236 (2015-11-01 14:49:10.899643+03:00))
                  (236 (2015-11-01 14:49:11.903092+03:00))
                  (236 (2015-11-01 14:49:12.908437+03:00))
                  (236 (2015-11-01 14:49:13.913755+03:00))
                  (236 (2015-11-01 14:49:14.918443+03:00))
                  (236 (2015-11-01 14:49:15.919621+03:00))
                  (236 (2015-11-01 14:49:16.923911+03:00))
                  (236 (2015-11-01 14:49:17.924368+03:00))
                  (236 (2015-11-01 14:49:18.925335+03:00))
                  (236 (2015-11-01 14:49:19.930008+03:00))
                  (236 (2015-11-01 14:49:20.933480+03:00))
                  (236 (2015-11-01 14:49:21.938598+03:00))
                  (236 (2015-11-01 14:49:22.938934+03:00))
                  (236 (2015-11-01 14:49:23.942140+03:00))
                  (236 (2015-11-01 14:49:24.946649+03:00))))
                (maximum_age 2m) (too_old Empty))))
             (consumer_left Empty) (raise_when_consumer_leaves true)
             (open_flags (Full (Ok (rdwr nonblock)))))))
          (backtrace
           ("Raised at file \"src/error.ml\", line 7, characters 21-29"
            "Called from file \"src/job_queue.ml\", line 164, characters 6-47"
            ""))
          (monitor
           (((name (id 11)) (here ()) (id 11) (has_seen_error true)
             (is_detached true) (kill_index 0))
            ((name main) (here ()) (id 1) (has_seen_error true)
             (is_detached false) (kill_index 0))))))
        ((id 0)
         (fd
          ((file_descr 6)
           (info
            (socket
             ((listening_on
               ((type_
                 ((family
                   ((family PF_INET) (address_of_sockaddr_exn <fun>)
                    (sexp_of_address <fun>)))
                  (socket_type SOCK_STREAM)))
                (fd
                 ((file_descr 3)
                  (info
                   (replaced
                    (listening
                     (previously_was
                      (replaced
                       ((socket (bound_on (Inet (0.0.0.0 3000))))
                        (previously_was
                         (socket
                          ((family
                            ((family PF_INET) (address_of_sockaddr_exn <fun>)
                             (sexp_of_address <fun>)))
                           (socket_type SOCK_STREAM))))))))))
                  (kind (Socket Passive)) (supports_nonblock true)
                  (have_set_nonblock true)
                  (state
                   (Close_requested
                    ((monitor
                      (((name main) (here ()) (id 1) (has_seen_error true)
                        (is_detached false) (kill_index 0))))
                     (priority Normal) (local_storage ())
                     (backtrace_history ()) (kill_index 0))
                    <fun>))
                  (watching ((read Stop_requested) (write Not_watching)))
                  (watching_has_changed true) (num_active_syscalls 1)
                  (close_finished Empty)))))
              (client (Inet (127.0.0.1 65442))))))
           (kind (Socket Active)) (supports_nonblock true)
           (have_set_nonblock true) (state Closed)
           (watching ((read Not_watching) (write Not_watching)))
           (watching_has_changed false) (num_active_syscalls 0)
           (close_finished (Full ()))))
         (monitor
          (((name (id 10)) (here ()) (id 10) (has_seen_error true)
            (is_detached true) (kill_index 0))
           ((name main) (here ()) (id 1) (has_seen_error true)
            (is_detached false) (kill_index 0))))
         (inner_monitor
          (((name (id 11)) (here ()) (id 11) (has_seen_error true)
            (is_detached true) (kill_index 0))
           ((name main) (here ()) (id 1) (has_seen_error true)
            (is_detached false) (kill_index 0))))
         (background_writer_state Running) (syscall Per_cycle)
         (bytes_received 236) (bytes_written 225) (scheduled <opaque>)
         (scheduled_bytes 11) (buf <opaque>) (scheduled_back 11) (back 11)
         (flushes <opaque>) (close_state Closed) (close_finished (Full ()))
         (close_started (Full ())) (producers_to_flush_at_close ())
         (flush_at_shutdown_elt (<opaque>))
         (check_buffer_age
          (((writer <opaque>)
            (queue
             ((236 (2015-11-01 14:48:55.857421+03:00))
              (236 (2015-11-01 14:48:56.860415+03:00))
              (236 (2015-11-01 14:48:57.865701+03:00))
              (236 (2015-11-01 14:48:58.866323+03:00))
              (236 (2015-11-01 14:48:59.866576+03:00))
              (236 (2015-11-01 14:49:00.870461+03:00))
              (236 (2015-11-01 14:49:01.873933+03:00))
              (236 (2015-11-01 14:49:02.875371+03:00))
              (236 (2015-11-01 14:49:03.880655+03:00))
              (236 (2015-11-01 14:49:04.885355+03:00))
              (236 (2015-11-01 14:49:05.890174+03:00))
              (236 (2015-11-01 14:49:06.892943+03:00))
              (236 (2015-11-01 14:49:07.893824+03:00))
              (236 (2015-11-01 14:49:08.894034+03:00))
              (236 (2015-11-01 14:49:09.897577+03:00))
              (236 (2015-11-01 14:49:10.899643+03:00))
              (236 (2015-11-01 14:49:11.903092+03:00))
              (236 (2015-11-01 14:49:12.908437+03:00))
              (236 (2015-11-01 14:49:13.913755+03:00))
              (236 (2015-11-01 14:49:14.918443+03:00))
              (236 (2015-11-01 14:49:15.919621+03:00))
              (236 (2015-11-01 14:49:16.923911+03:00))
              (236 (2015-11-01 14:49:17.924368+03:00))
              (236 (2015-11-01 14:49:18.925335+03:00))
              (236 (2015-11-01 14:49:19.930008+03:00))
              (236 (2015-11-01 14:49:20.933480+03:00))
              (236 (2015-11-01 14:49:21.938598+03:00))
              (236 (2015-11-01 14:49:22.938934+03:00))
              (236 (2015-11-01 14:49:23.942140+03:00))
              (236 (2015-11-01 14:49:24.946649+03:00))))
            (maximum_age 2m) (too_old Empty))))
         (consumer_left Empty) (raise_when_consumer_leaves true)
         (open_flags (Full (Ok (rdwr nonblock))))))))
     (monitor
      (((name (id 10)) (here ()) (id 10) (has_seen_error true)
        (is_detached true) (kill_index 0))
       ((name main) (here ()) (id 1) (has_seen_error true)
        (is_detached false) (kill_index 0))))))
   ((pid 25600) (thread_id 0)))))

OS X 10.11.1 Ocaml 4.02.3 cohttp 0.19.3 async 113.00.00

prepor avatar Nov 01 '15 12:11 prepor

The same error is in test_net_async_server from lib_test.

It's very easy to reproduce, just compile cohttp with ocaml setup.ml -configure --enable-tests, start ./test_net_async_server.native and start + stop curl http://127.0.0.1:8081/timer.

Unfortunately, I couldn't yet understand the root of this error but I think it's possible not only with respond_with_pipe but with any kind of response in cohttp_async but it's harder to reproduce :(

prepor avatar Dec 13 '15 13:12 prepor

Any news on that?

little-arhat avatar Jan 25 '16 11:01 little-arhat

Hmm.... the code keeps writing "tick!" into closed fd, so yes, the exception is finally raised.

You need to explicitly specify what to do with such exceptions (see on_handler_error) in create function. By default, all exceptions are re-raised.

See

(** Build a HTTP server, based on the [Tcp.Server] interface *)
  val create :
    ?max_connections:int ->
    ?max_pending_connections:int ->
    ?buffer_age_limit: Writer.buffer_age_limit ->
    ?on_handler_error:[ `Call of 'address -> exn  -> unit
                      | `Ignore
                      | `Raise ] ->
    ?mode:Conduit_async.server ->
    ('address, 'listening_on) Tcp.Where_to_listen.t
    -> (body:Body.t -> 'address -> Request.t -> response Deferred.t)
    -> ('address, 'listening_on) t Deferred.t

artemkin avatar Jan 25 '16 13:01 artemkin

@artemkin thanks a lot! But I think it should be considered as workaround and such exceptions as bugs. I'll check it later, what will happen with Pipe? Will it close?

prepor avatar Jan 25 '16 14:01 prepor

@prepor It seems you have to use Pipe.init instead of Pipe.create to get it closed automatically. See https://github.com/janestreet/async_kernel/blob/master/src/pipe.mli

it should be considered as workaround and such exceptions as bugs

How should it behave in your opinion? HTTP is request-response protocol, right, so I would expect a finite length of the response. If the client closed the connection unexpectedly there is not much we can do.

artemkin avatar Jan 25 '16 15:01 artemkin

This is still a problem, and seems to me to be clearly a bug. on_handler_error is the wrong place for this exception, as the handler is not the origin of this error. cohttp should not be raising exceptions and then pretending the handler did it.

No, responses do not have to be finite. See WebSocket and Server-Sent Events protocols. In many cases the client is expected to close those connections, and the server wants to serve them for as long as the client sticks around. Cohttp should be closing the pipe that represents the body if the connection is closed.

thedufer avatar Jun 13 '20 15:06 thedufer