
multiprocessing.Queue.get(block=False) can raise queue.Empty on non-empty queue

Open sbraz opened this issue 4 years ago • 9 comments

BPO 43136
Nosy @sbraz, @JulienPalard
Files
  • queue_test.py
  • Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.


    GitHub fields:

    assignee = None
    closed_at = None
    created_at = <Date 2021-02-05.12:03:06.227>
    labels = ['type-bug', '3.8', '3.9', '3.10', '3.7', 'library']
    title = 'multiprocessing.Queue.get(block=False) can raise queue.Empty on non-empty queue'
    updated_at = <Date 2021-02-05.15:20:21.144>
    user = 'https://github.com/sbraz'
    

    bugs.python.org fields:

    activity = <Date 2021-02-05.15:20:21.144>
    actor = 'mdk'
    assignee = 'none'
    closed = False
    closed_date = None
    closer = None
    components = ['Library (Lib)']
    creation = <Date 2021-02-05.12:03:06.227>
    creator = 'sbraz'
    dependencies = []
    files = ['49791']
    hgrepos = []
    issue_num = 43136
    keywords = []
    message_count = 2.0
    messages = ['386525', '386528']
    nosy_count = 2.0
    nosy_names = ['sbraz', 'mdk']
    pr_nums = []
    priority = 'normal'
    resolution = None
    stage = None
    status = 'open'
    superseder = None
    type = 'behavior'
    url = 'https://bugs.python.org/issue43136'
    versions = ['Python 3.6', 'Python 3.7', 'Python 3.8', 'Python 3.9', 'Python 3.10']
    

    Hello, I have noticed that when multiple processes simultaneously try to get items from a multiprocessing queue with block=False, Queue.get can raise queue.Empty even though there are still items in the queue. Adding a multiprocessing lock around calls to Queue.get fixes the problem.

    Please consider the attached reproducer script and its output:

        $ ./queue_test.py
        using processes
        2021-02-05T12:48:21.742728 worker 0 got 0, queue size was 100
        2021-02-05T12:48:21.743702 worker 1 got 1, queue size was 99
        2021-02-05T12:48:21.744059 worker 2 got 2, queue size was 98
        2021-02-05T12:48:21.745352 worker 3 got 3, queue size was 97
        2021-02-05T12:48:22.743905 worker 1 queue is EMPTY, size was 96
        2021-02-05T12:48:22.744064 worker 0 got 4, queue size was 96
        2021-02-05T12:48:22.746525 worker 3 queue is EMPTY, size was 95
        2021-02-05T12:48:22.749573 worker 2 got 5, queue size was 95
        2021-02-05T12:48:23.744474 worker 0 got 6, queue size was 94
        2021-02-05T12:48:23.750728 worker 2 got 7, queue size was 93
        2021-02-05T12:48:24.745852 worker 0 got 8, queue size was 92
        2021-02-05T12:48:24.751827 worker 2 got 9, queue size was 91
        […]

    I have been able to reproduce this problem with Python 2.7 and 3.5 through 3.9 (3.10 untested).

    When using threads and queue.Queue instead of their multiprocessing counterparts, the problem is not present ("./queue_test.py thread" → no spurious exceptions until the queue is actually empty).
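    The attached queue_test.py is not reproduced in this migrated issue. A minimal sketch in the same spirit (hypothetical names, not the original script): several worker processes poll the queue with block=False and retry on queue.Empty, stopping only when they receive a sentinel. The spurious Empty exceptions described above show up as extra retries, but every item is still eventually consumed:

```python
import multiprocessing
from queue import Empty

SENTINEL = None


def worker(q, counter):
    # Retry on queue.Empty: with block=False, Empty may be spurious (the bug
    # discussed in this issue), so only the sentinel tells us we are done.
    while True:
        try:
            item = q.get(block=False)
        except Empty:
            continue
        if item is SENTINEL:
            return
        with counter.get_lock():
            counter.value += 1


def main(n_items=100, n_workers=4):
    q = multiprocessing.Queue()
    counter = multiprocessing.Value("i", 0)
    for i in range(n_items):
        q.put(i)
    for _ in range(n_workers):
        q.put(SENTINEL)  # one sentinel per worker, queued after all items
    procs = [multiprocessing.Process(target=worker, args=(q, counter))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(main())  # all items are consumed despite spurious Empty
```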

    I can reproduce it in 3.10 too.

    JulienPalard avatar Feb 05 '21 15:02 JulienPalard

    I have the same issue but with block=True and timeout=1

    dmenig avatar Aug 12 '22 14:08 dmenig

    I can also reproduce this with block=False

    CorvetteCole avatar Feb 22 '23 21:02 CorvetteCole

    Same problem here. queue.qsize() returns 50 but queue.get(False) raises queue.Empty. I'm not sure whether this is because queue.put() is asynchronous or for some other reason.

    dangnh0611 avatar Apr 03 '25 12:04 dangnh0611

    Yup, same behaviour. qsize() returns the number of items in the queue, but get_nowait() raises queue.Empty.

    Python 3.12

    My workaround for this is to block with a small timeout in a loop; then it seems to work, and it only hits the except Empty branch a couple of times:

            while True:
                try:
                    queue.get(timeout=0.05)
                except Empty:
                    size = queue.qsize()

                    log.info("Queue length: %s", size)

                    if not size:
                        break

    

    MorningLightMountain713 avatar Apr 15 '25 09:04 MorningLightMountain713

    The problem is actually quite simple (and yes, it is reproducible on any version of Python regardless of platform, and I will explain why). If we add traceback.print_exc() to the except block, we will see that the cause is the following block in the implementation ("rlock" is read as "read lock"; the name is somewhat ambiguous):

    if not self._rlock.acquire(block, timeout):
        raise Empty
    

    The meaning is as follows: if someone else is already performing a get operation, and because of this we fail to acquire the lock ensuring exclusive access within the given time, we raise queue.Empty. In general this is desirable behavior for non-blocking operations, but this particular exception is misleading, and neither the queue module nor the threading module does this (all underlying lock acquisitions in all their primitives are unconditionally blocking).
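    The mechanism is easy to demonstrate in isolation with a plain multiprocessing lock (a sketch of the effect, not CPython's actual code): a non-blocking acquire on a lock that is merely held by another reader fails immediately, and Queue.get(block=False) translates that failure into queue.Empty regardless of whether the queue holds items:

```python
import multiprocessing

lock = multiprocessing.Lock()  # stands in for Queue._rlock
lock.acquire()                 # another reader is "currently inside get()"

# A non-blocking attempt fails immediately even though no Empty condition
# exists; Queue.get(block=False) would raise queue.Empty at this point.
print(lock.acquire(False))     # prints False

lock.release()
```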

    x42005e1f avatar Dec 16 '25 10:12 x42005e1f

    One solution would be to add a second semaphore that would synchronize get operations (similar to the one that synchronizes put operations). Its initial value would be zero. The put operation releases the semaphore, and the get operation acquires the semaphore (with block and timeout passed). If the semaphore cannot be acquired during this time, this is only possible if there are no items in the queue, which means we raise queue.Empty. Otherwise, we unconditionally acquire the lock and perform the operation in exclusive mode.

    I implemented something similar in my old queue implementation, so it should work as expected. There are some peculiarities with the order in which items are retrieved, but hardly anyone will notice.

    x42005e1f avatar Dec 16 '25 11:12 x42005e1f

    A quick workaround in line with the above:

    import multiprocessing
    import multiprocessing.queues
    
    from queue import Empty
    
    
    class _GH87302Queue(multiprocessing.queues.Queue):
        def __init__(self, maxsize=0, *, ctx):
            super().__init__(maxsize, ctx=ctx)
            # Counts items available to get: put() releases it, get()
            # acquires it, so get() only proceeds when an item is guaranteed.
            self.__getsem = ctx.Semaphore(0)

        def __getstate__(self):
            # Ship the extra semaphore to child processes along with the
            # base queue state.
            return (*super().__getstate__(), self.__getsem)

        def __setstate__(self, state):
            super().__setstate__(state[:-1])
            self.__getsem = state[-1]

        def put(self, obj, block=True, timeout=None):
            super().put(obj, block, timeout)
            self.__getsem.release()

        def get(self, block=True, timeout=None):
            if self._closed:
                raise ValueError(f"Queue {self!r} is closed")

            if not self.__getsem.acquire(block, timeout):
                raise Empty

            # The semaphore guarantees an item is (or will shortly be)
            # available, so the blocking get() is safe here.
            return super().get()

        # optional (see the `full()` implementation)
        def empty(self):
            return self.__getsem._semlock._is_zero()
    
    
    def GH87302Queue(maxsize=0):
        return _GH87302Queue(maxsize, ctx=multiprocessing.get_context())
    

    And if you replace

    queue_class = multiprocessing.Queue
    

    with

    queue_class = GH87302Queue
    

    in the reproducer script, everything works as expected.


    Please note that multiprocessing.Queue, contrary to the documentation, is not actually a class!

    >>> import multiprocessing
    >>> multiprocessing.Queue
    <bound method BaseContext.Queue of <multiprocessing.context.DefaultContext object at 0x7fe870fe36b0>>
    

    In the reproducer script, it would be more correct to call it a factory rather than a class.

    x42005e1f avatar Dec 16 '25 12:12 x42005e1f