multiprocessing.Queue.get(block=False) can raise queue.Empty on non-empty queue
| BPO | 43136 |
|---|---|
| Nosy | @sbraz, @JulienPalard |
| Files | queue_test.py |
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
GitHub fields:
assignee = None
closed_at = None
created_at = <Date 2021-02-05.12:03:06.227>
labels = ['type-bug', '3.8', '3.9', '3.10', '3.7', 'library']
title = 'multiprocessing.Queue.get(block=False) can raise queue.Empty on non-empty queue'
updated_at = <Date 2021-02-05.15:20:21.144>
user = 'https://github.com/sbraz'
bugs.python.org fields:
activity = <Date 2021-02-05.15:20:21.144>
actor = 'mdk'
assignee = 'none'
closed = False
closed_date = None
closer = None
components = ['Library (Lib)']
creation = <Date 2021-02-05.12:03:06.227>
creator = 'sbraz'
dependencies = []
files = ['49791']
hgrepos = []
issue_num = 43136
keywords = []
message_count = 2.0
messages = ['386525', '386528']
nosy_count = 2.0
nosy_names = ['sbraz', 'mdk']
pr_nums = []
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = 'behavior'
url = 'https://bugs.python.org/issue43136'
versions = ['Python 3.6', 'Python 3.7', 'Python 3.8', 'Python 3.9', 'Python 3.10']
Hello, I have noticed that when multiple processes simultaneously try to get items from a multiprocessing queue with block=False, get() can raise queue.Empty even though the queue still contains items. Adding a multiprocessing lock around calls to Queue.get fixes the problem (a sketch of that workaround follows the output below).
Please consider the attached reproducer script and its output:

    $ ./queue_test.py
    using processes
    2021-02-05T12:48:21.742728 worker 0 got 0, queue size was 100
    2021-02-05T12:48:21.743702 worker 1 got 1, queue size was 99
    2021-02-05T12:48:21.744059 worker 2 got 2, queue size was 98
    2021-02-05T12:48:21.745352 worker 3 got 3, queue size was 97
    2021-02-05T12:48:22.743905 worker 1 queue is EMPTY, size was 96
    2021-02-05T12:48:22.744064 worker 0 got 4, queue size was 96
    2021-02-05T12:48:22.746525 worker 3 queue is EMPTY, size was 95
    2021-02-05T12:48:22.749573 worker 2 got 5, queue size was 95
    2021-02-05T12:48:23.744474 worker 0 got 6, queue size was 94
    2021-02-05T12:48:23.750728 worker 2 got 7, queue size was 93
    2021-02-05T12:48:24.745852 worker 0 got 8, queue size was 92
    2021-02-05T12:48:24.751827 worker 2 got 9, queue size was 91
    […]
I have been able to reproduce this problem with Python 2.7 and 3.5 through 3.9 (3.10 untested).
When using threads and queue.Queue instead of their multiprocessing counterparts, the problem is not present ("./queue_test.py thread" → no spurious exceptions until the queue is actually empty).
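For reference, the lock-based workaround mentioned above can look roughly like this. This is only a sketch, not the attached script; the worker() function, the process count, and the short sleep that gives the queue's feeder thread time to flush are my own illustrative choices:

    import multiprocessing
    import time
    from queue import Empty

    def worker(q, lock, wid):
        while True:
            try:
                # Serialize calls to get() so that the queue's internal
                # non-blocking lock acquisition never fails just because
                # another process is already inside get().
                with lock:
                    item = q.get(block=False)
            except Empty:
                break
            print(f"worker {wid} got {item}")

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        for i in range(100):
            q.put(i)
        time.sleep(0.5)  # give the feeder thread time to flush buffered items
        lock = multiprocessing.Lock()
        procs = [multiprocessing.Process(target=worker, args=(q, lock, i))
                 for i in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()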
I can reproduce it in 3.10 too.
I have the same issue but with block=True and timeout=1
I can also reproduce this with block=False
Same problem here: queue.qsize() returns 50, but queue.get(False) raises queue.Empty. I am not sure whether this is because queue.put() is asynchronous or for some other reason.
Yup, same behaviour: qsize() reports items in the queue, but get_nowait() raises queue.Empty.
Python 3.12.
My workaround for this is to block with a small timeout in a loop; then it seems to work, and it only hits the `except Empty` branch a couple of times:
    while True:
        try:
            queue.get(timeout=0.05)
        except Empty:
            pass
        size = queue.qsize()
        log.info("Queue length: %s", size)
        if not size:
            break
The problem is actually quite simple (and yes, it is reproducible on any version of Python regardless of platform, and I will explain why). If we add traceback.print_exc() to the except block, we can see that the cause lies in the following block of the implementation ("rlock" reads as "read lock"; the name is somewhat ambiguous):
    if not self._rlock.acquire(block, timeout):
        raise Empty
The meaning is as follows: if another process is already performing a get operation, and because of that we failed to acquire the lock that ensures exclusive access, we raise queue.Empty. In general this is desirable behavior for non-blocking operations, but here the exception is misleading, and neither the queue module nor even the threading module does this (all underlying lock acquisitions in their primitives are unconditionally blocking).
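To see the underlying mechanism in isolation, here is a small sketch (the hold() helper and the timings are my own): a non-blocking acquire of a multiprocessing lock simply returns False while another process holds it, and Queue.get(block=False) turns exactly that failure into queue.Empty:

    import multiprocessing
    import time

    def hold(lock):
        # Simulate another process currently inside get(),
        # i.e. holding the queue's read lock.
        with lock:
            time.sleep(1)

    if __name__ == "__main__":
        lock = multiprocessing.Lock()
        p = multiprocessing.Process(target=hold, args=(lock,))
        p.start()
        time.sleep(0.1)  # let the child acquire the lock first
        print(lock.acquire(block=False))  # False: contention, not emptiness
        p.join()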
One solution would be to add a second semaphore that would synchronize get operations (similar to the one that synchronizes put operations). Its initial value would be zero. The put operation releases the semaphore, and the get operation acquires the semaphore (with block and timeout passed). If the semaphore cannot be acquired during this time, this is only possible if there are no items in the queue, which means we raise queue.Empty. Otherwise, we unconditionally acquire the lock and perform the operation in exclusive mode.
I implemented something similar in my old queue implementation, so it should work as expected. There are some peculiarities with the order in which items are retrieved, but hardly anyone will notice.
A quick workaround in line with the above:
    import multiprocessing
    import multiprocessing.queues
    from queue import Empty

    class _GH87302Queue(multiprocessing.queues.Queue):
        def __init__(self, maxsize=0, *, ctx):
            super().__init__(maxsize, ctx=ctx)
            self.__getsem = ctx.Semaphore(0)

        def __getstate__(self):
            return (*super().__getstate__(), self.__getsem)

        def __setstate__(self, state):
            super().__setstate__(state[:-1])
            self.__getsem = state[-1]

        def put(self, obj, block=True, timeout=None):
            super().put(obj, block, timeout)
            self.__getsem.release()

        def get(self, block=True, timeout=None):
            if self._closed:
                raise ValueError(f"Queue {self!r} is closed")
            if not self.__getsem.acquire(block, timeout):
                raise Empty
            return super().get()

        # optional (see the `full()` implementation)
        def empty(self):
            return self.__getsem._semlock._is_zero()

    def GH87302Queue(maxsize=0):
        return _GH87302Queue(maxsize, ctx=multiprocessing.get_context())
And if you replace `queue_class = multiprocessing.Queue` with `queue_class = GH87302Queue` in the reproducer script, everything works as expected.
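A minimal usage sketch, assuming the GH87302Queue factory above is defined in (or imported into) the same module; the drain() worker, the item count, and the number of processes are illustrative only:

    import multiprocessing
    from queue import Empty

    def drain(q, wid):
        while True:
            try:
                item = q.get(block=False)
            except Empty:
                break  # only raised once the queue really is exhausted
            print(f"worker {wid} got {item}")

    if __name__ == "__main__":
        q = GH87302Queue()
        for i in range(100):
            q.put(i)
        workers = [multiprocessing.Process(target=drain, args=(q, i))
                   for i in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()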
Please note that multiprocessing.Queue, contrary to the documentation, is not actually a class!
    >>> import multiprocessing
    >>> multiprocessing.Queue
    <bound method BaseContext.Queue of <multiprocessing.context.DefaultContext object at 0x7fe870fe36b0>>
In the reproducer script, it would be more correct to call it a factory rather than a class.
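To illustrate (a short sketch of my own): the actual class lives in multiprocessing.queues, and the objects returned by the factory are instances of it, which is why the workaround above subclasses multiprocessing.queues.Queue:

    import multiprocessing
    import multiprocessing.queues

    q = multiprocessing.Queue()  # calls the factory bound to the default context
    print(type(q))               # <class 'multiprocessing.queues.Queue'>
    print(isinstance(q, multiprocessing.queues.Queue))  # True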