rich
fix(`BrokenPipeError`): fixes #1591
Type of changes
- [x] Bug fix
- [ ] New feature
- [ ] Documentation / docstrings
- [ ] Tests
- [ ] Other
Checklist
- [x] I've run the latest black with default args on new code.
- [x] I've updated CHANGELOG.md and CONTRIBUTORS.md where appropriate.
- [x] I've added tests for new code.
- [x] I accept that @willmcgugan may be pedantic in the code review.
Description
Fixes the following bug where piping to unix utils that truncate the output, e.g. head, raises an uncaught BrokenPipeError:
- #1591
Verifying the fix:
broken_pipes_test.py
import rich
rich.print("hello")
rich.print("world")
Then invoke it: python broken_pipes_test.py | head -2 is OK, but head -1 raises the error.
Current result
$ python broken_pipes_test.py | head -2; echo $?
hello
world
0
$ python broken_pipes_test.py | head -1; echo $?
hello
Traceback (most recent call last):
  File "/home/louis/lab/rich/broken_pipes_test.py", line 4, in <module>
    rich.print("world")
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/__init__.py", line 74, in print
    return write_console.print(*objects, sep=sep, end=end)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 1673, in print
    with self:
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 865, in __exit__
    self._exit_buffer()
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 823, in _exit_buffer
    self._check_buffer()
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 2065, in _check_buffer
    self.file.flush()
BrokenPipeError: [Errno 32] Broken pipe
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>
BrokenPipeError: [Errno 32] Broken pipe
0
PR result
$ python broken_pipes_test.py | head -2; echo $?
hello
world
0
$ python broken_pipes_test.py | head -1; echo $?
hello
0
Status
Incomplete:
- [x] Confirmed existing test suite passes
- [x] Add tests
To test the function we need to be able to simulate head in a platform generic way (so we can't just call subprocess to invoke head).
Here are some utility functions for creating named pipes (pytest_pipe_utils.py):
import errno
import os
import sys
import threading
import time


def create_fifo(fifo_name):
    try:
        os.mkfifo(fifo_name)
    except FileExistsError:
        pass  # FIFO already exists


def remove_fifo(fifo_name):
    try:
        os.remove(fifo_name)
    except OSError:
        pass  # Handle error if needed


def read_from_fifo(fifo_name, num_lines):
    with open(fifo_name, "r") as fifo:
        for _ in range(num_lines):
            if fifo.readline() == "":
                break  # Exit if pipe is closed or EOF


def write_to_fifo_unsafely(fifo_name, num_lines):
    with open(fifo_name, "w") as fifo:
        for i in range(num_lines):
            print(f"Line {i}", file=fifo, flush=True)
            time.sleep(0.1)


def write_to_fifo_safely(fifo_name, num_lines):
    message = ""
    try:
        write_to_fifo_unsafely(fifo_name, num_lines)
    except BrokenPipeError:
        message = "Broken pipe encountered with BrokenPipeError."
    except OSError as e:
        if e.errno != errno.EPIPE:
            raise
        message = "Broken pipe encountered with OSError."
    return message
And here is a pytest test (test_breaking_named_pipe_pytest.py) that confirms this approach raises a BrokenPipeError like the one we're trying to fix.
import tempfile
from threading import Thread

import pytest

from pytest_pipe_utils import (
    create_fifo,
    read_from_fifo,
    remove_fifo,
    write_to_fifo_safely,
    write_to_fifo_unsafely,
)


@pytest.fixture
def fifo_name():
    with tempfile.NamedTemporaryFile() as tmp:
        fifo_path = tmp.name
    create_fifo(fifo_path)
    yield fifo_path
    remove_fifo(fifo_path)


def test_write_to_fifo_safely(fifo_name):
    reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
    reader_thread.start()
    message = write_to_fifo_safely(fifo_name, 10)
    reader_thread.join()
    assert "Broken pipe encountered" in message


def test_write_to_fifo_unsafely(fifo_name):
    with pytest.raises(BrokenPipeError):
        reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
        reader_thread.start()
        message = write_to_fifo_unsafely(fifo_name, 10)
        reader_thread.join()
Results:
============================= test session starts ==============================
platform linux -- Python 3.11.5, pytest-7.4.3, pluggy-1.0.0
rootdir: /home/louis/lab/pipe
collected 2 items
test_breaking_named_pipe_pytest.py .. [100%]
============================== 2 passed in 1.01s ===============================
To adapt these tests for rich, we can just pass in a console and use its print method instead.
pytest_rich_pipe_utils.py
import errno
import os
import sys
import threading
import time


def create_fifo(fifo_name):
    try:
        os.mkfifo(fifo_name)
    except FileExistsError:
        pass  # FIFO already exists


def remove_fifo(fifo_name):
    try:
        os.remove(fifo_name)
    except OSError:
        pass  # Handle error if needed


def read_from_fifo(fifo_name, num_lines):
    with open(fifo_name, "r") as fifo:
        for _ in range(num_lines):
            if fifo.readline() == "":
                break  # Exit if pipe is closed or EOF


def write_to_fifo_unsafely(console, fifo_name, num_lines):
    with open(fifo_name, "w") as fifo:
        console.file = fifo
        for i in range(num_lines):
            console.print(f"Line {i}")
            time.sleep(0.1)


def write_to_fifo_safely(console, fifo_name, num_lines):
    message = ""
    try:
        write_to_fifo_unsafely(console, fifo_name, num_lines)
    except BrokenPipeError:
        message = "Broken pipe encountered with BrokenPipeError."
    except OSError as e:
        if e.errno != errno.EPIPE:
            raise
        message = "Broken pipe encountered with OSError."
    return message
test_breaking_named_rich_pipe_pytest.py
import tempfile
from threading import Thread

import pytest

from rich.console import Console

from pytest_rich_pipe_utils import (
    create_fifo,
    read_from_fifo,
    remove_fifo,
    write_to_fifo_safely,
    write_to_fifo_unsafely,
)


@pytest.fixture
def fifo_name():
    with tempfile.NamedTemporaryFile() as tmp:
        fifo_path = tmp.name
    create_fifo(fifo_path)
    yield fifo_path
    remove_fifo(fifo_path)


@pytest.fixture
def console():
    """Create a Console instance"""
    return Console()


def test_write_to_fifo_safely(console, fifo_name):
    reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
    reader_thread.start()
    message = write_to_fifo_safely(console, fifo_name, 10)
    reader_thread.join()
    assert "Broken pipe encountered" in message


def test_write_to_fifo_unsafely(console, fifo_name):
    with pytest.raises(BrokenPipeError):
        reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
        reader_thread.start()
        message = write_to_fifo_unsafely(console, fifo_name, 10)
        reader_thread.join()
Result:
louis 🌟 ~/lab/pipe $ pytest test_breaking_named_rich_pipe_pytest.py
================================= test session starts =================================
platform linux -- Python 3.11.5, pytest-7.4.3, pluggy-1.0.0
rootdir: /home/louis/lab/pipe
collected 2 items
test_breaking_named_rich_pipe_pytest.py .. [100%]
================================== 2 passed in 1.04s ==================================
Our test for this doesn't need the "safe" variant that checks the message: pytest can handle the error being raised (or not), so I'll simplify that away in the test I contribute now.
The test should therefore be just the example above but without the pytest.raises(BrokenPipeError), because we want that error to be resolved.
Also, I won't use the fixture because it's not used in other tests.
Well, 2 hours later, I don't think it's possible to reproduce a BrokenPipeError in pytest 😑
It always gets captured and surfaces as an OSError: Bad file descriptor. I probably had a working demo of that much earlier 🥲
Here is a test for regular print, not rich.console.Console.print. Maybe I need to mock something deeper in print to get it to work. When I swap the simple Console mock class here for the rich Console, it gives the OSError: Bad file descriptor.
6_minimal_repro_pytest_bug_multi_reraise.py
import socket
import threading
import time
from queue import Queue

import pytest


class Console:
    def __init__(self, file):
        self.file = file

    def print(self, msg):
        print(msg, file=self.file, flush=True)


def create_socket_pair():
    # Create a pair of connected sockets in blocking mode
    return socket.socketpair()


def socket_writer(sock, num_messages, exception_queue):
    file_like_socket = sock.makefile("w")
    console = Console(file=file_like_socket)
    try:
        for i in range(num_messages):
            message = f"Message {i}"
            console.print(message)  # Using console.print to write formatted message
            time.sleep(0.1)  # simulate time delay between messages
    except BrokenPipeError as exc:
        exception_queue.put(exc)
        print("Broken pipe detected in writer.")
    finally:
        try:
            file_like_socket.close()
        except BrokenPipeError as exc:
            exception_queue.put(exc)
            print("Broken pipe detected in writer shutdown.")
        finally:
            sock.close()


def socket_reader(sock, num_messages_to_read, exception_queue):
    try:
        for _ in range(num_messages_to_read):
            data = sock.recv(1024)
            if not data:
                break
    except BrokenPipeError as exc:
        exception_queue.put(exc)
        print("Broken pipe detected in reader.")
    finally:
        sock.close()


def test_console_socket_pair():
    # Creating socket pair
    sock_writer, sock_reader = create_socket_pair()
    excs = Queue()
    # Starting threads
    writer_thread = threading.Thread(target=socket_writer, args=(sock_writer, 20, excs))
    # Read only 10 messages
    reader_thread = threading.Thread(target=socket_reader, args=(sock_reader, 10, excs))
    with pytest.raises(BrokenPipeError):
        reader_thread.start()
        writer_thread.start()
        reader_thread.join()
        writer_thread.join()
        if not excs.empty():
            raise excs.get()
Request for review @willmcgugan
It turned out to be basically impossible to capture this BrokenPipeError with pytest without resorting to subprocess 😞 Chalking that one up to a learning experience.
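For reference, a subprocess-based reproduction could look roughly like the sketch below. It assumes a POSIX system. The child script, the `run_with_early_close` helper, and the line count are illustrative and not taken from this PR. The parent reads one line and then closes its end of the pipe, which approximates what head -1 does.

```python
import subprocess
import sys

# Hypothetical child program: prints far more than a pipe buffer can hold,
# so it is still writing when the reader goes away.
CHILD_SOURCE = "for i in range(200_000):\n    print(i)\n"


def run_with_early_close():
    """Start the child, read one line, close the pipe, and collect the outcome."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    first_line = proc.stdout.readline()
    proc.stdout.close()  # the reader disappears, as `head -1` does after one line
    stderr_text = proc.stderr.read().decode()
    proc.stderr.close()
    returncode = proc.wait()
    return first_line, returncode, stderr_text


if __name__ == "__main__":
    line, code, err = run_with_early_close()
    print(line, code, "BrokenPipeError" in err)
```

The sketch only checks that the child fails with a BrokenPipeError traceback; it deliberately does not pin the exact exit status, which can differ depending on whether the error also occurs while the interpreter flushes stdout at shutdown.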
I decided to keep the status code as 1 (raise SystemExit(1)) to follow the Python stdlib docs, even though I feel it'd be more intuitive for the return code to be 0.
Doing it this way means the test will have to check its platform but I see you already do that for Windows. I added an early return to make it clear that the extra block of checks is purely there so as to "bail out" if the platform isn't right.
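The stdlib guidance referred to above is the "Note on SIGPIPE" in the Python signal module documentation. A rough sketch of that pattern follows, assuming a POSIX system. The `emit` helper and its return-code convention are illustrative names for this example, not code from this PR.

```python
import os
import sys


def emit(lines, stream):
    """Write lines to stream; return an exit status instead of raising on EPIPE."""
    try:
        for line in lines:
            print(line, file=stream)
        # Flush inside the try block so a broken pipe surfaces here,
        # not later when the interpreter shuts down.
        stream.flush()
    except BrokenPipeError:
        # Redirect the stream's descriptor to devnull so that any remaining
        # buffered output cannot raise a second BrokenPipeError on close or exit.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, stream.fileno())
        os.close(devnull)
        return 1  # the status code the docs use for EPIPE
    return 0


if __name__ == "__main__":
    sys.exit(emit((f"line {i}" for i in range(10_000)), sys.stdout))
```

Returning the status and calling sys.exit only at the top level keeps the decision to terminate in the application rather than in the helper.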
I think this is a good step forward, but I'm uncertain if this is the fix (or at least all of it). The docs suggest that this can happen on writes as well as flushes.
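To illustrate the point about writes versus flushes, here is a small POSIX-only sketch using plain file objects rather than rich. The `failing_stage` helper is a hypothetical name for this example. With an unbuffered stream the error appears on the write itself; with a buffered stream a small write succeeds and the error appears on flush.

```python
import os


def failing_stage(buffering):
    """Report whether a write to a reader-less pipe fails on write or on flush."""
    read_fd, write_fd = os.pipe()
    os.close(read_fd)  # no reader left, like `head` after it has exited
    stream = open(write_fd, "wb", buffering=buffering)
    try:
        try:
            stream.write(b"hello\n")
        except BrokenPipeError:
            return "write"
        try:
            stream.flush()
        except BrokenPipeError:
            return "flush"
        return "none"
    finally:
        try:
            stream.close()  # close() flushes again, so it may raise as well
        except BrokenPipeError:
            pass


if __name__ == "__main__":
    print(failing_stage(0))   # unbuffered
    print(failing_stage(-1))  # default buffering
```

A buffered stream can also fail inside write when the payload is larger than the buffer, which is why guarding only the flush call may not be enough.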
It also seems disruptive for a library to explicitly exit the app.
Maybe there should be a method on Console that is called when a broken pipe is received. The default behavior could raise an exception, but the dev could implement different behavior if desired.
Either way, it should be documented.
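The overridable-hook idea could be sketched as follows. Everything here is hypothetical: `PipeAwareWriter`, `QuietWriter`, `BrokenStream`, and the `on_broken_pipe` name are stand-ins made up for illustration, and the code does not use or describe rich's actual Console.

```python
import errno


class PipeAwareWriter:
    """Hypothetical stand-in for a console; not rich's actual API."""

    def __init__(self, file):
        self.file = file

    def on_broken_pipe(self, error):
        """Default policy: propagate the error to the caller."""
        raise error

    def print(self, text):
        try:
            # Guard both the write and the flush, since either can fail.
            self.file.write(text + "\n")
            self.file.flush()
        except BrokenPipeError as error:
            self.on_broken_pipe(error)


class QuietWriter(PipeAwareWriter):
    """Application-specific policy: record the event and keep running."""

    def __init__(self, file):
        super().__init__(file)
        self.pipe_broken = False

    def on_broken_pipe(self, error):
        self.pipe_broken = True


class BrokenStream:
    """Test double that behaves like a pipe whose reader has gone away."""

    def write(self, text):
        raise BrokenPipeError(errno.EPIPE, "Broken pipe")

    def flush(self):
        pass


if __name__ == "__main__":
    quiet = QuietWriter(BrokenStream())
    quiet.print("ignored")
    print(quiet.pipe_broken)
```

With this shape the library never exits the process itself; an application that wants the exit-with-status behavior can override the hook and raise SystemExit there.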
At least, I have a working snippet of code that reliably recreates the problem in #3400.
It would be nice to have more input on the desired behavior; I mostly agree with Will.
As for a subprocess-free way to generate a BrokenPipeError in Console.print: I can't test on Windows/macOS, but this should be similar to python script.py | head -n 0:
import os

import pytest

from rich.console import Console


def test_brokenpipeerror():
    r_pipe, w_pipe = os.pipe()
    console = Console(file=open(w_pipe, "w"))
    os.close(r_pipe)
    with pytest.raises(BrokenPipeError):
        console.print("This line causes a BrokenPipeError.")


if __name__ == "__main__":
    test_brokenpipeerror()