pywinpty icon indicating copy to clipboard operation
pywinpty copied to clipboard

How to properly read until EOF using the WinPTY backend

Open ofek opened this issue 4 months ago • 4 comments

Script to reproduce:

import io
import os
import shutil
import signal
import sys
import tempfile
import threading
import time
from subprocess import list2cmdline

from winpty import PTY, Backend, WinptyError

READ_INTERVAL_SECONDS = 0.05


def capture(pty, writers, stop_event) -> None:
    while not stop_event.is_set():
        try:
            output = pty.read(io.DEFAULT_BUFFER_SIZE)
            if not output:
                time.sleep(READ_INTERVAL_SECONDS)
                continue

            for writer in writers:
                writer.write(output)
                writer.flush()
        except WinptyError:
            if pty.iseof():
                break

            continue


def main():
    command = list(sys.argv[1:])
    executable = command[0]
    if not os.path.isabs(executable):
        executable = shutil.which(executable)
        if not executable:
            raise FileNotFoundError(f"Executable not found: {executable}")

    args = command[1:]
    if args:
        args.insert(0, "")
        args = list2cmdline(args)
    else:
        args = None

    width, height = shutil.get_terminal_size()
    pty = PTY(width, height, backend=Backend.WinPTY)
    pty.spawn(list2cmdline([executable]), cmdline=args)

    with tempfile.SpooledTemporaryFile(mode="w+", encoding="utf-8", newline="") as out:
        event = threading.Event()
        thread = threading.Thread(target=capture, args=(pty, [sys.stdout, out], event), daemon=True)
        thread.start()
        try:
            while not pty.isalive():
                time.sleep(0.1)
        except KeyboardInterrupt:
            os.kill(pty.pid, signal.SIGTERM)
            raise
        finally:
            # time.sleep(3)
            event.set()
            thread.join()

            if (exit_code := pty.get_exitstatus()) is None:
                exit_code = 1

            # out.seek(0)
            # output = out.read()
            # print(repr(output))

    return exit_code


if __name__ == "__main__":
    sys.exit(main())

I need to read until completion but this logic, which appears to work using the ConPTY backend, has a race condition such that the while not pty.isalive(): main loop finishes before reading is complete and therefore the thread sees that the event is set and stops. Should I only set the event in the case of a keyboard interact then, and ConPTY was only working because using it was very slow before the forthcoming implementation?

ofek avatar Aug 02 '25 17:08 ofek

This should be fixed by #515

andfoy avatar Aug 11 '25 19:08 andfoy

Oh, nice work! When might the next release be?

ofek avatar Aug 11 '25 19:08 ofek

I'll make the new release today

andfoy avatar Aug 12 '25 16:08 andfoy

@ofek, your script has a minor bug, you should be checking for iseof as opposed to isalive, the latter ensures input after the process has ended, whereas the other when will flag once the process ends (even if input is ready to be read):

try:
    while not pty.iseof():
        time.sleep(0.1)
except KeyboardInterrupt:
    os.kill(pty.pid, signal.SIGTERM)
    raise

andfoy avatar Aug 12 '25 23:08 andfoy