rich icon indicating copy to clipboard operation
rich copied to clipboard

fix(`BrokenPipeError`): fixes #1591

Open lmmx opened this issue 1 year ago • 6 comments
trafficstars

Type of changes

  • [x] Bug fix
  • [ ] New feature
  • [ ] Documentation / docstrings
  • [ ] Tests
  • [ ] Other

Checklist

  • [x] I've run the latest black with default args on new code.
  • [x] I've updated CHANGELOG.md and CONTRIBUTORS.md where appropriate.
  • [x] I've added tests for new code.
  • [x] I accept that @willmcgugan may be pedantic in the code review.

Description

Fixes the following bug where piping to unix utils that truncate the output, e.g. head, raises an uncaught BrokenPipeError:

  • #1591

Please describe your changes here. If this fixes a bug, please link to the issue, if possible.

Verifying the fix:

broken_pipes_test.py

import rich

rich.print("hello")
rich.print("world")

Then invoke it: python broken_pipes_test.py | head -2 is OK but head -1 throws the error.


Current result

$ python broken_pipes_test.py | head -2; echo $?
hello
world
0
$ python broken_pipes_test.py | head -1; echo $?
hello
Traceback (most recent call last):
  File "/home/louis/lab/rich/broken_pipes_test.py", line 4, in <module>
    rich.print("world")
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/__init__.py", line 74, in print
    return write_console.print(*objects, sep=sep, end=end)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 1673, in print
    with self:
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 865, in __exit__
    self._exit_buffer()
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 823, in _exit_buffer
    self._check_buffer()
  File "/home/louis/miniconda3/lib/python3.11/site-packages/rich/console.py", line 2065, in _check_buffer
    self.file.flush()
BrokenPipeError: [Errno 32] Broken pipe
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>
BrokenPipeError: [Errno 32] Broken pipe
0

PR result

$ python broken_pipes_test.py | head -2; echo $?
hello
world
0
$ python broken_pipes_test.py | head -1; echo $?
hello
0

Status

Incomplete:

  • [x] Confirmed existing test suite passes
  • [x] Add tests

lmmx avatar Dec 16 '23 13:12 lmmx

