Asynchronous multiprocessing.Pipe reads throw when using uvloop

Open judilsteve opened this issue 4 months ago • 0 comments

Summary

When an asyncio.Event() is combined with loop.add_reader() to read asynchronously from a multiprocessing.Pipe under uvloop, the call to recv_bytes() raises BlockingIOError:

Process reader:
Traceback (most recent call last):
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/james/code/connection_test.py", line 25, in reader_wrapper
    uvloop.run(async_reader(read_pipe))
    ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/james/code/.venv/lib/python3.13/site-packages/uvloop/__init__.py", line 109, in run
    return __asyncio.run(
           ~~~~~~~~~~~~~^
        wrapper(),
        ^^^^^^^^^^
    ...<2 lines>...
        **run_kwargs
        ^^^^^^^^^^^^
    )
    ^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "uvloop/loop.pyx", line 1518, in uvloop.loop.Loop.run_until_complete
  File "/home/james/code/.venv/lib/python3.13/site-packages/uvloop/__init__.py", line 61, in wrapper
    return await main
           ^^^^^^^^^^
  File "/home/james/code/connection_test.py", line 20, in async_reader
    message = read_pipe.recv_bytes()
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/connection.py", line 437, in _recv_bytes
    return self._recv(size)
           ~~~~~~~~~~^^^^^^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/connection.py", line 395, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable

This does not occur when the same code runs on the default asyncio event loop instead of uvloop.

Environment

Python: 3.13.5
uvloop: 0.21.0

Reproduction

import asyncio
import uvloop
from multiprocessing import Process
from multiprocessing.connection import Connection, Pipe, wait

def writer(write_pipe: Connection):
    message = b'!' * 65536
    while True:
        write_pipe.send_bytes(message)

async def async_reader(read_pipe: Connection):
    message_available = asyncio.Event()
    asyncio.get_running_loop().add_reader(
        read_pipe.fileno(), message_available.set
    )

    while True:
        await message_available.wait()
        while read_pipe.poll():
            message = read_pipe.recv_bytes()
            print('Got message')
        message_available.clear()

def reader_wrapper(read_pipe: Connection):
    uvloop.run(async_reader(read_pipe))

if __name__ == '__main__':
    read_pipe, write_pipe = Pipe(duplex=False)

    reader_process = Process(
        name="reader", target=reader_wrapper, args=(read_pipe,)
    )
    reader_process.start()

    writer_process = Process(
        name="writer", target=writer, args=(write_pipe,)
    )
    writer_process.start()

    wait([reader_process.sentinel, writer_process.sentinel])
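
A possible way to narrow this down, offered as a sketch rather than a confirmed diagnosis: the traceback ends in a plain read() failing with EAGAIN, which is what os.read() raises on a descriptor in non-blocking mode with no data ready. The probe below records the pipe descriptor's blocking flag before and after loop.add_reader(). The helper names blocking_mode_around_add_reader and check are hypothetical, and the idea that the two loops differ on this flag is an assumption, not something this report verifies.

```python
import asyncio
import os
from multiprocessing import Pipe


async def blocking_mode_around_add_reader() -> tuple[bool, bool]:
    """Return the read fd's blocking flag before and after add_reader()."""
    read_pipe, write_pipe = Pipe(duplex=False)
    fd = read_pipe.fileno()
    loop = asyncio.get_running_loop()
    try:
        before = os.get_blocking(fd)
        # Register a no-op callback; only the side effect on the fd matters.
        loop.add_reader(fd, lambda: None)
        try:
            after = os.get_blocking(fd)
        finally:
            loop.remove_reader(fd)
        return before, after
    finally:
        read_pipe.close()
        write_pipe.close()


def check(run=asyncio.run) -> tuple[bool, bool]:
    """Run the probe with the given runner (asyncio.run by default)."""
    return run(blocking_mode_around_add_reader())


if __name__ == '__main__':
    print('asyncio:', check())
```

Calling check() probes the default loop. Passing uvloop.run as the runner (after import uvloop) repeats the same probe under uvloop. If the second value differs between the two, the descriptor's blocking mode would be a plausible place to look next.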

judilsteve · Aug 15 '25 02:08