asynctools icon indicating copy to clipboard operation
asynctools copied to clipboard

Issues with using asyncPipe and faststream

Open yyoncho opened this issue 2 years ago • 0 comments

Consider the following examples:

Variant 1: writing into an asyncPipe

import
  faststreams/async_backend,
  faststreams/asynctools_adapters,
  faststreams/textio,
  faststreams/inputs

proc copyStdioToPipe(pipe: AsyncPipe) {.thread.} =
  ## Thread entry point that keeps pushing single bytes into `pipe` through
  ## the async `write`, waiting on every call with `waitFor`.
  ##
  ## Each pass sends the first byte of "hello" six times and then one
  ## newline byte. The payload is never modified, so the loop condition
  ## stays true and the proc does not return on its own.
  var
    payload = "hello"
    lineEnd = "\n"

  while payload[0] != '\0':
    # Six identical one-byte writes, each completed before the next starts.
    for _ in 1 .. 6:
      discard waitFor(write(pipe, payload[0].addr, 1))
    # Finish the line with a single newline byte.
    discard waitFor(write(pipe, lineEnd[0].addr, 1))

proc myReadLine(input: AsyncInputStream): Future[void] {.async.} =
  ## Consumes `input` for as long as it reports itself `readable`, calling
  ## `readLine` on every iteration and throwing the result away.
  while input.readable:
    # NOTE(review): the result of `readLine` is discarded without `await`
    # here - confirm this is intended for an async input stream.
    discard input.readLine()

when isMainModule:
  # Reproduction driver: create a pipe, start the writer thread on it, then
  # read from the same pipe through an async input stream on this thread.
  var
    pipe = createPipe(register = true)
    stdioThread: Thread[AsyncPipe]

  # The writer runs `copyStdioToPipe` on a separate thread; the thread is
  # never joined, the program just waits on the reader below.
  createThread(stdioThread, copyStdioToPipe, pipe)
  let a = asyncPipeInput(pipe)
  waitFor(myReadLine(a))

Fails with:

/home/yyoncho/Sources/nim/langserver/nls.nim(12) copyStdioToPipe
/home/yyoncho/Sources/nim/asynctools/asynctools/asyncpipe.nim(424) write
/home/yyoncho/.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1220) addWrite
Error: unhandled exception: File descriptor not registered. [ValueError]

Variant 2: writing into a pipe using posix

import
  posix,
  os,
  faststreams/async_backend,
  faststreams/asynctools_adapters,
  faststreams/textio,
  faststreams/inputs

proc writeToPipe(p: AsyncPipe, data: pointer, nbytes: int) =
  ## Writes `nbytes` bytes starting at `data` to the write handle of `p`
  ## with a direct `posix.write` call.
  ##
  ## Raises an `OSError` built from `osLastError()` when the call returns a
  ## negative value. The number of bytes actually written is not inspected.
  let written = posix.write(p.getWriteHandle, data, cint(nbytes))
  if written < 0:
    raiseOsError(osLastError())

proc copyStdioToPipe(pipe: AsyncPipe) {.thread.} =
  ## Thread entry point that keeps pushing single bytes into `pipe` via
  ## `writeToPipe`.
  ##
  ## Each pass sends the first byte of "X222" six times and then one
  ## newline byte. The payload is never modified, so the loop condition
  ## stays true and the proc does not return on its own.
  var
    payload = "X222"
    lineEnd = "\n"

  while payload[0] != '\0':
    # Six identical one-byte writes in a row.
    for _ in 1 .. 6:
      writeToPipe(pipe, payload[0].addr, 1)
    # Finish the line with a single newline byte.
    writeToPipe(pipe, lineEnd[0].addr, 1)

proc myReadLine(input: AsyncInputStream): Future[void] {.async.} =
  ## Consumes `input` for as long as it reports itself `readable`, awaiting
  ## one line per iteration and echoing it.
  while input.readable:
    echo await input.readLine()

when isMainModule:
  # Reproduction driver: create a pipe, start the writer thread on it, then
  # read from the same pipe through an async input stream on this thread.
  var
    pipe = createPipe(register = true)
    stdioThread: Thread[AsyncPipe]

  # The writer runs `copyStdioToPipe` on a separate thread; the thread is
  # never joined, the program just waits on the reader below.
  createThread(stdioThread, copyStdioToPipe, pipe)
  let a = asyncPipeInput(pipe)
  waitFor(myReadLine(a))

Fails with:

/home/yyoncho/Sources/nim/langserver/nls.nim(19) copyStdioToPipe
/home/yyoncho/Sources/nim/langserver/nls.nim(11) writeToPipe
/home/yyoncho/.choosenim/toolchains/nim-#devel/lib/pure/includes/oserr.nim(94) raiseOSError
Error: unhandled exception: Resource temporarily unavailable [OSError]

While investigating the failure in variant 2, I found https://stackoverflow.com/questions/14370489/what-can-cause-a-resource-temporarily-unavailable-on-sock-send-command and, by leaving the write end of the pipe blocking (i.e. not setting it to non-blocking mode), I was able to solve the issue:

-    proc createPipe*(size = 65536, register = true): AsyncPipe =
+    proc createPipe*(size = 65536, register = true, nonBlockingWrite = true): AsyncPipe =
       var fds: array[2, cint]
       if posix.pipe(fds) == -1:
         raiseOSError(osLastError())
       setNonBlocking(fds[0])
-      setNonBlocking(fds[1])
+      if nonBlockingWrite:
+        setNonBlocking(fds[1])

Let me know if this fix makes sense to you (it is a bit controversial, since it allows creating an asyncPipe whose write end is a synchronous, i.e. blocking, file descriptor). Alternatively, making readPipe/writePipe public would work too, but that also looks disputable.

yyoncho avatar Feb 22 '22 09:02 yyoncho