conduit
Confusion with `sourceProcessWithStreams` and `takeExactly`
I wrote some wrong code:
-- Capture at most 1 line of stdout/stderr.
-- When that has happened, `sourceProcessWithStreams` closes the
-- corresponding pipe, so upon some more output the program will
-- fail with SIGPIPE, so we ignore the exit code.
(_exitCode, _outLinesList, errLinesList) <-
  sourceProcessWithStreams
    cp -- my program
    (return ()) -- stdin
    (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList) -- stdout
    (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList) -- stderr
This code does not do what the comment says it should do:
- When the started process prints one or more lines to `stderr` and then anything to `stdout`, it works as expected.
- But when the started process prints nothing to `stderr` and a lot to `stdout`, then `sourceProcessWithStreams` hangs forever.
I think this is because the process blocks on `write(stdout, ...)` over the pipe, while our `sourceProcessWithStreams` is implemented as
(_, resStdout, resStderr) <-
  runConcurrently (
    (,,)
    <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
    <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
    <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
  `finally` (closeStdout >> closeStderr)
and the Applicative combination `(,,) <$> ... <*> ... <*> ...` over `Concurrently` waits for all 3 parts to terminate.
So what happens is:
- The program prints >= 1 line to `stdout`. The Haskell parent consumes 1 line and `Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)` terminates. `stderr` stays open.
- Now the Haskell parent waits for `Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr)` to terminate, but that never happens because the program gets blocked writing more to `stdout` (which is never consumed by the parent).
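The "waits for all 3 parts" behaviour can be seen in isolation, without any process or pipe involved. The following is only a minimal sketch using the `async` package: `branch` and `runAll` are hypothetical names, and the delays are arbitrary stand-ins for the three conduits finishing at different times.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical stand-in for one of the three conduits:
-- sleep for a while, then record that this branch has finished.
branch :: IORef [String] -> String -> Int -> IO String
branch logRef name micros = do
  threadDelay micros
  atomicModifyIORef' logRef (\names -> (names ++ [name], ()))
  pure name

-- | Combine three branches with the same (,,) <$> ... <*> ... <*> ...
-- pattern. `runConcurrently` only returns once every branch is done,
-- so the log always contains all three names afterwards.
runAll :: IO ((String, String, String), [String])
runAll = do
  logRef <- newIORef []
  results <-
    runConcurrently $
      (,,)
        <$> Concurrently (branch logRef "stdin" 0)
        <*> Concurrently (branch logRef "stdout" 10000)
        <*> Concurrently (branch logRef "stderr" 200000)
  finished <- readIORef logRef
  pure (results, finished)

main :: IO ()
main = runAll >>= print
```

If the slowest branch never finished (as with the `stderr` conduit above), `runConcurrently` would never return, no matter how early the other two completed.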
This seems wrong
Question 1: Different `sourceProcessWithStreams` implementation
For each handle (not only `stdin`), shouldn't it be closed when the corresponding conduit terminates, like so:
(_, resStdout, resStderr) <-
  runConcurrently (
    (,,)
    <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
    <*> Concurrently ((unliftIO u $ runConduit $ sourceStdout .| consumerStdout) `finally` closeStdout)
    <*> Concurrently ((unliftIO u $ runConduit $ sourceStderr .| consumerStderr) `finally` closeStderr)
  )
I would argue this would be the correct behaviour: since the handles are created and contained within `sourceProcessWithStreams` and not made accessible anywhere, nothing but `consumerStdout`/`consumerStderr` can read from them, so blocking is the only thing that can happen when those sinks terminate.
In either case, I think there should be a clear statement in the docs on what happens if conduits terminate without consuming all they are given (handle closing implying `SIGPIPE` on more output, or infinite hanging).
Question 2: `takeExactly` docs
With the original `sourceProcessWithStreams`, I assumed that instead of implementing my desired terminate-after-1-line logic, I could also make it churn through all remaining `stdout` using `sinkNull` after consuming the first line:
- (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList)
+ (CT.decode CT.utf8 .| CT.lines .| ((CC.take 1 .| CC.sinkList) <* CC.sinkNull))
This works as expected.
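For reference, here is the draining variant as a small standalone program. It is only a sketch: `firstLineThenDrain` and `firstLines` are hypothetical names, and it assumes a POSIX shell with `seq` available to produce a lot of `stdout` and nothing on `stderr`, which is the case that hangs with plain `CC.take 1`.

```haskell
import Data.ByteString (ByteString)
import Data.Conduit (ConduitT, (.|))
import qualified Data.Conduit.Combinators as CC
import Data.Conduit.Process (sourceProcessWithStreams)
import qualified Data.Conduit.Text as CT
import Data.Text (Text)
import Data.Void (Void)
import System.Exit (ExitCode (..))
import System.Process (shell)

-- | Keep the first line, then keep reading (and discarding) until EOF,
-- so the child process never blocks on a full pipe.
firstLineThenDrain :: ConduitT ByteString Void IO [Text]
firstLineThenDrain =
  CT.decode CT.utf8 .| CT.lines .| ((CC.take 1 .| CC.sinkList) <* CC.sinkNull)

-- | Run a shell command and capture at most one line of stdout and stderr.
firstLines :: String -> IO (ExitCode, [Text], [Text])
firstLines cmd =
  sourceProcessWithStreams
    (shell cmd)
    (return ()) -- stdin: nothing to send
    firstLineThenDrain -- stdout
    firstLineThenDrain -- stderr

main :: IO ()
main = firstLines "seq 1 100000" >>= print
```

Because both consumers read until EOF, the pipes are never closed early, so the exit code is meaningful here and does not need to be ignored.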
Then I thought that I could also write it using `takeExactly`:
(CT.decode CT.utf8 .| CT.lines .| CC.takeExactly 1 CC.sinkList)
I thought `CC.takeExactly 1 CC.sinkList` should be equivalent to `((CC.take 1 .| CC.sinkList) <* CC.sinkNull)`.
However, it is not; it blocks forever.
The docs say:
This function is in contrast to `take`, which will only consume up to the given number of values, and will terminate early if downstream terminates early. This function will discard any additional values in the stream if they are unconsumed.
After reading the implementation, I figured that
This function will discard any additional values in the stream if they are unconsumed.
really means
This function will discard any additional values in the stream if they are unconsumed by downstream.
and not (what I assumed)
This function will discard any additional values in the stream if they are unconsumed by this function.
That is, the implementation does:
take count .| do
  r <- inner
  CL.sinkNull
  return r
which is equivalent to
take count .| (inner <* CL.sinkNull)
and I thought it does
(take count .| inner) <* CL.sinkNull
I think it would be very helpful to make this distinction clear, e.g. by at least adding the suggested "by downstream" to the sentence; ideally even contrasting the two behaviours above directly and saying which of them the function implements.
But I want to get your opinion first.
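The two readings can be contrasted on a pure stream. This is a minimal sketch: the names `insideWindow`, `outsideWindow` and `viaTakeExactly` are made up for illustration. Each pipeline collects what the first stage returns and then runs a second `CC.sinkList` to show what is still left in the stream afterwards.

```haskell
import Data.Conduit (runConduitPure, (.|))
import qualified Data.Conduit.Combinators as CC

-- | Draining inside the `take` window: `sinkNull` only sees the
-- 2 elements that `take` lets through, so 3, 4, 5 stay in the stream.
insideWindow :: ([Int], [Int])
insideWindow =
  runConduitPure $
    CC.yieldMany [1 .. 5 :: Int]
      .| ((,) <$> (CC.take 2 .| (CC.sinkList <* CC.sinkNull)) <*> CC.sinkList)

-- | Draining outside the `take` window: `sinkNull` sees the whole
-- remaining stream, so nothing is left for the second sink.
outsideWindow :: ([Int], [Int])
outsideWindow =
  runConduitPure $
    CC.yieldMany [1 .. 5 :: Int]
      .| ((,) <$> ((CC.take 2 .| CC.sinkList) <* CC.sinkNull) <*> CC.sinkList)

-- | `takeExactly` behaves like `insideWindow`.
viaTakeExactly :: ([Int], [Int])
viaTakeExactly =
  runConduitPure $
    CC.yieldMany [1 .. 5 :: Int]
      .| ((,) <$> CC.takeExactly 2 CC.sinkList <*> CC.sinkList)

main :: IO ()
main = do
  print insideWindow   -- ([1,2],[3,4,5])
  print outsideWindow  -- ([1,2],[])
  print viaTakeExactly -- ([1,2],[3,4,5])
```

With a process pipe as the source, the elements left over in the `insideWindow` and `viaTakeExactly` cases are exactly the output that nobody reads, which is why the `takeExactly` version blocks.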
Question 3: `createSource` docs
Somewhat unrelatedly, I found that the docs for `Data.Conduit.Process.Typed.createSource` say
Read output from a process by read from a conduit.
and I didn't get what "by read from a conduit" means; maybe there's a typo or a word missing?
I see no problem with adding the close streams, seems reasonable. Want to confirm if it fixes the problem and open a PR?
For `takeExactly`: you have it right, better docs PR welcome too.
No idea what "by read from a conduit" is supposed to mean TBH. "Read output from a process using a Conduit" would seem correct enough.