How to properly read until EOF using the WinPTY backend
Script to reproduce:
import io
import os
import shutil
import signal
import sys
import tempfile
import threading
import time
from subprocess import list2cmdline
from winpty import PTY, Backend, WinptyError
READ_INTERVAL_SECONDS = 0.05
def capture(pty, writers, stop_event) -> None:
while not stop_event.is_set():
try:
output = pty.read(io.DEFAULT_BUFFER_SIZE)
if not output:
time.sleep(READ_INTERVAL_SECONDS)
continue
for writer in writers:
writer.write(output)
writer.flush()
except WinptyError:
if pty.iseof():
break
continue
def main():
command = list(sys.argv[1:])
executable = command[0]
if not os.path.isabs(executable):
executable = shutil.which(executable)
if not executable:
raise FileNotFoundError(f"Executable not found: {executable}")
args = command[1:]
if args:
args.insert(0, "")
args = list2cmdline(args)
else:
args = None
width, height = shutil.get_terminal_size()
pty = PTY(width, height, backend=Backend.WinPTY)
pty.spawn(list2cmdline([executable]), cmdline=args)
with tempfile.SpooledTemporaryFile(mode="w+", encoding="utf-8", newline="") as out:
event = threading.Event()
thread = threading.Thread(target=capture, args=(pty, [sys.stdout, out], event), daemon=True)
thread.start()
try:
while not pty.isalive():
time.sleep(0.1)
except KeyboardInterrupt:
os.kill(pty.pid, signal.SIGTERM)
raise
finally:
# time.sleep(3)
event.set()
thread.join()
if (exit_code := pty.get_exitstatus()) is None:
exit_code = 1
# out.seek(0)
# output = out.read()
# print(repr(output))
return exit_code
if __name__ == "__main__":
sys.exit(main())
I need to read until completion but this logic, which appears to work using the ConPTY backend, has a race condition such that the while not pty.isalive(): main loop finishes before reading is complete and therefore the thread sees that the event is set and stops. Should I only set the event in the case of a keyboard interact then, and ConPTY was only working because using it was very slow before the forthcoming implementation?
This should be fixed by #515
Oh, nice work! When might the next release be?
I'll make the new release today
@ofek, your script has a minor bug, you should be checking for iseof as opposed to isalive, the latter ensures input after the process has ended, whereas the other when will flag once the process ends (even if input is ready to be read):
try:
while not pty.iseof():
time.sleep(0.1)
except KeyboardInterrupt:
os.kill(pty.pid, signal.SIGTERM)
raise