
Data lost with large size output

Open Luffbee opened this issue 1 year ago • 16 comments

Code to reproduce:

import asyncio as aio
import logging
import sys

import asyncssh as assh


async def main():
    async with assh.connect("localhost", known_hosts=None) as conn:
        await conn.run("cat ~/large.txt", check=True, stdout=sys.stdout)
    # await aio.sleep(1)


with aio.Runner() as runner:
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=logging.DEBUG,
    )
    runner.run(main())

The large.txt file is about 160 KB:

$ ls -l large.txt 
-rw-rw-r--. 1 liuyifan liuyifan 169426 Jan 31 14:47 large.txt

With the above code, I found the output is unexpectedly truncated. If the aio.sleep is uncommented, it works fine. I dug into the code, added some logging in asyncssh, and figured out what happened:

When stdout is redirected to sys.stdout, the internal writer is a _PipeWriter in process.py. I added some logging to its methods and added a connection_lost method to it, which is referenced later.

class _PipeWriter(_UnicodeWriter[AnyStr], asyncio.BaseProtocol):
    """Forward data to a pipe"""

    ...

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Handle a newly opened pipe"""

        self._process.logger.warn(f"conn made {transport}")
        self._transport = cast(asyncio.WriteTransport, transport)
    
    def connection_lost(self, exc: Exception | None) -> None:
        self._process.logger.warn(f"conn lost {exc}")
        return super().connection_lost(exc)

    ...

    def write_eof(self) -> None:
        """Write EOF to the pipe"""

        assert self._transport is not None
        self._process.logger.warn(f"pipe before eof {self._transport} {self._transport.get_write_buffer_size()} {self._transport.get_write_buffer_limits()}")
        self._transport.write_eof()
        self._process.logger.warn(f"pipe after eof {self._transport} {self._transport.get_write_buffer_size()} {self._transport.get_write_buffer_limits()}")

    def close(self) -> None:
        """Stop forwarding data to the pipe"""

        assert self._transport is not None

        # There's currently no public API to tell connect_write_pipe()
        # to not close the pipe on when the created transport is closed,
        # and not closing it triggers an "unclosed transport" warning.
        # This masks that warning when we want to keep the pipe open.
        #
        # pylint: disable=protected-access

        self._process.logger.warn("pipe try close")
        self._process.logger.warn(f"{self._transport} {self._transport.get_write_buffer_size()} {self._transport.get_write_buffer_limits()}")
        if self._needs_close:
            self._process.logger.warn("pipe close")
            self._transport.close()
        else:
            self._process.logger.warn("pipe ignore")
            self._transport._pipe = None # type: ignore

Here is the log with and without the aio.sleep (some warning logs were added by me for debugging):

Without aio.sleep (output is truncated)

$ tail -n 20 err.log 
2024-01-31 15:19:37,030 WARNING asyncssh: [conn=0, chan=0] Reading from channel paused
2024-01-31 15:19:37,030 DEBUG asyncssh: [conn=0, chan=0] Received 16384 data bytes pause y first 0x20
2024-01-31 15:19:37,030 DEBUG asyncssh: [conn=0, chan=0] Received 16384 data bytes pause y first 0x37
2024-01-31 15:19:37,031 INFO asyncssh: [conn=0, chan=0] Received exit status 0
2024-01-31 15:19:37,031 DEBUG asyncssh: [conn=0, chan=0] chan recv exit: 2 True open
2024-01-31 15:19:37,031 DEBUG asyncssh: [conn=0, chan=0] chan recv exit after: 2 True open
2024-01-31 15:19:37,031 DEBUG asyncssh: [conn=0, chan=0] Received 16384 data bytes pause y first 0x5b
2024-01-31 15:19:37,031 DEBUG asyncssh: [conn=0, chan=0] Received 5586 data bytes pause y first 0x63
2024-01-31 15:19:37,032 INFO asyncssh: [conn=0, chan=0] Received channel close
2024-01-31 15:19:37,049 WARNING asyncssh: [conn=0, chan=0] resume
2024-01-31 15:19:37,049 WARNING asyncssh: [conn=0, chan=0] Reading from channel resumed
2024-01-31 15:19:37,049 WARNING asyncssh: [conn=0, chan=0] pipe before eof <_UnixWritePipeTransport fd=1 polling bufsize=51221> 51221 (16384, 65536)
2024-01-31 15:19:37,049 WARNING asyncssh: [conn=0, chan=0] pipe after eof <_UnixWritePipeTransport closing fd=1 polling bufsize=51221> 51221 (16384, 65536)
2024-01-31 15:19:37,049 WARNING asyncssh: [conn=0, chan=0] pipe try close
2024-01-31 15:19:37,049 WARNING asyncssh: [conn=0, chan=0] <_UnixWritePipeTransport closing fd=1 polling bufsize=51221> 51221 (16384, 65536)
2024-01-31 15:19:37,049 WARNING asyncssh: [conn=0, chan=0] pipe close
2024-01-31 15:19:37,049 INFO asyncssh: [conn=0, chan=0] Channel closed
2024-01-31 15:19:37,050 INFO asyncssh: [conn=0] Closing connection
2024-01-31 15:19:37,050 INFO asyncssh: [conn=0] Sending disconnect: Disconnected by application (11)
2024-01-31 15:19:37,050 INFO asyncssh: [conn=0] Connection closed

With aio.sleep (output is complete)

$ tail -n 20 err-ok.log
2024-01-31 15:18:44,303 DEBUG asyncssh: [conn=0, chan=0] Received 16384 data bytes pause y first 0x20
2024-01-31 15:18:44,303 DEBUG asyncssh: [conn=0, chan=0] Received 16384 data bytes pause y first 0x37
2024-01-31 15:18:44,303 INFO asyncssh: [conn=0, chan=0] Received exit status 0
2024-01-31 15:18:44,303 DEBUG asyncssh: [conn=0, chan=0] chan recv exit: 3 True open
2024-01-31 15:18:44,303 DEBUG asyncssh: [conn=0, chan=0] chan recv exit after: 3 True open
2024-01-31 15:18:44,304 DEBUG asyncssh: [conn=0, chan=0] Received 16384 data bytes pause y first 0x5b
2024-01-31 15:18:44,304 DEBUG asyncssh: [conn=0, chan=0] Received 5586 data bytes pause y first 0x63
2024-01-31 15:18:44,304 INFO asyncssh: [conn=0, chan=0] Received channel close
2024-01-31 15:18:44,322 WARNING asyncssh: [conn=0, chan=0] resume
2024-01-31 15:18:44,322 WARNING asyncssh: [conn=0, chan=0] Reading from channel resumed
2024-01-31 15:18:44,323 WARNING asyncssh: [conn=0, chan=0] pipe before eof <_UnixWritePipeTransport fd=1 polling bufsize=55577> 55577 (16384, 65536)
2024-01-31 15:18:44,323 WARNING asyncssh: [conn=0, chan=0] pipe after eof <_UnixWritePipeTransport closing fd=1 polling bufsize=55577> 55577 (16384, 65536)
2024-01-31 15:18:44,323 WARNING asyncssh: [conn=0, chan=0] pipe try close
2024-01-31 15:18:44,323 WARNING asyncssh: [conn=0, chan=0] <_UnixWritePipeTransport closing fd=1 polling bufsize=55577> 55577 (16384, 65536)
2024-01-31 15:18:44,324 WARNING asyncssh: [conn=0, chan=0] pipe close
2024-01-31 15:18:44,324 INFO asyncssh: [conn=0, chan=0] Channel closed
2024-01-31 15:18:44,324 INFO asyncssh: [conn=0] Closing connection
2024-01-31 15:18:44,324 INFO asyncssh: [conn=0] Sending disconnect: Disconnected by application (11)
2024-01-31 15:18:44,325 INFO asyncssh: [conn=0] Connection closed
2024-01-31 15:18:44,570 WARNING asyncssh: [conn=0, chan=0] conn lost None

We can see that the pipe has pending data that is not yet flushed when it is closing. According to the reference https://docs.python.org/3/library/asyncio-protocol.html#asyncio.BaseTransport.close, this data is flushed asynchronously, and connection_lost is the callback invoked once the flush is done. However, the connection_lost method is neither implemented in _PipeWriter nor handled anywhere else. We can see that the connection_lost log line (which I added) does not appear without aio.sleep, and with aio.sleep it appears after the "Connection closed" log (the last line). Therefore, I think this is a bug in asyncssh: it should handle _PipeWriter.connection_lost and wait for it when the connection is closing.
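To make the asyncio mechanics concrete, here is a minimal standalone sketch (not asyncssh code; the protocol class and Event-based wait are illustrative) showing that transport.close() on a write pipe only schedules the flush, and connection_lost() is the signal that it has completed:

import asyncio
import sys


class FlushAwareProtocol(asyncio.BaseProtocol):
    def __init__(self) -> None:
        self.closed = asyncio.Event()

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        self.transport = transport

    def connection_lost(self, exc: Exception | None) -> None:
        # Called only once any data still buffered in the transport has been
        # written out (or the write failed, in which case exc is set).
        self.closed.set()


async def main() -> None:
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.connect_write_pipe(FlushAwareProtocol, sys.stdout)
    transport.write(b"x" * 200_000)  # more than the pipe can accept at once
    transport.close()                # the flush happens asynchronously
    await protocol.closed.wait()     # without this wait, trailing data can be lost
    # Note: closing the transport also closes sys.stdout, which is fine for a
    # one-shot demo.

asyncio.run(main())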

Luffbee avatar Jan 31 '24 07:01 Luffbee

Thanks for the report - I'll try and take a closer look at this over the weekend.

In other circumstances, the await conn.run() call already waits for all the output to stdout and stderr to be complete before returning and allowing the channel to close. However, perhaps there's a missing drain call to wait for all data to be forwarded when using the redirect feature that is allowing the channel to close prematurely. It also seems like something like this might be needed for stdin, to make sure a redirected stdin waits for the source to close and for all the data to be forwarded to the remote system before allowing the channel to close.

ronf avatar Feb 01 '24 14:02 ronf

I've started to look at this issue and I'm able to reliably reproduce it here, but I don't have a fix yet. Here's what I've got so far:

  • The problem occurs when reading from the channel gets paused, due to writes to sys.stdout filling the write pipe we open. While reading is paused, the channel delivers all of the file contents, an EOF message, and a channel close message.
  • Normally, the data coming in while paused would go into the channel's _recv_buf. However, when redirects are active, this doesn't happen. All the data is delivered straight to the writer (a PipeWriter in this case).
  • When the close message is received, a check is made to see if the _recv_buf is empty before transitioning from the 'close-pending' state to 'closed'. However, since the redirect doesn't buffer data in _recv_buf, this check doesn't take into account that there might be data buffered elsewhere that still needs to be flushed, and things are cleaned up before all this data is written to stdout successfully.

I haven't quite tracked down what buffering is happening once the stdout pipe fills up, or what might prevent that from being delivered after the connection is closed.

Note: With output of this size, I don't think there's any back-pressure to the server, since the default window size is 2 MiB. That's consistent with the rest of the file data and the EOF and close messages arriving from the server while reading is paused on stdout. Later, I see a log entry that reading is resumed, but it's after the channel is cleaned up.

ronf avatar Feb 04 '24 04:02 ronf

I'm not familiar with the internal buffers in asyncssh. For the pipe writer's buffer in this case, I think it is not implemented by asyncssh, and I didn't find a 'flush' method for it. The only way I found to 'flush' the pipe is to close it and wait for the connection_lost callback, as described in the quote below.

We can see that the pipe has pending data that is not yet flushed when it is closing. According to the reference https://docs.python.org/3/library/asyncio-protocol.html#asyncio.BaseTransport.close, this data is flushed asynchronously, and connection_lost is the callback invoked once the flush is done. However, the connection_lost method is neither implemented in _PipeWriter nor handled anywhere else. We can see that the connection_lost log line (which I added) does not appear without aio.sleep, and with aio.sleep it appears after the "Connection closed" log (the last line). Therefore, I think this is a bug in asyncssh: it should handle _PipeWriter.connection_lost and wait for it when the connection is closing.

Luffbee avatar Feb 04 '24 06:02 Luffbee

An update:

I've confirmed that AsyncSSH does properly write all data received on the channel to the transport object created by connect_write_pipe, and that it properly calls both write_eof() and close() on it, even in the case where reading from the channel was paused because the output buffer for stdout became full. In that case, it waits for the resume_reading() callback to be called by the transport and then flushes the remainder of the data on the channel to the pipe before closing it. However, despite that, it seems like your test program isn't waiting for the data to actually be written before exiting the event loop.

As you said, it might be possible to use the connection_lost() callback to know when all data is flushed, but I think that would only work in the case where AsyncSSH closes the pipe. While this is the default case, there's an option you can pass in to tell AsyncSSH not to close the pipe, so it can be used for multiple consecutive redirects. In that case, I'm not sure there will be a good way to wait for all written data to be flushed. When the caller uses that option, it may need to be up to them to close stdout themselves and then wait for any data to be flushed, or to make sure they don't pass recv_eof=False on the final redirect.
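To illustrate that multi-redirect pattern, here is a hedged sketch, assuming recv_eof (as used elsewhere in this thread) is the option in question; the commands are placeholders. Intermediate runs keep the target open, and leaving recv_eof at its default on the final run lets the pipe be closed and flushed:

import sys

import asyncssh


async def run_all(conn: asyncssh.SSHClientConnection) -> None:
    # Intermediate redirects keep the target open for reuse.
    await conn.run("echo part1", check=True, stdout=sys.stdout, recv_eof=False)
    await conn.run("echo part2", check=True, stdout=sys.stdout, recv_eof=False)
    # Final redirect: default recv_eof=True closes the pipe when done.
    await conn.run("echo part3", check=True, stdout=sys.stdout)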

I'll continue to research options around flushing the output and let you know if I'm able to get something working.

ronf avatar Feb 08 '24 15:02 ronf

Ok - I've got a first cut at a possible fix, based on your suggestion. I still need to go back and better handle the case where recv_eof is set to False, but this should work in the default case where it is True:

diff --git a/asyncssh/process.py b/asyncssh/process.py
index 33a0cdb..a7840d9 100644
--- a/asyncssh/process.py
+++ b/asyncssh/process.py
@@ -407,12 +407,18 @@ class _PipeWriter(_UnicodeWriter[AnyStr], asyncio.BaseProtocol):
         self._datatype = datatype
         self._needs_close = needs_close
         self._transport: Optional[asyncio.WriteTransport] = None
+        self._close_event = asyncio.Event()

     def connection_made(self, transport: asyncio.BaseTransport) -> None:
         """Handle a newly opened pipe"""

         self._transport = cast(asyncio.WriteTransport, transport)

+    def connection_lost(self, exc: Optional[Exception]) -> None:
+        """Handle closing of a pipe"""
+
+        self._close_event.set()
+
     def pause_writing(self) -> None:
         """Pause writing to the pipe"""

@@ -449,6 +455,7 @@ class _PipeWriter(_UnicodeWriter[AnyStr], asyncio.BaseProtocol):

         if self._needs_close:
             self._transport.close()
+            self._process.add_cleanup_task(self._close_event.wait())
         else:
             self._transport._pipe = None # type: ignore

@@ -968,7 +975,7 @@ class SSHProcess(SSHStreamSession, Generic[AnyStr]):
         return bool(self._paused_write_streams) or \
             super()._should_pause_reading()

-    def add_cleanup_task(self, task: Awaitable[None]) -> None:
+    def add_cleanup_task(self, task: Awaitable) -> None:
         """Add a task to run when the process exits"""

         self._cleanup_tasks.append(task)

Let me know if this works for you.

ronf avatar Feb 09 '24 02:02 ronf

Here's a more complete change that should support recv_eof:

diff --git a/asyncssh/process.py b/asyncssh/process.py
index 33a0cdb..e205041 100644
--- a/asyncssh/process.py
+++ b/asyncssh/process.py
@@ -400,19 +400,24 @@ class _PipeWriter(_UnicodeWriter[AnyStr], asyncio.BaseProtocol):
     """Forward data to a pipe"""

     def __init__(self, process: 'SSHProcess[AnyStr]', datatype: DataType,
-                 needs_close: bool, encoding: Optional[str], errors: str):
+                 encoding: Optional[str], errors: str):
         super().__init__(encoding, errors)

         self._process: 'SSHProcess[AnyStr]' = process
         self._datatype = datatype
-        self._needs_close = needs_close
         self._transport: Optional[asyncio.WriteTransport] = None
+        self._close_event = asyncio.Event()

     def connection_made(self, transport: asyncio.BaseTransport) -> None:
         """Handle a newly opened pipe"""

         self._transport = cast(asyncio.WriteTransport, transport)

+    def connection_lost(self, exc: Optional[Exception]) -> None:
+        """Handle closing of the pipe"""
+
+        self._close_event.set()
+
     def pause_writing(self) -> None:
         """Pause writing to the pipe"""

@@ -439,18 +444,8 @@ class _PipeWriter(_UnicodeWriter[AnyStr], asyncio.BaseProtocol):
         """Stop forwarding data to the pipe"""

         assert self._transport is not None
-
-        # There's currently no public API to tell connect_write_pipe()
-        # to not close the pipe on when the created transport is closed,
-        # and not closing it triggers an "unclosed transport" warning.
-        # This masks that warning when we want to keep the pipe open.
-        #
-        # pylint: disable=protected-access
-
-        if self._needs_close:
-            self._transport.close()
-        else:
-            self._transport._pipe = None # type: ignore
+        self._transport.close()
+        self._process.add_cleanup_task(self._close_event.wait())


 class _ProcessReader(_ReaderProtocol, Generic[AnyStr]):
@@ -897,8 +892,7 @@ class SSHProcess(SSHStreamSession, Generic[AnyStr]):
         def pipe_factory() -> _PipeWriter:
             """Return a pipe write handler"""

-            return _PipeWriter(self, datatype, recv_eof,
-                               self._encoding, self._errors)
+            return _PipeWriter(self, datatype, self._encoding, self._errors)

         if target == PIPE:
             writer: Optional[_WriterProtocol[AnyStr]] = None
@@ -946,6 +940,10 @@ class SSHProcess(SSHStreamSession, Generic[AnyStr]):
                     # If file was opened in text mode, remove that wrapper
                     file = cast(TextIO, target).buffer

+                if not recv_eof:
+                    fd = os.dup(cast(IO[bytes], file).fileno())
+                    file = os.fdopen(fd, 'wb', buffering=0)
+
                 assert self._loop is not None
                 _, protocol = \
                     await self._loop.connect_write_pipe(pipe_factory, file)
@@ -968,7 +966,7 @@ class SSHProcess(SSHStreamSession, Generic[AnyStr]):
         return bool(self._paused_write_streams) or \
             super()._should_pause_reading()

-    def add_cleanup_task(self, task: Awaitable[None]) -> None:
+    def add_cleanup_task(self, task: Awaitable) -> None:
         """Add a task to run when the process exits"""

         self._cleanup_tasks.append(task)

ronf avatar Feb 09 '24 05:02 ronf

Thanks for the fix. I will try it in the next few days. However, I think the version that supports recv_eof will cause some "output synchronization" problems: writes to the original file and to the file from os.dup() may be reordered. For example, the following program may (I'm not sure yet) output "line2 line1 line3" instead of "line1 line2 line3". Although I haven't tried this program yet, I did run into this problem with stdout=os.dup(sys.stdout.fileno()) when I was debugging the data loss problem.

async def main():
    print("line1")
    async with assh.connect("localhost", known_hosts=None) as conn:
        await conn.run("echo line2", check=True, stdout=sys.stdout, recv_eof=False)
    print("line3")

One question here is: do we really need to use connect_write_pipe()? As I understand it, the aim of using connect_write_pipe() is to give the file non-blocking read/write support. When using stdout=sys.stdout or regular files, however, I think blocking read/write would be fine (it seems regular files indeed use blocking read/write?). What's more, I see there is a check for whether the file supports async read/write, so maybe we can remove the connect_write_pipe() call and use blocking read/write here. If non-blocking read/write is needed, it should be up to the user to provide a file that supports async read/write.

Luffbee avatar Feb 10 '24 18:02 Luffbee

The only way for things to get out of order here should be if some form of buffering is used. This would be the case for a plain print() call in some cases (though you may not always notice if you are going to a TTY and always outputting full lines). However, since AsyncSSH already discards the buffer when you pass it a file object which is buffered, this potential for out-of-order output already exists in AsyncSSH before this change. The solution is to manually call flush() on whatever files you do buffered writes to before you trigger the conn.run() with the redirect. As long as you do that, there should be no ordering issue, even with os.dup().

If you don't do the flush() on a buffered file object, the unbuffered writes done by AsyncSSH will come in ahead of any prior buffered output which isn't flushed yet, with the buffered output eventually getting written some time later after the conn.run() completes and new output causes the buffer to fill (or when the buffered file object is closed). Unfortunately, AsyncSSH can't do the flush() automatically when it is given a buffered file object, as that call to flush() might block if the pipe is full.

Note that if recv_eof is set to True (the default), AsyncSSH may close the file before prior buffered output has a chance to be written. So, it's especially important for the application to make sure to do the flush() or to use only unbuffered writes to start with when redirects are used.
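A short sketch of the flush discipline described above (the command and file are placeholders): flush any buffered writes to the redirect target before starting the redirect, so AsyncSSH's unbuffered writes cannot overtake them.

import sys

import asyncssh


async def run_with_redirect(conn: asyncssh.SSHClientConnection) -> None:
    print("local output written before the redirect", end="")  # buffered write
    sys.stdout.flush()  # flush before handing sys.stdout to the redirect
    await conn.run("cat ~/large.txt", check=True, stdout=sys.stdout)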

Regular files do use a blocking read/write, but the expectation is that any blocking time would be VERY short. However, blocking reads (or even blocking writes) on character devices like stdin/stdout/stderr can block for arbitrarily long waiting for more input or waiting for previous output to be consumed. In such cases, the entire asyncio event loop ends up being stalled, so blocking I/O on those is not really an option.

The stuff you saw around async file I/O is an optional "aiofiles" Python module which may or may not be installed. When it is, the caller can pass a file object opened by that module instead of passing a regular file, and that allows even the short delays which can be introduced when doing I/O on regular files to happen asynchronously. However, as you noted, it'd be up to the caller to decide whether they want to use "aiofiles" or not.
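As a hedged example of that aiofiles route (assuming, per this comment, that AsyncSSH accepts file objects opened by the optional aiofiles package as redirect targets; output.txt is a placeholder):

import aiofiles
import asyncssh


async def save_output(conn: asyncssh.SSHClientConnection) -> None:
    # Writes to the target then go through aiofiles' worker thread instead of
    # blocking the event loop.
    async with aiofiles.open("output.txt", "wb") as f:
        await conn.run("cat ~/large.txt", check=True, stdout=f)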

The only way to get non-blocking I/O on character devices, sockets, or other forms of pipes is to either use connect_read_pipe/connect_write_pipe or essentially reimplement it. It's possible aiofiles would work on pipes like this, but I don't know offhand whether it does, and I wouldn't want to make it a mandatory dependency. Also, "aiofiles" ends up using threads to handle blocking calls, which can actually be very expensive compared to the native asyncio implementation, which keeps everything in a single thread.

More importantly, os.dup() won't make the issue around buffering any worse than it already is. As long as output is flushed in the calling code, everything should be good here, and if not the reordering problem was already present.

Note: The application may need to do the flush in an executor if you actually wrote enough data to potentially block the event loop before doing the redirect. That's something it will need to decide.
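A minimal sketch of that executor variant: run the potentially blocking flush in a thread so it cannot stall the event loop.

import asyncio
import sys


async def flush_stdout_safely() -> None:
    loop = asyncio.get_running_loop()
    # sys.stdout.flush() may block if the pipe is full, so hand it to a thread.
    await loop.run_in_executor(None, sys.stdout.flush)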

ronf avatar Feb 10 '24 18:02 ronf

I tried your repair and it works for the case I described at the beginning.

However, I found another problem caused by connect_write_pipe/connect_read_pipe: their side effect is that the underlying fd is set to non-blocking mode, which may cause print() to raise BlockingIOError: [Errno 11] write could not complete without blocking. The following code should reproduce it:

import asyncio as aio
import io
import os
import logging
import sys
from typing import IO, BinaryIO
from pathlib import Path

import asyncssh as assh


class WrappedIO(io.IOBase):
    def __init__(self, wt: IO):
        self._wt = wt

    def write(self, b: bytes):
        return self._wt.write(b)

    def __getattr__(self, name: str):
        return getattr(self._wt, name)


async def main():
    # print("START!!!", end="")
    async with assh.connect("localhost", known_hosts=None) as conn:
        with open(Path("~/large.txt").expanduser(), "r") as input:
            await conn.run(
                "cat ~/large.txt",
                check=True,
                # stdin=sys.stdin,
                # stdin=input,
                # stdout=WrappedIO(sys.stdout.buffer),
                stdout=sys.stdout,
                recv_eof=False,
            )
            print(input.read())
    # print("END")
    # await aio.sleep(1)


with aio.Runner() as runner:
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=logging.DEBUG,
    )
    runner.run(main())

For the "output synchronization" problem, I reproduced it, too. I understand your concern, especially for sys. stdin, which may block for a long time (I tried to use blocking read/write on character devices, and it blocks the program on reading the stdin.)

To my knowledge, blocking writes on character devices should be ok (I'm not sure, but they don't seem to block for a long time like reads do? Please tell me if I'm wrong). Therefore, I tried to use blocking writes and non-blocking reads for character devices. The result is that sys.stdout becomes non-blocking, too. It seems the non-blocking mode set on sys.stdin also affects sys.stdout.

Another problem I met is that using a custom file-like object may make things more complicated. The WrappedIO in the above code is an example: asyncssh regards it as a regular file because fileno() raises an exception, so blocking writes are used. Without stdin=sys.stdin it is fine, but with stdin=sys.stdin a BlockingIOError is raised.

While these problems can be fixed in the calling code (including flush() before conn.run()), it's really inconvenient for users, and also bug-prone.

According to the above problems, I still think using the connect_write_pipe/connect_read_pipe is not a good idea.

I understand your concerns about performance and implementation. Therefore, I think this fix is ok, but the problems with non-regular files should be documented, and a recommendation for aiofiles would be helpful (it seems it also provides async versions of sys.stdin/sys.stdout).

Luffbee avatar Feb 11 '24 17:02 Luffbee

Thanks for the feedback!

It should be possible to "fix" the non-blocking issue by restoring the non-blocking state to what it was originally when the redirect is complete. This won't let you call print() or otherwise do I/O on something with an active redirect on it, but given buffering issues, that's not something that the redirect code was meant to support.

Blocking is actually an issue for both reading & writing at the file descriptor level, and the default output pipe can hold only a few KB of data typically before it will block. At the asyncio level, write() calls will buffer in RAM if the pipe is full, but if you want to avoid using arbitrary amounts of RAM for that buffering, you're supposed to await on drain() periodically, or stop forwarding data when you are told to pause writing, which the redirect code will do for you automatically.

The fact that you can't set blocking separately for stdin & stdout simultaneously is interesting. For the TTY case, I knew that was true for stdout & stderr as they are actually the same pipe, but I hadn't considered the possibility for the blocking to be shared even between reading & writing in that case. That does make this a bit messier if you want to only redirect one direction. That said, doing blocking I/O on a TTY (or pipe) in an async program is a really bad idea. To do this properly, you'd need to do an async open for any I/O to these, which would end up setting non-blocking. So, at that point, it would be ok to have a redirect in one direction and direct I/O in the other direction.

For "custom" I/O classes, I think your only real option is to make sure your read() method is async, to force AsyncSSH to treat it like the aiofiles case, and pay the overhead of a separate thread for each such redirect. In the end, that's always an option for any file-like object you want to redirect to. It's just not very scalable, and it kind of defeats the point of using AsyncSSH.

I also hadn't noticed before that aiofiles provides its own async version of stdin/stdout/stderr. That should make things easier when you want to do async I/O on those without using read/write pipes, but I think it still makes sense to start with the more efficient case and let the caller decide when to use aiofiles. As long as you don't try to overlap redirects and direct reads/writes of the files you are redirecting, and do flushes for any buffered I/O, the current read/write pipe solution should work just fine.

After I finish with some other work, I'll look into what can be done about resetting the blocking state when a redirect ends without closing a redirect target.

ronf avatar Feb 15 '24 02:02 ronf

I still need to do testing on this, but here's a first cut at resetting the blocking state of a redirect target when the redirect completes:

diff --git a/asyncssh/process.py b/asyncssh/process.py
index da3084e..4230f5a 100644
--- a/asyncssh/process.py
+++ b/asyncssh/process.py
@@ -354,11 +354,13 @@ class _PipeReader(_UnicodeReader[AnyStr], asyncio.BaseProtocol):
     """Forward data from a pipe"""

     def __init__(self, process: 'SSHProcess[AnyStr]', datatype: DataType,
-                 encoding: Optional[str], errors: str):
+                 fd: Optional[int], encoding: Optional[str], errors: str):
         super().__init__(encoding, errors)

         self._process: 'SSHProcess[AnyStr]' = process
         self._datatype = datatype
+        self._orig_fd = fd
+        self._orig_blocking = None if fd is None else os.get_blocking(fd)
         self._transport: Optional[asyncio.ReadTransport] = None

     def connection_made(self, transport: asyncio.BaseTransport) -> None:
@@ -395,16 +397,21 @@ class _PipeReader(_UnicodeReader[AnyStr], asyncio.BaseProtocol):
         assert self._transport is not None
         self._transport.close()

+        if self._orig_fd is not None:
+            os.set_blocking(self._orig_fd, self._orig_blocking)
+

 class _PipeWriter(_UnicodeWriter[AnyStr], asyncio.BaseProtocol):
     """Forward data to a pipe"""

     def __init__(self, process: 'SSHProcess[AnyStr]', datatype: DataType,
-                 encoding: Optional[str], errors: str):
+                 fd: Optional[int], encoding: Optional[str], errors: str):
         super().__init__(encoding, errors)

         self._process: 'SSHProcess[AnyStr]' = process
         self._datatype = datatype
+        self._orig_fd = fd
+        self._orig_blocking = None if fd is None else os.get_blocking(fd)
         self._transport: Optional[asyncio.WriteTransport] = None
         self._close_event = asyncio.Event()

@@ -445,6 +452,10 @@ class _PipeWriter(_UnicodeWriter[AnyStr], asyncio.BaseProtocol):

         assert self._transport is not None
         self._transport.close()
+
+        if self._orig_fd is not None:
+            os.set_blocking(self._orig_fd, self._orig_blocking)
+
         self._process.add_cleanup_task(self._close_event.wait())


@@ -892,7 +903,8 @@ class SSHProcess(SSHStreamSession, Generic[AnyStr]):
         def pipe_factory() -> _PipeWriter:
             """Return a pipe write handler"""

-            return _PipeWriter(self, datatype, self._encoding, self._errors)
+            return _PipeWriter(self, datatype, None if recv_eof else fd,
+                               self._encoding, self._errors)

         if target == PIPE:
             writer: Optional[_WriterProtocol[AnyStr]] = None
@@ -940,9 +952,10 @@ class SSHProcess(SSHStreamSession, Generic[AnyStr]):
                     # If file was opened in text mode, remove that wrapper
                     file = cast(TextIO, target).buffer

+                fd = file.fileno()
+
                 if not recv_eof:
-                    fd = os.dup(cast(IO[bytes], file).fileno())
-                    file = os.fdopen(fd, 'wb', buffering=0)
+                    file = os.fdopen(fd, 'wb', buffering=0, closefd=False)

                 assert self._loop is not None
                 _, protocol = \

This also avoids using os.dup() on redirect targets when recv_eof is False, replacing it with closefd=False, since duplicating the fd was not enough to preserve the blocking state of the original fd. Setting non-blocking on either fd sets it for all fds associated with that target, including setting it for all of stdin/stdout/stderr if they are all associated with the same terminal, even though they're all different fds.
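A tiny demo of that shared-flag behaviour (run it in a terminal): O_NONBLOCK lives on the open file description, so when stdin and stdout refer to the same terminal, toggling one toggles the other.

import os
import sys

os.set_blocking(sys.stdin.fileno(), False)
print(os.get_blocking(sys.stdout.fileno()))  # typically False now as well
os.set_blocking(sys.stdin.fileno(), True)    # restore blocking mode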

I'm actually a bit worried about the last point. If there are multiple redirections active to the same device and they end in a different order than they were established, the wrong blocking state might be restored, since attempts to query the blocking state will begin to return non-blocking as the "original" state once the first redirect is started.

ronf avatar Mar 02 '24 21:03 ronf

Unfortunately, as I suspected, automatically saving and restoring the non-blocking status of an fd doesn't really work because of the way multiple fds can point at a single device.

It isn't really possible to support something like logging to stdout and a redirect to stdout at the same time, due to the differences in both blocking mode and buffering style. Also, trying to do blocking writes (such as what logging or the print function does) isn't safe even if you avoid making such calls while redirects are active; even without active redirects, you'd still run into the possibility of the read or write blocking, causing the entire asyncio event loop to stall.

Given that, I think the best answer here is to say that redirects of things like stdin/stdout/stderr can be supported, but only if previous buffered output is flushed. Applications would need to re-enable blocking on these targets after redirects are complete before using them again directly, but blocking I/O in general is not recommended because of the potential to block the event loop.

Based on this testing, I think I'm going to abandon the code above.
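For reference, the re-enabling step mentioned above (independent of the abandoned diff) could look like this minimal sketch in application code:

import os
import sys

# After the last redirect completes, put the standard streams back into
# blocking mode before using print()/input() on them directly again.
for stream in (sys.stdin, sys.stdout, sys.stderr):
    try:
        os.set_blocking(stream.fileno(), True)
    except (OSError, ValueError):  # stream without a real fd, e.g. replaced
        pass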

ronf avatar Mar 02 '24 22:03 ronf

I agree that saving and restoring the non-blocking state won't work.

However, I still think a blocking write on sys.stdout is ok, as it usually won't block for long. Like you said, logging/print in an async function has the same blocking-write problem. In my opinion, a blocking read on sys.stdin is the real problem.

In my case, I use asyncssh to execute some scripts on remote hosts, and the performance bottleneck is the network latency. Maybe some people who use asyncssh in other cases will care about logging/print blocking, but I think that should be handled in other ways, such as with aiofiles.

In this issue, I think the biggest problem is the design choice of modifying the state of stdin/stdout/stderr, which is unexpected and bug-prone. The ideal solution for me would be to wrap sys.stdout in another thread, like aiofiles does (which is what I'm using now in my case), but that may require too much work or introduce additional dependencies. Therefore, my suggestion is to not modify stdin/stdout/stderr, and to report a warning when blocking I/O is detected. It's also important to update the documentation to describe these behaviors and to suggest aiofiles to users who need it.

Luffbee avatar Mar 03 '24 13:03 Luffbee

The problem with sys.stdout is that the system WON'T block on it. Assuming both stdin and stdout are associated with the same terminal, setting stdin to non-blocking will make stdout non-blocking as well, and that will cause output to be lost and an error to be returned to the caller if they try to write to stdout when its buffer is full. You can see this when you enable logging, where some of the output shows something like:

--- Logging error ---

Also, note that AsyncSSH is NOT what is setting these file descriptors to non-blocking, at least not directly. Whenever a caller passes an integer file descriptor or a file object associated with a character device (like a TTY) or a pipe as a redirect target, AsyncSSH has to use loop.connect_read_pipe() or loop.connect_write_pipe() to allow that device/pipe to be accessed via the asyncio event loop, and it is asyncio which changes these file descriptors to be non-blocking when that is done. AsyncSSH can't really even know in all cases that one of these targets is stdin/stdout/stderr, as it could be something like a file descriptor created via os.dup() or a different file object opened on the same terminal as what stdin/stdout/stderr are pointing at which won't compare as equal to sys.stdin/stdout/stderr.

One option here would be to disallow a caller from passing any TTY in as a redirect target, but I know people are using that successfully right now, and I wouldn't want to break that. In fact, it works fine to set stdin/stdout/stderr all to non-blocking, as long as all I/O from that point until the program exits is done through asyncio, and no longer done directly using things like print(). This means that this all works perfectly whenever there isn't a mix of async and non-async I/O going on at once, and when either recv_eof is True (closing the target when redirection completes), or when multiple redirects are done with recv_eof set to False but without any other non-async I/O in between them.

If you want to intermix AsyncSSH redirects and your own I/O to stdin/stdout/stderr, I agree that one approach is to go through aiofiles. It's more expensive, but it should allow you to avoid blocking the event loop. Alternately, if you actually go through asyncio's StreamReader and StreamWriter classes for the I/O you want to mix in, that should play nicely with other I/O such as the AsyncSSH redirects and not lead to either event loop blocking or BlockingIOError being raised.
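One well-known recipe for the StreamWriter route (an assumption on my part, relying on the semi-private asyncio.streams.FlowControlMixin, not on anything AsyncSSH provides): wrap stdout in an asyncio StreamWriter so your own output is queued and drained cooperatively instead of raising BlockingIOError on a non-blocking fd.

import asyncio
import sys
from asyncio.streams import FlowControlMixin


async def open_stdout_writer() -> asyncio.StreamWriter:
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.connect_write_pipe(
        FlowControlMixin, sys.stdout)
    return asyncio.StreamWriter(transport, protocol, None, loop)


async def main() -> None:
    writer = await open_stdout_writer()
    writer.write(b"line3\n")  # queued on the transport instead of blocking
    await writer.drain()      # back-pressure instead of BlockingIOError

asyncio.run(main())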

ronf avatar Mar 09 '24 04:03 ronf

The problem with sys.stdout is that the system WON'T block on it.

I didn't make it clear. My suggestion is never to use loop.connect_read_pipe() or loop.connect_write_pipe() on any FD, and "let them block". The reason for this suggestion is that silently changing the state of these FDs is not expected behavior, and when bugs related to this behavior happen, they are hard to debug. The "let them block" approach may indeed cause performance issues, but it is easier to understand what is happening and then fix the performance issues (if necessary) by other means, like aiofiles or implementing a StreamReader/StreamWriter.

One option here would be to disallow a caller from passing any TTY in as a redirect target, but I know people are using that successfully right now, and I wouldn't want to break that.

I suspect most of them do not know the actual behavior of using a TTY as a redirect target.

AsyncSSH can't really even know in all cases that one of these targets is stdin/stdout/stderr

For the warning, it should be reported for any file object that may cause blocking I/O (e.g. character devices or pipes in blocking mode), not only sys.stdin/stdout/stderr.
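A rough sketch of the detection heuristic behind that suggestion (illustrative only, not an asyncssh API): flag redirect targets that are character devices or pipes still in blocking mode.

import os
import stat


def might_block(fileobj) -> bool:
    try:
        fd = fileobj.fileno()
    except (OSError, ValueError, AttributeError):
        return False  # no real fd, so a read/write pipe wouldn't be used anyway
    mode = os.fstat(fd).st_mode
    return (stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)) and os.get_blocking(fd)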

My suggestion is based on my own understanding and "coding principles". It's ok if you do not accept it. I agree that this issue is hard to solve, and IMO the root cause is the lack of async read/write support for files in Python. If you decide not to change the behavior, it would be better to document the behavior of redirecting to special file objects.

Thanks for your efforts again. AsyncSSH is a great project, and it helped me a lot.

Luffbee avatar Mar 09 '24 18:03 Luffbee

I didn't make it clear. My suggestion is never to use loop.connect_read_pipe() or loop.connect_write_pipe() on any FD, and "let them block".

"Letting them block" is not really an option, for either reading or writing, particularly for pipes (which is what SSHProcess defaults to for all of its local stdin/stdout/stderr streams). Generally speaking, two different asyncio tasks will be doing the I/O on either side of the pipe (one reading and the other writing). If either task blocks on I/O, the asyncio event loop stops running, and the other task never runs, causing the program to deadlock. The writer will block waiting for the reader to make room in the buffer by reading, but the reader will never get scheduled. Alternately, if the reader blocks waiting for input, the task which is supposed to provide that input will never get scheduled, once again causing a deadlock.

I agree with what you're saying about it being unexpected that passing sys.stdin/stdout/stderr causes those to no longer be usable for synchronous I/O, and I can look into documenting that better. However, I don't really see a solution here other than not mixing synchronous and asynchronous I/O on the same target.

ronf avatar Mar 09 '24 22:03 ronf

Closing due to inactivity. Feel free to reopen this or open a new issue if you need anything else.

ronf avatar Jul 03 '24 15:07 ronf