Is `gevent.queue.Queue` threadsafe?
I've searched online, in these issues, and read the code, but I can't definitively answer this question for myself. Is gevent.queue.Queue threadsafe generally? And more specifically, is it safe to put items into an UnboundQueue from a non-greenlet thread and then get them from a greenlet thread?
gevent.event.Event has an explicit warning not to use it across threads, but I see no such warning for Queue. However, after some digging, it looks like Queue uses run_callback instead of run_callback_threadsafe, so I think that means it would not be safe to put from another thread. Is that correct?
https://github.com/gevent/gevent/blob/a67891991c95b5ffca78dc1f1a11f956ec3963ca/src/gevent/queue.py#L404-L406
If so, would making it safe be as simple as this?
from gevent.queue import UnboundQueue
class ThreadsafeUnboundQueue(UnboundQueue):
def _schedule_unlock(self):
if not self._event_unlock:
self._event_unlock = self.hub.loop.run_callback_threadsafe(self._unlock)
In case this is an XY problem, I'll explain why I think I need this solution.
I'm running a desktop application that uses Python as an embedded scripting language. The application runs in a main thread, which I think runs Qt for the GUI and other C code. The main thread handles incoming data and updates the GUI. Most of the time, the main thread does not execute user code, but it does run user-supplied Python functions when new data arrives. The documentation notes that these user-supplied functions should be short and non-blocking, otherwise you risk disrupting the GUI. Each of these functions is associated with a particular, user-defined type. It looks something like this:
class A: ...
class B: ...
class C: ...
def callback_a(data: A): ...
def callback_b(data: B): ...
def callback_c(data: C): ...
register_callback(A, callback_a)
register_callback(B, callback_b)
register_callback(C, callback_b)
The application also lets you run arbitrary Python code in a separate thread. To access the incoming data, I currently use a set of queue.Queues, so each callback looks like this.
queue_a = Queue()
def callback_a(data: A):
queue_a.put(data)
register_callback(A, callback_a)
queue_a.get(block=True)
The Queue is unbounded, so I never have to worry about blocking the GUI. And I can synchronize to incoming data from the separate thread by flushing and blocking on each queue.
However, now I want to select on multiple queues, i.e. block until new data is received on one of N different queues, but for ergonomic reasons, I don't want to have to change the callback to do so. Using traditional concurrency primitives, I could spin up N different threads to block on each queue, and then I could coalesce the data into a single queue. That would work, but it feels like asyncio or gevent would be a more natural fit.
Unfortunately, asyncio.queues.Queue is not threadsafe, so that option is not simple. I'm hoping that I can use a gevent queue instead.
I played with this myself, and I ran into LoopExit exceptions, I guess because the event loop doesn't know that another thread will eventually start further greenlets. I used a gevent.sleep greenlet to workaround that. Is the following example safe and correct?
from threading import Thread
import time
import gevent
from gevent.hub import Hub
from gevent.queue import UnboundQueue
def producer(hub: Hub, queue: UnboundQueue):
time.sleep(3)
hub.loop.run_callback_threadsafe(queue.put, 42)
hub = gevent.get_hub()
queue = UnboundQueue()
thread = Thread(target=producer, args=(hub, queue))
consumer = gevent.spawn(queue.get)
keep_alive = gevent.spawn(gevent.sleep, float("inf"))
thread.start()
consumer.join()
print(consumer.value)
thread.join()
keep_alive.kill()
Nothing in gevent.queue is considered to be multiple-thread safe. (In general, nothing in gevent is considered to be thread safe unless explicitly documented.) It would take a full code audit to be sure we're not relying on a series of actions being atomic because a greenlet couldn't have switched (but a thread could have).
I found a different solution to my problem, but thanks for answering.