
queue length for SQLiteQueue is incorrect when running in multiple processes

Open melbaa opened this issue 6 years ago • 6 comments

Possibly expected behavior, but I think it's worth reporting, because the queue looks usable otherwise.

The queue size is set only once, at queue creation (self.total = self._count()), so if we have a producer in one process and a consumer in another process, we end up with a negative size.

To reproduce, we need a producer and a consumer that's faster than the producer.

    # producer process
    import time
    import persistqueue as Q

    q = Q.SQLiteQueue('queue', multithreading=True)
    while True:
        q.put('hi')
        time.sleep(0.01)

    # consumer process (must drain faster than the producer fills)
    import persistqueue as Q
    from persistqueue.exceptions import Empty

    q = Q.SQLiteQueue('queue', auto_commit=False, multithreading=True)
    while True:
        try:
            print(q.qsize())  # eventually goes negative
            q.get(block=False)
            q.task_done()
        except Empty:
            pass

Calling q._count() returns the correct size, because it hits the DB, of course.
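
A minimal sketch of the discrepancy (bearing in mind that _count() is a private helper, not a public API):

    # both handles point at the same on-disk queue
    import persistqueue as Q

    q = Q.SQLiteQueue('queue', multithreading=True)
    print(q.qsize())   # cached self.total; goes stale across processes
    print(q._count())  # queries the database directly; always correct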

melbaa avatar Nov 08 '18 16:11 melbaa

Thanks for reporting. I haven't tested the queue in a multi-process environment. If you find further issues, let me know; I am happy to add the support.

peter-wangxu avatar Nov 09 '18 13:11 peter-wangxu

Can confirm, this does indeed happen. However, it causes a somewhat annoying/strange issue: somehow, get() doesn't seem to return instantly; it causes a delay when you call it on an empty queue.

If I push data into a FIFOSQLiteQueue with one process, the other one takes multiple seconds before actually fetching the values.

ThunderRush avatar Dec 07 '18 06:12 ThunderRush

@ThunderRush As far as I understand, calling get() on an empty queue is supposed to block until it can actually get something. If you don't want this behaviour, you can either pass block=False to get() or pass a timeout parameter with the maximum time you are ready to wait.
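
For illustration, a minimal sketch of the three modes (q is assumed to be an existing SQLiteQueue):

    # three ways to call get() on an empty queue
    item = q.get()              # blocks until an item is available
    item = q.get(block=False)   # raises persistqueue.exceptions.Empty immediately
    item = q.get(timeout=5)     # waits at most 5 seconds, then raises Empty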

bavaria95 avatar Dec 10 '18 10:12 bavaria95

Multi-process support has not currently been added to this lib; I would be happy if anyone is interested in providing this ability. @melbaa @ThunderRush @bavaria95

peter-wangxu avatar Dec 11 '18 03:12 peter-wangxu

Recent updates added max(0, count) to prevent qsize() from returning a negative value. That doesn't change the underlying issue, but it does prevent impossible size results. On the Ack queues, a new active_size() was added which includes the nack cache. It may be anecdotal, but I believe this produces a more accurate result in my multi-threaded environment, since the size is recalculated when an item is put/ack/ack_failed rather than on put/get/nack. Ultimately, though, it's a design decision about when the queue size should be decremented: on get, or on completion.
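
For reference, a rough sketch of the ack flow described above (method names as I understand the ack-queue API; treat this as illustrative, not definitive):

    import persistqueue

    q = persistqueue.SQLiteAckQueue('ackqueue', multithreading=True)
    q.put('hi')

    item = q.get()           # item handed out, but still tracked until acked
    print(q.active_size())   # pending items plus the nack cache

    q.ack(item)              # success: size decremented on completion, not on get
    # q.nack(item)           # alternative: return the item to the queue
    # q.ack_failed(item)     # alternative: mark the item permanently failed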

imidoriya avatar Apr 09 '21 19:04 imidoriya

Yes, this seems to be a bug. So instead of

    # unreliable: empty() relies on the cached queue size
    while not queue.empty():
        item = queue.get()
        pprint(item)
        queue.task_done()

use the workaround

    import persistqueue.exceptions
    from pprint import pprint

    while True:
        try:
            item = queue.get(block=False)
        except persistqueue.exceptions.Empty:
            break
        pprint(item)
        queue.task_done()

decatur avatar May 28 '21 11:05 decatur