luigi
luigi copied to clipboard
Fix blocking worker queue when scheduling in parallel
Hi,
this PR is meant to fix* and improve the parallel scheduling of tasks. To demonstrate what is currently failing (* therefore the fix), I added the WIP label and only added one commit with a simple test that currently, against my expectation, does not pass but blocks the entire process.
Description
The blocking occurs when parallel scheduling is enabled but a task is not picklable. The scheduling is mostly implemented in Worker.add
,
https://github.com/spotify/luigi/blob/cf2abbdb998f9e0d72801b8c8e9b2f8cfc16b7a1/luigi/worker.py#L730-L774
where processes of a pool put the results of task completeness checks back into a queue. However, when the task is not picklable, the queue is never filled so that queue.get()
will block forever without being able to access exceptions raised in the process. Instead, the exception is hold by the AsyncResult
object returned by apply_async
which is not used in Worker.add
, so there is currently no handle to catch these cases.
Possible solution
IMHO, the root cause for this issue is the difference between the actual multiprocessing.Pool.apply_async
(returning an async result object) and the custom SingleProcessPool.apply_async
(instantly returning the result of the function call). The latter should produce something like a SyncResult
with the same API as AsyncResult
. Following this, one should rely on its built-in functionality to wrap exceptions (re-raised when calling get()
) and a lot of the custom exception handling in worker.py would be obsolete (such as AsyncCompletionException
and TracebackWrapper
) which simplifies things quite a lot.
I already implemented this (all tests pass) and will push it if you agree :)
I know that you guys are (understandably) very cautious when it comes to changes in core code, but I think the parallel scheduling could really benefit from this change / fix.
Motivation and Context
Our dependency trees sometimes take O(h) to build as our completeness checks require a lot of remote resources. Therefore, we make heavy use of the parallel scheduling feature. Besides this issue/PR, we plan to submit two additional PRs in the future:
- We noticed that a list of tasks started with
interface.build(many_tasks_in_a_list, ...)
is not using the parallel scheduling at all (because of this loop). - We would like to start running tasks already while the dependency tree is being built. Most of the required functionality is actually already there and would need only minor additions.
Have you tested this? If so, how?
Yes, the test is included and will pass once the parallel scheduling is fixed.
This seems like a really interesting improvement, is there any chance of this making it into luigi? In addition, I would be also very interested in the two improvements mentioned above, could these be included in luigi without the PR suggested here?
I am not a maintainer, but I think it would be great to get this in. We also use parallel scheduling to cut down latency until first task execution.
I essentially have all changes I mentioned above ready in personal branches (and then work kicked in and couldn't follow up on this anymore).
@dlstadther If you agree with the ideas above, I would open PRs.
I'm good with the submission of a fix to resolve this issue. Note that I don't utilize the Luigi project for work anymore, so i can only review as an outsider now. I'll do my best to be responsive, but times might be slow.
Also, Parallel scheduling is optional behaviour and is not enabled by default.
I committed the fix of the loop doing the completeness checks during initial scheduling.
The main change is that queue objects are no longer required for storing the results of complete()
calls across processes, but the current implementation rather fully relies on Async
/ SyncResult
's for the multiprocess.Pool
/ SingleProcessPool
cases. Actually, AsyncResult
's have been used before as well, but somewhat mixed with said queues.
IMHO, this simplifies the code quite a lot and allows to add the 2 additional features I mentioned in the PR description (in future PRs), but I understand that changes to the core require a lot of scrutiny. @dlstadther Any idea who we can include for additional review?
The tests pass, but for some reason the coverage drops by ~8% and I fail to see where / how exactly ...
@riga ; sorry, i've been a bit busy lately and haven't gotten to review this yet. I'll try to get to it in the next week.
As for others to review, that's a bit challenging. Unsure if @honnix or other Spotify folks can review.
With the usual disclaimer that I am not a maintainer, I am familiar with the code and would like this PR to get in, so I'll contribute a review. It overall looks good. The polling itches a bit, since it might add up to measurable CPU usage for long tasks, but I don't have a suggestion for a way to avoid it that wouldn't add complexity. Luigi is built on a concept of polling, so I guess we should not be too picky. :-)
I'll add a few nitpicks inline.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.