persist-queue
queue length for SQLiteQueue is incorrect when running in multiple processes
Possibly expected behavior, but I think it's worth reporting, because the queue looks usable otherwise.
The queue size is set only once, on queue creation (`self.total = self._count()`), so if we have a producer in one process and a consumer in another process, we end up with a negative size.
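The caching behavior described above can be sketched with a minimal stand-in (a hypothetical `CachedCountQueue`, not persist-queue's actual class) that counts the rows once at construction and then only adjusts its own local counter, so writes from a second connection are never reflected:

```python
import os
import sqlite3
import tempfile

class CachedCountQueue:
    """Hypothetical minimal model of the stale-count behavior: the size is
    read from SQLite once at construction, then tracked locally."""

    def __init__(self, path):
        self.conn = sqlite3.connect(path)
        self.conn.execute('CREATE TABLE IF NOT EXISTS q (data TEXT)')
        self.conn.commit()
        self.total = self._count()  # cached once, like self.total = self._count()

    def _count(self):
        # Live query -- always correct, regardless of who wrote to the DB.
        return self.conn.execute('SELECT COUNT(*) FROM q').fetchone()[0]

    def put(self, item):
        self.conn.execute('INSERT INTO q VALUES (?)', (item,))
        self.conn.commit()
        self.total += 1  # only this handle's counter is updated

    def qsize(self):
        return self.total  # stale if another connection modified the table

path = os.path.join(tempfile.mkdtemp(), 'queue.db')
producer = CachedCountQueue(path)
consumer = CachedCountQueue(path)  # counts 0 rows at creation
producer.put('hi')
print(consumer.qsize())   # 0 -- stale cached value
print(consumer._count())  # 1 -- live DB query sees the producer's row
```

The same divergence happens in reverse for the consumer side, which is how the cached size drifts below zero.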
To reproduce, we need a producer and a consumer that's faster than the producer.
```python
# producer process
import time
import persistqueue as Q

q = Q.SQLiteQueue('queue', multithreading=True)
while True:
    q.put('hi')
    time.sleep(0.01)
```

```python
# consumer process
import persistqueue as Q

q = Q.SQLiteQueue('queue', auto_commit=False, multithreading=True)
while True:
    try:
        print(q.qsize())  # drifts into the negatives over time
        q.get(block=False)
        q.task_done()
    except Q.Empty:
        pass
```
Calling `q._count()` returns the correct size, of course, because it queries the DB directly.
Thanks for reporting. I haven't tested the queue in a multi-process environment. If you find further issues, let me know; I am happy to add the support.
Can confirm, this does indeed happen. However, it causes a somewhat annoying/strange side issue: `get()` doesn't return instantly; it introduces a delay when you call it on an empty queue.
If I push data into a FIFOSQLiteQueue with one process, the other one takes multiple seconds before actually fetching the values.
@ThunderRush As far as I understand, calling `get()` on an empty queue is supposed to wait until it can actually return something. If you don't want this behaviour, you can either pass `block=False` to `get()`, or pass a `timeout` parameter with the maximum time you are ready to wait.
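The stdlib `queue.Queue` has the same `block`/`timeout` semantics, so it makes a convenient dependency-free illustration of the two options above (persist-queue mirrors this interface, raising its own `Empty`):

```python
import queue
import time

q = queue.Queue()  # empty stdlib queue as a stand-in

# Option 1: block=False raises Empty immediately instead of waiting.
try:
    q.get(block=False)
except queue.Empty:
    print('Empty raised immediately')

# Option 2: timeout waits at most that long, then raises Empty.
start = time.time()
try:
    q.get(timeout=0.2)
except queue.Empty:
    waited = time.time() - start
print(f'waited ~{waited:.1f}s before Empty')
```

Without either argument, `get()` blocks indefinitely until an item arrives.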
Multi-process support has not been added to this lib currently; I would be happy if anyone has interest in providing this ability. @melbaa @ThunderRush @bavaria95
Recent updates have added `max(0, count)` to keep `qsize()` from going negative. That doesn't change the overall issue, but it prevents impossible size results. On the Ack Queues, a new `active_size()` was added which includes the nack cache. It may be anecdotal, but I believe this has produced a more accurate return in my multi-threaded environment, as it recalculates when an item is put/ack/ack_failed rather than on put/get/nack. Ultimately that's a decision about when you think the queue size should be decremented: on get, or on completion.
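The two accounting policies contrasted above can be sketched with a hypothetical counter (the class and its bookkeeping are illustrative, not persist-queue's implementation): `qsize()` shrinks as soon as an item is fetched, while `active_size()` also counts fetched-but-unacknowledged items, shrinking only on ack.

```python
class AckCounter:
    """Hypothetical sketch of the two size-accounting policies."""

    def __init__(self):
        self.queued = 0   # items put but not yet fetched
        self.unacked = 0  # fetched but not yet acked (the "nack cache")

    def put(self):
        self.queued += 1

    def get(self):
        self.queued -= 1
        self.unacked += 1

    def ack(self):
        self.unacked -= 1

    def qsize(self):
        # Decremented on get; clamped like the max(0, count) fix.
        return max(0, self.queued)

    def active_size(self):
        # Decremented only on ack: in-flight items still count.
        return max(0, self.queued + self.unacked)

c = AckCounter()
c.put(); c.put()
c.get()                              # one item fetched, not yet acked
print(c.qsize(), c.active_size())    # 1 2
c.ack()
print(c.qsize(), c.active_size())    # 1 1
```

Which number is "the" queue size depends on whether an in-flight, unacknowledged item should still be considered part of the queue.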
Yes, this seems to be a bug. So instead of

```python
while not queue.empty():
    item = queue.get()
    pprint(item)
    queue.task_done()
```

use the workaround

```python
while True:
    try:
        item = queue.get(block=False)
    except persistqueue.exceptions.Empty:
        break
    pprint(item)
    queue.task_done()
```