Confusion with `sourceProcessWithStreams` and `takeExactly`

Open nh2 opened this issue 2 years ago • 1 comments

I wrote some wrong code:

  -- Capture at most 1 line of stdout/stderr.
  -- When that has happened, `sourceProcessWithStreams` closes the
  -- corresponding pipe, so upon some more output the program will
  -- fail with SIGPIPE, so we ignore the exit code.
  (_exitCode, _outLinesList, errLinesList) <-
    sourceProcessWithStreams
      cp -- my program
      (return ()) -- stdin
      (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList) -- stdout
      (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList) -- stderr

This code does not do what the comment says it should do:

  • When the started process prints one or more lines to stderr and then anything to stdout, it works as expected.
  • But when the started process prints nothing to stderr and a lot to stdout, then sourceProcessWithStreams hangs forever.

I think this is because the process blocks on write(stdout, ...) over the pipe, while our sourceProcessWithStreams is implemented as

    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
      `finally` (closeStdout >> closeStderr)

and the Applicative combination (,,) <$> ... <*> ... <*> ... over Concurrently waits for all 3 parts to terminate.

So what happens is:

  1. The program prints >= 1 line to stdout. The Haskell parent consumes 1 line and Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout) terminates. stderr stays open.
  2. Now the Haskell parent waits for Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr)) to terminate, but that never happens because the program gets blocked writing more to stdout (which is never consumed by the parent).
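The "waits for all parts" behaviour of Concurrently can be seen in isolation with a minimal sketch that does not involve processes at all. It assumes the async package; the name slowFinishedBeforeReturn is made up for illustration. The fast action plays the role of the stdout consumer that stops early, the slow action plays the role of the stderr consumer that is still waiting.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import Data.IORef (newIORef, readIORef, writeIORef)

-- | Hypothetical helper: runs a fast and a slow action under Concurrently
-- and reports whether the slow one had already finished when the combined
-- action returned.
slowFinishedBeforeReturn :: IO Bool
slowFinishedBeforeReturn = do
  slowDone <- newIORef False
  _ <- runConcurrently $
    (,)
      <$> Concurrently (pure "fast")
      <*> Concurrently (threadDelay 200000 >> writeIORef slowDone True >> pure "slow")
  -- The applicative combination only returns once BOTH actions are done,
  -- so the flag has been set by now.
  readIORef slowDone

main :: IO ()
main = slowFinishedBeforeReturn >>= print
```

If the slow action never finishes (as with a stderr reader whose process is blocked on a full stdout pipe), the combined action never returns either.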

This seems wrong.

Question 1: Different sourceProcessWithStreams implementation

For each handle (not only stdin), shouldn't it be closed when the corresponding conduit terminates, like so:

    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
        <*> Concurrently ((unliftIO u $ runConduit $ sourceStdout .| consumerStdout) `finally` closeStdout)
        <*> Concurrently ((unliftIO u $ runConduit $ sourceStderr .| consumerStderr) `finally` closeStderr)
      )

I would argue this would be the correct behaviour: the handles are created and contained within sourceProcessWithStreams and not made accessible anywhere, so nothing but consumerStdout/consumerStderr can read from them, and blocking is the only thing that can happen when those sinks terminate.

In either case, I think there should be a clear statement in the docs on what happens if conduits terminate without consuming all they are given (handle closing implying SIGPIPE on more, or infinite hanging).

Question 2: takeExactly docs

With the original sourceProcessWithStreams, I assumed that instead of implementing my desired terminate-after-1-line logic, I could also make it churn through all remaining stdout using sinkNull after consuming the first line:

-      (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList)
+     (CT.decode CT.utf8 .| CT.lines .| ((CC.take 1 .| CC.sinkList) <* CC.sinkNull))

This works as expected.
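For reference, here is the draining variant as a complete program. This is a sketch under assumptions: it needs the conduit, conduit-extra, process and text packages and a Unix-like shell; captureFirstLines is a made-up name, and "seq 1 100000" is just an illustrative command that writes more to stdout than a pipe buffer holds.

```haskell
import Conduit ((.|))
import qualified Data.Conduit.Combinators as CC
import Data.Conduit.Process (sourceProcessWithStreams)
import qualified Data.Conduit.Text as CT
import qualified Data.Text as T
import System.Exit (ExitCode)
import System.Process (CreateProcess, shell)

-- | Hypothetical helper: keep at most the first line of stdout and of
-- stderr. On stdout, everything after the first line is read and thrown
-- away, so the child never blocks on a full pipe.
captureFirstLines :: CreateProcess -> IO (ExitCode, [T.Text], [T.Text])
captureFirstLines cp =
  sourceProcessWithStreams
    cp
    (return ()) -- stdin: nothing to send
    (CT.decode CT.utf8 .| CT.lines .| ((CC.take 1 .| CC.sinkList) <* CC.sinkNull)) -- stdout, drained
    (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList) -- stderr, not drained

main :: IO ()
main = do
  (exitCode, outLines, errLines) <- captureFirstLines (shell "seq 1 100000")
  print (exitCode, outLines, errLines)
```

Only stdout is drained here, so a process that writes a lot to stderr after its first line could still hang in the same way; applying the same <* CC.sinkNull to the stderr sink would cover that case.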

Then I thought that I could also write it using takeExactly:

      (CT.decode CT.utf8 .| CT.lines .| CC.takeExactly 1 CC.sinkList)

I thought CC.takeExactly 1 CC.sinkList should be equivalent to ((CC.take 1 .| CC.sinkList) <* CC.sinkNull).

However, it is not; it blocks forever.

The docs say:

This function is in contrast to take, which will only consume up to the given number of values, and will terminate early if downstream terminates early. This function will discard any additional values in the stream if they are unconsumed.

After reading the implementation, I figured that

This function will discard any additional values in the stream if they are unconsumed.

really means

This function will discard any additional values in the stream if they are unconsumed by downstream.

and not (what I assumed)

This function will discard any additional values in the stream if they are unconsumed by this function.

That is, the implementation does:

take count .| do
    r <- inner
    CL.sinkNull
    return r

which is equivalent to

take count .| (inner <* CL.sinkNull)

and I thought it does

(take count .| inner) <* CL.sinkNull

I think it would be very helpful to make this distinction clear, e.g. by at least adding the suggested "by downstream" to the sentence; ideally even contrasting the two behaviours above directly and saying which of them the function implements.
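The difference between the two readings can be observed without any process, by checking what is left in the stream afterwards. A minimal sketch, assuming only the conduit package (the top-level names are made up for illustration); each sink is followed by a second sinkList that collects whatever the first one left unconsumed upstream.

```haskell
import Conduit

-- takeExactlyC: the inner sink sees only the first element; nothing past
-- the count is touched, so the rest of the stream is still available.
viaTakeExactly :: ([Int], [Int])
viaTakeExactly =
  runConduitPure $
    yieldMany [1 .. 5] .| ((,) <$> takeExactlyC 1 sinkList <*> sinkList)

-- (take .| inner) <* sinkNull: the whole stream is drained afterwards.
viaTakeThenDrain :: ([Int], [Int])
viaTakeThenDrain =
  runConduitPure $
    yieldMany [1 .. 5] .| ((,) <$> ((takeC 1 .| sinkList) <* sinkNull) <*> sinkList)

-- What takeExactlyC does discard: elements within the count that the
-- inner sink did not consume (here the 2, since headC reads only the 1).
discardWithinCount :: (Maybe Int, [Int])
discardWithinCount =
  runConduitPure $
    yieldMany [1 .. 5] .| ((,) <$> takeExactlyC 2 headC <*> sinkList)

main :: IO ()
main = do
  print viaTakeExactly     -- ([1],[2,3,4,5])
  print viaTakeThenDrain   -- ([1],[])
  print discardWithinCount -- (Just 1,[3,4,5])
```

With a process pipe as the upstream, the first case is the one that leaves the remaining output unread, which matches the hang described above.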

But I want to get your opinion first.

Question 3: createSource docs

Somewhat unrelatedly, I found that Data.Conduit.Process.Typed.createSource says

Read output from a process by read from a conduit.

and I didn't get what "by read from a conduit" means; maybe there's a typo or a word missing?

nh2 avatar Jan 17 '23 05:01 nh2

I see no problem with adding the close streams, seems reasonable. Want to confirm if it fixes the problem and open a PR?

For takeExactly: you have it right, better docs PR welcome too.

No idea what by read from a conduit is supposed to mean TBH. Read output from a process using a Conduit would seem correct enough.

snoyberg avatar Jan 19 '23 08:01 snoyberg