Add `timeout` to multiprocessing.queues.JoinableQueue.join
Feature or enhancement
A number of join methods, such as the one on Process, expose a timeout parameter. This allows for "let the child process compute, but if it crashes or takes too long, take other action". However, JoinableQueue exposes only a parameterless join() which always blocks until all tasks are done, potentially forever.
This enhancement proposes adding a timeout parameter, implemented by passing it down to the wait on the already existing internal condition. The default value of the parameter is None, which retains the original behaviour. To detect whether join() returned because it succeeded or because it timed out, a new is_done() method is added (though other means, such as raising in join(), would also make sense).
Pitch
An example of usage:

    q = JoinableQueue()
    for task in get_tasks():
        q.put(task)
    ps = [Process(target=worker_entrypoint, args=(q,)) for _ in range(worker_count)]
    for p in ps:
        p.start()
    q.join(timeout=10)
    if q.is_done():
        ...  # success
    else:
        ...  # kill the child processes and fail, or retry, ...
    for p in ps:
        p.join()
While this simple example could also be handled by a Pool or ProcessPoolExecutor, there are use cases that rely on Queue semantics yet need a join with a timeout. I currently have such a use case, and solve it by subclassing JoinableQueue. I'll open a PR right away.
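A minimal sketch of such a subclass, for illustration only: the class name TimeoutJoinableQueue is mine, and the body relies on CPython-internal attributes (_cond and _unfinished_tasks._semlock), so it may break between interpreter versions.

```python
import multiprocessing
from multiprocessing.queues import JoinableQueue

class TimeoutJoinableQueue(JoinableQueue):
    """Sketch of a JoinableQueue whose join() accepts a timeout (not the stdlib API)."""

    def __init__(self, maxsize=0):
        # JoinableQueue requires an explicit context when instantiated directly.
        super().__init__(maxsize, ctx=multiprocessing.get_context())

    def join(self, timeout=None):
        # Same logic as the stdlib join(), except the timeout is passed down
        # to the wait on the already existing condition variable.
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait(timeout)

    def is_done(self):
        # True once every put() has been matched by a task_done().
        with self._cond:
            return self._unfinished_tasks._semlock._is_zero()
```

With this, the pitch example works as written: join(timeout=10) returns after at most roughly ten seconds, and is_done() tells the caller whether that was success or a timeout.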
Previous discussion
This is somewhat related to https://github.com/python/cpython/issues/96471 -- but less ambitious in scope, and I believe implementable independently.
Linked PRs
- gh-104449
Other queues (asynchronous, thread-safe) do not support a timeout in the join() method either, although it would be easy to add such a parameter. Do you think it should be added there too?
Instead of adding a method like is_done(), the join() method could be made to return a boolean result, like Event.wait().
Other queues (asynchronous, thread-safe) do not support a timeout in the join() method either, although it would be easy to add such a parameter. Do you think it should be added there too?
Yes -- I think that by default we should want a timeout in every join method everywhere. Anything in multiprocessing that blocks should have a timeout, so that callers can avoid ending up frozen.
Instead of adding a method like is_done(), the join() method could be made to return a boolean result, like Event.wait().
My personal preference would be to treat join() differently from is_done() -- I think join() represents "I want this to finish and I'm willing to wait for a while", so an intent with a side effect, whereas is_done() sounds to me like "instantly find out what the current state is". But I'm completely OK with join() returning a boolean and then using join(timeout=0) instead of is_done() -- with the current implementations, that's the same thing, as you say.
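For comparison, the Event.wait()-style interface discussed above could look like the following sketch (the class name BoolJoinQueue is mine, and it again relies on CPython-internal attributes, so it is illustrative only). Here join() returns whether all tasks completed, which makes join(timeout=0) equivalent to the proposed is_done().

```python
import multiprocessing
from multiprocessing.queues import JoinableQueue

class BoolJoinQueue(JoinableQueue):
    """Sketch of the alternative interface: join() returns a bool, like Event.wait()."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize, ctx=multiprocessing.get_context())

    def join(self, timeout=None):
        # Returns True if all tasks are done, False if the timeout elapsed first.
        with self._cond:
            if self._unfinished_tasks._semlock._is_zero():
                return True
            self._cond.wait(timeout)
            return self._unfinished_tasks._semlock._is_zero()
```

Usage then mirrors Event.wait(): `if not q.join(timeout=10): handle_timeout()`.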
It's not that I'm proposing this particular interface. I just see a few options and think they should be considered, in case you haven't considered them:
- Return None and add a special method is_done() (or done(), as in Task?) to check whether it succeeded in time.
- Return a boolean.
- Raise an exception on timeout.
It is better to choose a uniform interface for the different join() methods. Should the shutdown() methods also get a timeout parameter? I suggest opening a discussion on https://discuss.python.org/, since this design covers a wide area.
This is a duplicate of #60187.