How to combine Lwt filters?
I am interested in using asynchronous processes to replace some shell routines with OCaml routines. I therefore need to combine Lwt filters, but the combined filter does not terminate when it reads EOF on stdin.
Let us take a look at a simplified first example, where a filter is created by combining two threads running cat:
let filter_cat =
  Lwt_process.pmap_lines ("cat", [| "cat" |])

let filter_t () =
  Lwt_io.stdin
  |> Lwt_io.read_lines
  |> filter_cat
  |> filter_cat
  |> Lwt_io.write_lines Lwt_io.stdout

let () =
  Lwt_main.run (filter_t ())
This filter more or less works, but when its standard input is closed it hangs instead of exiting. If I remove one of the filter_cat filters, it works as expected.
I am guessing that I do not compose these filters appropriately and therefore cannot join the two threads I am starting. What is the correct way to compose these filters, so that the program terminates after it reads EOF on stdin?
I reported that problem on the mailing list and on SO without much luck.
System information
% uname -a
Darwin oscar.local 14.1.0 Darwin Kernel Version 14.1.0: Thu Feb 26 19:26:47 PST 2015; root:xnu-2782.10.73~1/RELEASE_X86_64 x86_64
% ocaml -version
The OCaml toplevel, version 4.02.1
% opam show lwt
package: lwt
version: 2.4.8
repository: default
upstream-url: https://github.com/ocsigen/lwt/archive/2.4.8.tar.gz
upstream-kind: http
upstream-checksum: 2f11601bd9535b2e550026f41d9cc883
depends: ocamlfind >= 1.5.0 & (base-no-ppx | ppx_tools)
depopts: base-threads | base-unix | conf-libev | camlp4 | ssl |
react | lablgtk
installed-version: 2.4.8 [system]
available-versions: 2.3.2, 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6,
2.4.7, 2.4.8
description: A cooperative threads library for OCaml
This library is part of the Ocsigen project. See:
Just a gentle ping on that issue I reported last month.
I could reproduce this behaviour on Ubuntu 10.04, so I would rule out an operating-system-specific explanation.
I can reproduce this behavior on NetBSD/amd64 6.1.4. The results are the same using either the byte-code or native compilers.
@rneswold Great! I had only tested this with the byte-code compiler. I have looked at the code of the function pmap_lines. It looks quite self-contained, so I was planning to copy its implementation into the reproduction case and add tracing output to it, or to step through it with a debugger, though the latter might be a bit tricky.
But before I start on this, do you have a better idea?
- Redirecting is tricky. You need to make sure that all (intermediate) pipes are closed as soon as possible; otherwise you get freezes like this one. Lwt_process.pmap_lines is probably too high-level for advanced use cases, because such details are not exposed. Use lower-level functions and make sure manually that all channels are closed immediately.
- Your solution is slow anyway. The operating system is faster than your manual copying. Whenever possible, connect the pipes directly and avoid calls to Lwt_io.read_lines and Lwt_io.write_lines (they are also used internally by Lwt_process.pmap_lines), e.g.:
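open Lwt.Infix

(* Stand-in for the consumer this answer leaves unspecified:
   here it simply echoes the lines to stdout. *)
let do_something lines =
  Lwt_io.write_lines Lwt_io.stdout lines

(* Equivalent of: tac | tr '[:lower:]' '[:upper:]' *)
let () =
  let p1 = "tac", [| "tac" |] in
  (* Unix.pipe returns (read end, write end) *)
  let p1_stdin, p1_stdout = Unix.pipe () in
  let p2 = "tr", [| "tr" ; "[:lower:]" ; "[:upper:]" |] in
  let p2_stdin, p2_stdout = Unix.pipe () in
  (* tac reads our stdin and writes into pipe 1; `FD_move closes
     our copy of the write end once tac has it *)
  let proc1 =
    Lwt_process.exec
      ~stdin:(`FD_copy Unix.stdin)
      ~stdout:(`FD_move p1_stdout) p1
  in
  (* tr reads pipe 1 and writes into pipe 2 *)
  let proc2 =
    Lwt_process.exec
      ~stdin:(`FD_move p1_stdin)
      ~stdout:(`FD_move p2_stdout) p2
  in
  (* Only the read end of pipe 2 stays in this process *)
  let p2_stdin_chan = Lwt_io.of_unix_fd ~mode:Lwt_io.Input p2_stdin in
  let lines = Lwt_io.read_lines p2_stdin_chan in
  Lwt_main.run (do_something lines)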
In real case usage, you have to check proc1, proc2, etc.
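The answer does not spell out what "check" means. One plausible reading, which is an assumption here, is to wait for each promise returned by Lwt_process.exec and inspect its Unix.process_status. The helper below, `check`, is a hypothetical name and not part of Lwt; it turns any status other than a clean exit 0 into a failed promise.

```ocaml
(* Hypothetical helper (not part of Lwt): fail unless the command
   exited normally with status 0. *)
let check name (status : Unix.process_status) : unit Lwt.t =
  match status with
  | Unix.WEXITED 0 -> Lwt.return_unit
  | Unix.WEXITED code ->
      Lwt.fail_with (Printf.sprintf "%s exited with code %d" name code)
  | Unix.WSIGNALED signal ->
      Lwt.fail_with (Printf.sprintf "%s was killed by signal %d" name signal)
  | Unix.WSTOPPED signal ->
      Lwt.fail_with (Printf.sprintf "%s was stopped by signal %d" name signal)
```

In the tac/tr example, the last line could then become `Lwt_main.run (do_something lines >>= fun () -> proc1 >>= check "tac" >>= fun () -> proc2 >>= check "tr")`, so that a failing command raises instead of being silently ignored.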
@fdopen Thank you for your input! As combining filters is a very important use case, maybe it would be worth having a function that supports it? I will definitely write one; maybe it could fit in Lwt_process?
> In real case usage, you have to check proc1, proc2, etc.
I am not sure what you mean: are you referring to checking the return value of Lwt_process.exec, or to monitoring the processes once they have been launched?
Lastly, I will read the relevant parts of the system-programming course for OCaml programmers by Leroy and Rémy to write the pipe-combining function, but maybe you have a complementary resource to suggest?
BTW, if you are interested, you can post your answer on SO; at first I was not sure this would be seen as a bug. Otherwise I will do it in a couple of days, so that interested people can find this ticket and the corresponding follow-up.
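The pipe-combining function discussed above could look roughly like the sketch below. `pipeline` and `upcase_reversed` are hypothetical names, not existing Lwt_process functions; the sketch only generalizes the tac/tr answer to a list of commands, using Unix.pipe, Lwt_process.exec with `FD_copy/`FD_move redirections, and Lwt_io.of_unix_fd. Each pipe is created just before the command that writes into it is spawned, so earlier children never inherit a later pipe's write end.

```ocaml
open Lwt.Infix

(* Hypothetical helper (not part of Lwt_process): spawn [cmds] as a
   shell-style pipeline, wiring each command's stdout directly into the
   next command's stdin. Returns an input channel on the last command's
   stdout and the exit-status promises of all commands. *)
let pipeline ?(stdin : Lwt_process.redirection = `FD_copy Unix.stdin)
    (cmds : Lwt_process.command list) =
  (* Start [cmd] reading from [input]; return the read end of its
     stdout pipe and its exit-status promise. *)
  let start input cmd =
    (* Unix.pipe returns (read end, write end). *)
    let read_end, write_end = Unix.pipe () in
    (* `FD_move closes our copy of the write end once the child has it,
       so the child is the only writer on this pipe. *)
    let proc =
      Lwt_process.exec ~stdin:input ~stdout:(`FD_move write_end) cmd
    in
    (read_end, proc)
  in
  let rec spawn input procs = function
    | [] -> invalid_arg "pipeline: empty command list"
    | [ cmd ] ->
        let read_end, proc = start input cmd in
        ( Lwt_io.of_unix_fd ~mode:Lwt_io.Input read_end,
          List.rev (proc :: procs) )
    | cmd :: rest ->
        let read_end, proc = start input cmd in
        (* The next command takes over this read end; `FD_move closes
           our copy after spawning it. *)
        spawn (`FD_move read_end) (proc :: procs) rest
  in
  spawn stdin [] cmds

(* Example: the equivalent of tac | tr '[:lower:]' '[:upper:]'. *)
let upcase_reversed () =
  let output, procs =
    pipeline
      [ ("tac", [| "tac" |]); ("tr", [| "tr"; "[:lower:]"; "[:upper:]" |]) ]
  in
  Lwt_io.write_lines Lwt_io.stdout (Lwt_io.read_lines output) >>= fun () ->
  Lwt_io.close output >>= fun () ->
  (* Reap every child; a real program would inspect the statuses. *)
  Lwt_list.iter_s (fun proc -> proc >|= ignore) procs
```

A program would run the example with `Lwt_main.run (upcase_reversed ())`. Only the final read end is wrapped in an Lwt channel; the data between commands never passes through the OCaml process, which is the point of the answer's second remark.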
I think I nailed this while working on Rashell. I had to reimplement some internal functions of Lwt_process, especially the monitor function. In the original code, the reference to the internal sender (not the one passed as an argument) is lost when the getter terminates. In my variant, the getter returns the sender/monitor thread when it is done, allowing it to be terminated.
I did not examine the issue thoroughly, but at first glance it seems that, as mentioned above, a reference to the internal sender defined in the function is lost on some execution paths, preventing the thread from being properly terminated and removed from the scheduler.
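The idea described above (keep a handle on the sender, and release it when the getter finishes) can be illustrated with a simplified re-implementation of pmap_lines. This is not Lwt's or Rashell's actual code: `pmap_lines_joined` is a hypothetical name, and the sketch only relies on Lwt_process.open_process, Lwt_io and Lwt_stream. The getter waits for the sender and reaps the child before ending its stream, so no sender thread is left behind, and a sender failure surfaces in the consumer.

```ocaml
open Lwt.Infix

(* Hypothetical simplified pmap_lines (not Lwt's or Rashell's code):
   the sender promise is kept, and the getter joins it at EOF. *)
let pmap_lines_joined (cmd : Lwt_process.command)
    (input : string Lwt_stream.t) : string Lwt_stream.t =
  let proc = Lwt_process.open_process cmd in
  (* Sender: copy the input lines to the child's stdin, then close it
     so that the child sees EOF. *)
  let sender =
    Lwt.finalize
      (fun () -> Lwt_io.write_lines proc#stdin input)
      (fun () -> Lwt_io.close proc#stdin)
  in
  let output = Lwt_io.read_lines proc#stdout in
  (* Getter: forward the child's output; once it reaches EOF, wait for
     the sender and reap the child before ending the stream. *)
  Lwt_stream.from (fun () ->
      Lwt_stream.get output >>= function
      | Some line -> Lwt.return (Some line)
      | None ->
          sender >>= fun () ->
          proc#close >>= fun _status ->
          Lwt.return None)
```

The two-stage filter from the question would then read `lines |> pmap_lines_joined ("cat", [| "cat" |]) |> pmap_lines_joined ("cat", [| "cat" |])`. This sketch addresses only the lost sender reference; it does not by itself control which pipe descriptors sibling child processes inherit.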
Noting: http://stackoverflow.com/a/34987825/2482998. Haven't yet looked deeply at it. Thanks for writing that up.