To test the function we need to be able to simulate head in a platform generic way (so we can't just call subprocess to invoke head).

Here are some utility functions for creating named pipes

import errno
import os
import sys
import threading
import time


def create_fifo(fifo_name):
    try:
        os.mkfifo(fifo_name)
    except FileExistsError:
        pass  # FIFO already exists


def remove_fifo(fifo_name):
    try:
        os.remove(fifo_name)
    except OSError:
        pass  # Handle error if needed


def read_from_fifo(fifo_name, num_lines):
    with open(fifo_name, "r") as fifo:
        for _ in range(num_lines):
            if fifo.readline() == "":
                break  # Exit if pipe is closed or EOF


def write_to_fifo_unsafely(fifo_name, num_lines):
    with open(fifo_name, "w") as fifo:
        for i in range(num_lines):
            print(f"Line {i}", file=fifo, flush=True)
            time.sleep(0.1)


def write_to_fifo_safely(fifo_name, num_lines):
    message = ""
    try:
        write_to_fifo_unsafely(fifo_name, num_lines)
    except BrokenPipeError:
        message = "Broken pipe encountered with BrokenPipeError."
    except OSError as e:
        if e.errno != errno.EPIPE:
            raise
        message = "Broken pipe encountered with OSError."
    return message

and here is a pytest test that confirms this approach raises a BrokenPipeError like the one we're trying to fix.

import tempfile
from threading import Thread

import pytest

from pytest_pipe_utils import (
    create_fifo,
    read_from_fifo,
    remove_fifo,
    write_to_fifo_safely,
    write_to_fifo_unsafely,
)


@pytest.fixture
def fifo_name():
    with tempfile.NamedTemporaryFile() as tmp:
        fifo_path = tmp.name
    create_fifo(fifo_path)
    yield fifo_path
    remove_fifo(fifo_path)


def test_write_to_fifo_safely(fifo_name):
    reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
    reader_thread.start()

    message = write_to_fifo_safely(fifo_name, 10)
    reader_thread.join()

    assert "Broken pipe encountered" in message


def test_write_to_fifo_unsafely(fifo_name):
    with pytest.raises(BrokenPipeError):
        reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
        reader_thread.start()

        message = write_to_fifo_unsafely(fifo_name, 10)
        reader_thread.join()

Results:

============================= test session starts ==============================
platform linux -- Python 3.11.5, pytest-7.4.3, pluggy-1.0.0
rootdir: /home/louis/lab/pipe
collected 2 items

test_breaking_named_pipe_pytest.py ..                                    [100%]

============================== 2 passed in 1.01s ===============================

lmmx avatar Dec 16 '23 15:12 lmmx

To edit these tests for rich, we can just pass in a console and use its print statement instead.

pytest_rich_pipe_utils.py

import errno
import os
import sys
import threading
import time


def create_fifo(fifo_name):
    try:
        os.mkfifo(fifo_name)
    except FileExistsError:
        pass  # FIFO already exists


def remove_fifo(fifo_name):
    try:
        os.remove(fifo_name)
    except OSError:
        pass  # Handle error if needed


def read_from_fifo(fifo_name, num_lines):
    with open(fifo_name, "r") as fifo:
        for _ in range(num_lines):
            if fifo.readline() == "":
                break  # Exit if pipe is closed or EOF


def write_to_fifo_unsafely(console, fifo_name, num_lines):
    with open(fifo_name, "w") as fifo:
        console.file = fifo
        for i in range(num_lines):
            console.print(f"Line {i}")
            time.sleep(0.1)


def write_to_fifo_safely(console, fifo_name, num_lines):
    message = ""
    try:
        write_to_fifo_unsafely(console, fifo_name, num_lines)
    except BrokenPipeError:
        message = "Broken pipe encountered with BrokenPipeError."
    except OSError as e:
        if e.errno != errno.EPIPE:
            raise
        message = "Broken pipe encountered with OSError."
    return message

test_breaking_named_rich_pipe_pytest.py

import tempfile
from threading import Thread

import pytest
from rich.console import Console

from pytest_rich_pipe_utils import (
    create_fifo,
    read_from_fifo,
    remove_fifo,
    write_to_fifo_safely,
    write_to_fifo_unsafely,
)


@pytest.fixture
def fifo_name():
    with tempfile.NamedTemporaryFile() as tmp:
        fifo_path = tmp.name
    create_fifo(fifo_path)
    yield fifo_path
    remove_fifo(fifo_path)


@pytest.fixture
def console():
    """Create a Console instance"""
    return Console()


def test_write_to_fifo_safely(console, fifo_name):
    reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
    reader_thread.start()

    message = write_to_fifo_safely(console, fifo_name, 10)
    reader_thread.join()

    assert "Broken pipe encountered" in message


def test_write_to_fifo_unsafely(console, fifo_name):
    with pytest.raises(BrokenPipeError):
        reader_thread = Thread(target=read_from_fifo, args=(fifo_name, 5))
        reader_thread.start()

        message = write_to_fifo_unsafely(console, fifo_name, 10)
        reader_thread.join()

Result:

louis 🌟 ~/lab/pipe $ pytest test_breaking_named_rich_pipe_pytest.py 
================================= test session starts =================================
platform linux -- Python 3.11.5, pytest-7.4.3, pluggy-1.0.0
rootdir: /home/louis/lab/pipe
collected 2 items                                                                     

test_breaking_named_rich_pipe_pytest.py ..                                      [100%]

================================== 2 passed in 1.04s ==================================

lmmx avatar Dec 16 '23 15:12 lmmx

Our test for this doesn't need to include the "safe" part to do with checking the message, pytest can handle the error raised (or not), so I'll just simplify it in the test I contribute now.

The test should therefore be just the example above but without the pytest.raises(BrokenPipeError) because we want that to be resolved.

Also, I won't use the fixture because it's not used in other tests.

lmmx avatar Dec 16 '23 15:12 lmmx

Well, 2 hours later and I don't think it's possible to reproduce a BrokenPipeError in pytest :expressionless:

It always gets captured and captured as a OSError: Bad file descriptor. I probably had a working demo of that much earlier :smiling_face_with_tear:

lmmx avatar Dec 16 '23 17:12 lmmx

Here is a test for regular print, not rich.console.Console.print. Maybe I need to mock something deeper in print to get it to work. When I swap the simple Console mock class here for the rich Console it gives the OSError: Bad file descriptor

6_minimal_repro_pytest_bug_multi_reraise.py

import socket
import threading
import time
from queue import Queue

import pytest


class Console:
    def __init__(self, file):
        self.file = file

    def print(self, msg):
        print(msg, file=self.file, flush=True)


def create_socket_pair():
    # Create a pair of connected sockets in blocking mode
    return socket.socketpair()


def socket_writer(sock, num_messages, exception_queue):
    file_like_socket = sock.makefile("w")
    console = Console(file=file_like_socket)

    try:
        for i in range(num_messages):
            message = f"Message {i}"
            console.print(message)  # Using console.print to write formatted message
            time.sleep(0.1)  # simulate time delay between messages
    except BrokenPipeError as exc:
        exception_queue.put(exc)
        print("Broken pipe detected in writer.")
    finally:
        try:
            file_like_socket.close()
        except BrokenPipeError as exc:
            exception_queue.put(exc)
            print("Broken pipe detected in writer shutdown.")
        finally:
            sock.close()


def socket_reader(sock, num_messages_to_read, exception_queue):
    try:
        for _ in range(num_messages_to_read):
            data = sock.recv(1024)
            if not data:
                break
    except BrokenPipeError as exc:
        exception_queue.put(exc)
        print("Broken pipe detected in reader.")
    finally:
        sock.close()


def test_console_socket_pair():
    # Creating socket pair
    sock_writer, sock_reader = create_socket_pair()
    excs = Queue()

    # Starting threads
    writer_thread = threading.Thread(target=socket_writer, args=(sock_writer, 20, excs))
    # Read only 10 messages
    reader_thread = threading.Thread(target=socket_reader, args=(sock_reader, 10, excs))

    with pytest.raises(BrokenPipeError):
        reader_thread.start()
        writer_thread.start()

        reader_thread.join()
        writer_thread.join()

        if not excs.empty():
            raise excs.get()

lmmx avatar Dec 16 '23 18:12 lmmx

Request for review @willmcgugan

Turned out it was basically impossible to capture this BrokenPipeError with pytest without resorting to subprocess :disappointed: Chalking that one up to a learning experience.

I decided to keep the status code as 1 (raise SystemExit(1)) to follow the Python stdlib docs, even though I feel it'd be more intuitive for the return code to be 0.

Doing it this way means the test will have to check its platform but I see you already do that for Windows. I added an early return to make it clear that the extra block of checks is purely there so as to "bail out" if the platform isn't right.

lmmx avatar Dec 16 '23 22:12 lmmx

I think this is a good step forward, but I'm uncertain if this is the fix (or at least all of it). The docs suggest that this can happen on writes as well as flushes.

It also seems disruptive for a library to explicitly exit the app.

Maybe there should be a method on Console that is called when a broken pip is received. The default behavior could raise an exception, but the dev could implement different behavior if desired.

Either way, it should be documented.

willmcgugan avatar Jul 01 '24 14:07 willmcgugan

At least, I have a working snippet of code that reliably recreates the problem in #3400.

egberts avatar Jul 02 '24 00:07 egberts

It would be nice to have more input on the desired behavior; I mostly agree with Will.

As for a subprocess-free way to generate a BrokenPipeError in Console.print; I can't test on Windows/macOS, but this should be similar to python script.py | head -n 0:

import os

import pytest
from rich.console import Console


def test_brokenpipeerror():
    r_pipe, w_pipe = os.pipe()
    console = Console(file=open(w_pipe, "w"))
    os.close(r_pipe)
    with pytest.raises(BrokenPipeError):
        console.print("This line causes a BrokenPipeError.")


if __name__ == "__main__":
    test_brokenpipeerror()

aidaco avatar Jul 02 '24 01:07 aidaco