"Assertion `loop->watchers[w->fd] == w' failed." with multiprocessing and OS pipes

Open momocow opened this issue 5 years ago • 15 comments

Hi, I encountered the libuv assertion error while running multiprocessing and communicating with child processes via OS pipes.

I've prepared a minimal reproducible case.

I'm using kchmck/aiopipe to create pipes. It wraps file descriptors from os.pipe() into asyncio StreamReaders/StreamWriters via loop.connect_read_pipe() and loop.connect_write_pipe().

Here, I have unwrapped the aiopipe wrapper classes into plain functions in aiopipe_decap.py to make the code easier to trace.

# aiopipe_decap.py

import asyncio
import os
from asyncio import (BaseTransport, StreamReader, StreamReaderProtocol,
                     StreamWriter, get_running_loop)
from contextlib import asynccontextmanager, contextmanager


async def _open_reader(fd):
    rx = StreamReader()
    transport, _ = await get_running_loop().connect_read_pipe(
        lambda: StreamReaderProtocol(rx),
        os.fdopen(fd))
    return transport, rx


async def _open_writer(fd):
    rx = StreamReader()
    transport, proto = await get_running_loop().connect_write_pipe(
        lambda: StreamReaderProtocol(rx),
        os.fdopen(fd, "w"))
    tx = StreamWriter(transport, proto, rx, get_running_loop())
    return transport, tx


@asynccontextmanager
async def open_stream(fd, mode):
    transport, stream = await _open_reader(fd) if mode == "r" \
        else await _open_writer(fd)

    try:
        yield stream
    finally:
        try:
            transport.close()
        except OSError:
            # The transport/protocol sometimes closes the fd before this is reached.
            pass

        # Allow event loop callbacks to run and handle closed transport.
        await asyncio.sleep(0)

@contextmanager
def detach(fd):
    os.set_inheritable(fd, True)
    try:
        yield
    finally:
        os.close(fd)

And here is the reproducible snippet, which creates a pipe connection between a child process and the master process. Once the child starts up, it will start a loop to send messages back to the master.

You can use the following environment variables to control some of the behaviors in this snippet.

  • U: enable uvloop if this variable is set (default: False)
  • R: number of messages to send (default: 1)
  • M: content of messages (default: "a")
# uv.py

import asyncio
import os
from multiprocessing import Process

from aiopipe_decap import *
import uvloop


async def child_task(fd, message, repeat):
    async with open_stream(fd, "w") as tx:
        for i in range(repeat):
            tx.write(message)
            await tx.drain()
        tx.write_eof()


def child_main(fd, message, repeat):
    asyncio.run(child_task(fd, message, repeat))


async def main(*, message=b"a", repeat=1):
    rfd, tfd = os.pipe()

    with detach(tfd):
        proc = Process(target=child_main, args=(tfd, message, repeat))
        proc.start()

    count = 0
    async with open_stream(rfd, "r") as rx:
        while True:
            msg = await rx.read(1)
            if not msg:
                break
            count += 1
            assert msg == message
    assert count == repeat


if __name__ == "__main__":
    if os.getenv("U", ""):
        uvloop.install()
    rp = int(os.getenv("R", "1"))
    msg = os.getenv("M", "a").encode()
    asyncio.run(main(message=msg, repeat=rp))

Here's the result on my NAS (in an Ubuntu 16.04 container). The assertion error correlates with the number of repetitions: the larger the number, the more likely the error is triggered. In the log below, 9216 looks like a magic threshold, but I suspect it depends on the environment.

(venv) $ U=1 R=9210 python uv.py
(venv) $ U=1 R=9211 python uv.py
(venv) $ U=1 R=9212 python uv.py
(venv) $ U=1 R=9213 python uv.py
(venv) $ U=1 R=9214 python uv.py
(venv) $ U=1 R=9215 python uv.py
(venv) $ U=1 R=9216 python uv.py
(venv) $ U=1 R=9217 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError
(venv) $ U=1 R=9218 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError
(venv) $ U=1 R=9219 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError
(venv) $ U=1 R=9220 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 85, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 77, in main
    assert count == repeat
AssertionError

It works like a charm with vanilla asyncio.

(venv) $ R=9210 python uv.py
(venv) $ R=9211 python uv.py
(venv) $ R=9212 python uv.py
(venv) $ R=9213 python uv.py
(venv) $ R=9214 python uv.py
(venv) $ R=9215 python uv.py
(venv) $ R=9216 python uv.py
(venv) $ R=9217 python uv.py
(venv) $ R=9218 python uv.py
(venv) $ R=9219 python uv.py
(venv) $ R=9220 python uv.py

With even larger repeat counts:

  • R=100K
(venv) $ R=100000 python uv.py
(venv) $ U=1 R=100000 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 44, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 36, in main
    assert count == repeat
AssertionError
  • R=1M
(venv) $ R=1000000 python uv.py
(venv) $ U=1 R=1000000 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 44, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 36, in main
    assert count == repeat
AssertionError
  • uvloop version: 0.14.0
  • Python version: Python 3.7.3
  • Platform: Linux-4.2.8-x86_64-with-debian-stretch-sid
  • Can you reproduce the bug with PYTHONASYNCIODEBUG in env?: Yes
(venv) $ PYTHONASYNCIODEBUG=1 U=1 R=100000 python uv.py
python: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
Traceback (most recent call last):
  File "uv.py", line 44, in <module>
    asyncio.run(main(message=msg, repeat=rp))
  File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1456, in uvloop.loop.Loop.run_until_complete
  File "uv.py", line 36, in main
    assert count == repeat
AssertionError
  • Does uvloop behave differently from vanilla asyncio? How?: Vanilla asyncio works like a charm, while uvloop aborts with the libuv assertion error.

momocow avatar Feb 20 '20 03:02 momocow

Thanks so much! I'll take a look in a few weeks when it's time for uvloop 0.15. If anyone wants to debug further and work on a fix, please go ahead!

1st1 avatar Feb 20 '20 04:02 1st1

This seems to be a pretty serious issue. When the pipe buffer fills up while writing, it is 100% reproducible:

import asyncio
from asyncio.subprocess import PIPE

import uvloop


async def main():
    proc = await asyncio.create_subprocess_shell('sleep 3600', stdin=PIPE)
    while True:
        proc.stdin.write(b'x' * 32768)
        await proc.stdin.drain()


if __name__ == '__main__':
    uvloop.install()
    asyncio.run(main())

The error is the same:

Assertion failed: (loop->watchers[w->fd] == w), function uv__io_stop, file src/unix/core.c, line 932.
Abort trap: 6
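
To see how many bytes a pipe accepts before a write would block (the "buffer is full" condition above), here is a minimal sketch, assuming a POSIX system. The helper name pipe_capacity is hypothetical and not part of uvloop or asyncio; the value it returns depends on the OS and its configuration.

```python
import os


def pipe_capacity(chunk_size=1024):
    """Return how many bytes a fresh pipe accepts before a write would block.

    Hypothetical helper for illustration only.
    """
    rfd, wfd = os.pipe()
    # Non-blocking mode makes os.write() raise BlockingIOError instead of
    # hanging once the kernel buffer is full.
    os.set_blocking(wfd, False)
    total = 0
    try:
        while True:
            try:
                total += os.write(wfd, b"x" * chunk_size)
            except BlockingIOError:
                return total
    finally:
        os.close(rfd)
        os.close(wfd)


if __name__ == "__main__":
    print(pipe_capacity())
```

Any single write larger than what is left in the buffer (such as the 32768-byte chunks above, repeated in a loop against a reader that never reads) will eventually hit this limit and force the event loop to wait for writability.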

Until there is a proper fix in libuv (I'll create a PR in libuv when I have time, refs libuv/libuv#2058), we can easily fix uvloop by partially reverting d8fe153e5b67de1124b828f9c9b3c5397ac68d5d and adding this patch:

diff --git a/uvloop/handles/pipe.pyx b/uvloop/handles/pipe.pyx
index 581554f..7a2c8ec 100644
--- a/uvloop/handles/pipe.pyx
+++ b/uvloop/handles/pipe.pyx
@@ -12,6 +12,9 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
     err = uv.uv_pipe_init(handle._loop.uvloop,
                           <uv.uv_pipe_t*>handle._handle,
                           0)
+    # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
+    # even if it is O_WRONLY, see also #317, libuv/libuv#2058
+    handle._handle.flags |= 0x00004000
     if err < 0:
         handle._abort_init()
         raise convert_error(err)
diff --git a/uvloop/includes/uv.pxd b/uvloop/includes/uv.pxd
index 5f034b3..9efdab1 100644
--- a/uvloop/includes/uv.pxd
+++ b/uvloop/includes/uv.pxd
@@ -82,6 +82,7 @@ cdef extern from "uv.h" nogil:
     ctypedef struct uv_handle_t:
         void* data
         uv_loop_t* loop
+        unsigned int flags
         # ...

     ctypedef struct uv_idle_t:

But UV_HANDLE_READABLE is an internal flag in uv-common.h, so using the magic number here is a bit dirty - @1st1 shall I create a PR for this patch in uvloop with proper tests, or do you prefer to wait for a new version of libuv?

fantix avatar Apr 19 '20 22:04 fantix

I also hit this error. I have a fairly generic asyncio application that downloads a bunch of tarballs in parallel with httpx.stream and pipes the content into tar subprocesses, which unpack them onto the filesystem:

        async with self.stream_repo_tarball() as tarball_stream:
            tar_proc = await asyncio.create_subprocess_exec('tar', '--extract', '--verbose',
                    '--gzip', '--strip-components=1', cwd=path, stdin=asyncio.subprocess.PIPE)

            async for chunk in tarball_stream.aiter_bytes():
                tar_proc.stdin.write(chunk)
                await tar_proc.stdin.drain()
            tar_proc.stdin.close()
            await tar_proc.wait()

This works under asyncio, but uvloop crashes pretty reliably with:

python3: src/unix/core.c:932: uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.

If it's relevant, this is in the context of a Sanic webserver, so I believe there is a second worker thread running the loop. For now I am disabling uvloop and then it is fine again:

asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
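
An alternative to switching the global policy is to choose the loop per run. This is a minimal sketch, assuming uvloop may or may not be installed; pick_loop_factory and run are hypothetical helper names, and only asyncio.new_event_loop and uvloop.new_event_loop are real library calls.

```python
import asyncio


def pick_loop_factory(use_uvloop):
    """Return a callable that creates an event loop (hypothetical helper)."""
    if use_uvloop:
        try:
            import uvloop
        except ImportError:
            pass  # uvloop is not installed: fall back to vanilla asyncio
        else:
            return uvloop.new_event_loop
    return asyncio.new_event_loop


def run(coro, use_uvloop=False):
    """Run a coroutine to completion on a freshly created loop."""
    loop = pick_loop_factory(use_uvloop)()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```

For example, run(main(), use_uvloop=bool(os.getenv("U"))) would mirror the U switch used in the reproduction script earlier in this thread, which makes it easy to compare both loops on the same code.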

mikepurvis avatar Sep 10 '20 01:09 mikepurvis

@fantix have we fixed this one in master?

1st1 avatar Sep 10 '20 19:09 1st1

just ran into this as well... what's the current status? is it fixed in master already? if so, when can we expect a release?

enumag avatar Oct 19 '20 06:10 enumag

Critical issue as well

rgeronimi avatar Nov 16 '20 14:11 rgeronimi

@fantix It seems that we now just need to bump libuv to resolve this?

1st1 avatar Nov 16 '20 16:11 1st1

@1st1 sry for the late reply! Let me try the latest libuv 1.39.0

fantix avatar Nov 25 '20 19:11 fantix

#342 should've already fixed the crashing (using libuv internal flags as a workaround), while there isn't a proper fix from the upstream yet.

However, I think this issue revealed another pipe bug in uvloop (master): the pipe is closed before all data is transmitted. I've simplified @momocow's script so that it also works on Python > 3.7:

import asyncio
import os
from asyncio import StreamReader, StreamReaderProtocol, StreamWriter, get_running_loop
from contextlib import asynccontextmanager


async def _open_reader(fd):
    rx = StreamReader()
    transport, _ = await get_running_loop().connect_read_pipe(
        lambda: StreamReaderProtocol(rx), os.fdopen(fd)
    )
    return transport, rx


async def _open_writer(fd):
    rx = StreamReader()
    transport, proto = await get_running_loop().connect_write_pipe(
        lambda: StreamReaderProtocol(rx), os.fdopen(fd, "w")
    )
    tx = StreamWriter(transport, proto, rx, get_running_loop())
    return transport, tx


@asynccontextmanager
async def open_stream(fd, mode):
    transport, stream = (
        await _open_reader(fd) if mode == "r" else await _open_writer(fd)
    )

    try:
        yield stream
    finally:
        try:
            transport.close()
        except OSError:
            # The transport/protocol sometimes closes the fd before this is reached.
            pass

        # Allow event loop callbacks to run and handle closed transport.
        await asyncio.sleep(0)


async def child_task(fd, message, repeat):
    async with open_stream(fd, "w") as tx:
        for i in range(repeat):
            tx.write(message)
            await tx.drain()
        tx.write_eof()


async def parent_task(rfd, message, repeat):
    count = 0
    async with open_stream(rfd, "r") as rx:
        while True:
            msg = await rx.read(1)
            if not msg:
                break
            count += 1
            assert msg == message
    assert count == repeat, f"{count} != {repeat}"


def main():
    repeat = int(os.getenv("R", "1"))
    message = os.getenv("M", "a").encode()
    rfd, tfd = os.pipe()
    pid = os.fork()
    if os.getenv("U", ""):
        import uvloop

        uvloop.install()

    if pid:
        os.close(rfd)
        asyncio.run(child_task(tfd, message, repeat))
    else:
        os.close(tfd)
        asyncio.run(parent_task(rfd, message, repeat))


if __name__ == "__main__":
    main()

And it fails like:

% U=1 R=9217 python uv.py
Traceback (most recent call last):
  File "uv.py", line 82, in <module>
    main()
  File "uv.py", line 78, in main
    asyncio.run(parent_task(rfd, message, repeat))
  File "asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "uvloop/loop.pyx", line 1477, in uvloop.loop.Loop.run_until_complete
    return future.result()
  File "uv.py", line 60, in parent_task
    assert count == repeat, f"{count} != {repeat}"
AssertionError: 9216 != 9217

It is reproducible for me (macOS 11 on Intel CPU) on Python 3.7, 3.8 and 3.9, with libuv 1.33.1 or 1.39.0. I'll investigate a bit first.

fantix avatar Nov 26 '20 18:11 fantix

Ah okay, this is not a uvloop bug, but rather a difference from asyncio: you must await tx.wait_closed() to wait for all the data to be transmitted, whereas in asyncio it may still succeed even if you don't await anything (which is certainly not recommended). Therefore, the following child_task should work for both asyncio and uvloop (master):

async def child_task(fd, message, repeat):
    _, tx = await _open_writer(fd)
    for i in range(repeat):
        tx.write(message)
        await tx.drain()
    tx.write_eof()
    tx.close()
    await tx.wait_closed()

I think we can now call this issue (#317) fixed.
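
For a quick check without fork or multiprocessing, the same close-then-wait pattern can be exercised in a single process. This is a minimal sketch, assuming a POSIX system and vanilla asyncio; send, receive and round_trip are hypothetical names, and both pipe ends live on the same event loop purely for illustration.

```python
import asyncio
import os


async def open_reader(fd):
    loop = asyncio.get_running_loop()
    rx = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(rx), os.fdopen(fd, "rb", 0))
    return rx


async def open_writer(fd):
    loop = asyncio.get_running_loop()
    rx = asyncio.StreamReader()  # only needed to build the protocol
    transport, proto = await loop.connect_write_pipe(
        lambda: asyncio.StreamReaderProtocol(rx), os.fdopen(fd, "wb", 0))
    return asyncio.StreamWriter(transport, proto, rx, loop)


async def send(fd, message, repeat):
    tx = await open_writer(fd)
    for _ in range(repeat):
        tx.write(message)
        await tx.drain()
    tx.write_eof()
    tx.close()
    # Wait until the transport has flushed its buffer and closed.
    await tx.wait_closed()


async def receive(fd):
    rx = await open_reader(fd)
    return len(await rx.read())  # read until EOF


async def round_trip(message=b"a", repeat=20000):
    rfd, wfd = os.pipe()
    _, received = await asyncio.gather(
        send(wfd, message, repeat), receive(rfd))
    return received


if __name__ == "__main__":
    print(asyncio.run(round_trip()))
```

The reader counts bytes rather than messages, so the result should equal repeat * len(message) when nothing is lost.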

fantix avatar Nov 26 '20 18:11 fantix

Ah okay, this is not a uvloop bug, but rather a difference to asyncio: you must await tx.wait_closed() in order to wait for all the data to be transmitted,

Maybe we can raise our own error somehow instead of letting libuv crash with an assertion error?

1st1 avatar Dec 01 '20 21:12 1st1

Oh, we've fixed the crash; the missing await tx.wait_closed() only causes the data to be received incompletely.

fantix avatar Dec 01 '20 21:12 fantix

I've also created an upstream PR so that we don't have to hack in the private constant in uvloop.

fantix avatar Dec 01 '20 21:12 fantix

So is this bug already fixed now? If we use the latest version, 0.15.2, does it mean we won't hit this issue again? Thx. @fantix

PJovy avatar Apr 21 '21 00:04 PJovy

@PJovy right, the temporary fix is included in 0.15. An ideal fix would depend on something from upstream, but we should be fine without it.

fantix avatar Apr 21 '21 14:04 fantix

We'll probably live with this temporary fix for now, as the upstream doesn't have a decision yet. I'm closing this issue to avoid confusion.

fantix avatar Sep 09 '22 15:09 fantix