Fix blocking worker queue when scheduling in parallel

riga opened this issue 4 years ago • 8 comments

Hi,

this PR is meant to fix* and improve the parallel scheduling of tasks. To demonstrate what is currently failing (* hence the fix), I added the WIP label and, for now, only added one commit with a simple test that, contrary to my expectation, does not pass but instead blocks the entire process.

Description

The blocking occurs when parallel scheduling is enabled but a task is not picklable. The scheduling is mostly implemented in Worker.add,

https://github.com/spotify/luigi/blob/cf2abbdb998f9e0d72801b8c8e9b2f8cfc16b7a1/luigi/worker.py#L730-L774

where processes of a pool put the results of task completeness checks back into a queue. However, when a task is not picklable, the queue is never filled, so queue.get() blocks forever with no way to access exceptions raised in the process. Instead, the exception is held by the AsyncResult object returned by apply_async, which is not used in Worker.add, so there is currently no handle for catching these cases.
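For illustration, here is a minimal standalone sketch of the failure mode (not Luigi's actual code; check_complete and the queue setup are simplified stand-ins for what Worker.add does): an unpicklable argument means the worker function never runs in the child process, so the queue stays empty, while the pickling error is only visible on the AsyncResult.

```python
import multiprocessing

def check_complete(task, out_queue):
    # Stand-in for Luigi's completeness check; in Worker.add this would call
    # task.complete() and put the outcome into the shared queue.
    out_queue.put((task, True))

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    unpicklable_task = lambda: None  # lambdas cannot be pickled

    with multiprocessing.Pool(2) as pool:
        result = pool.apply_async(check_complete, (unpicklable_task, queue))
        # queue.get() would block forever here, because check_complete never
        # ran in the child process. The pickling error is only reachable via
        # the AsyncResult, which Worker.add currently discards:
        try:
            result.get(timeout=5)
        except Exception as exc:
            print("completeness check never ran:", exc)
```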

Possible solution

IMHO, the root cause of this issue is the difference between the actual multiprocessing.Pool.apply_async (which returns an async result object) and the custom SingleProcessPool.apply_async (which instantly returns the result of the function call). The latter should instead produce something like a SyncResult with the same API as AsyncResult. One could then rely on its built-in exception wrapping (re-raised when calling get()), and a lot of the custom exception handling in worker.py (such as AsyncCompletionException and TracebackWrapper) would become obsolete, which simplifies things quite a lot.
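As an illustration, a rough sketch of what such a SyncResult could look like (hypothetical, not the actual patch), mirroring the parts of AsyncResult's API that Worker.add would need:

```python
class SyncResult:
    """Synchronous stand-in mirroring the relevant parts of AsyncResult."""

    def __init__(self, func, args=(), kwargs=None):
        try:
            self._value = func(*args, **(kwargs or {}))
            self._success = True
        except BaseException as e:
            self._value = e
            self._success = False

    def get(self, timeout=None):
        # Re-raise any exception captured during the call, like AsyncResult.get().
        if self._success:
            return self._value
        raise self._value

    def wait(self, timeout=None):
        pass  # the result is already available

    def ready(self):
        return True

    def successful(self):
        return self._success


class SingleProcessPool:
    """Minimal illustration: apply_async returns a SyncResult instead of the
    raw return value, matching multiprocessing.Pool.apply_async's contract."""

    def apply_async(self, function, args=(), kwargs=None):
        return SyncResult(function, args, kwargs)
```

With something along these lines, Worker.add could call get() on whatever apply_async returns and let the standard exception wrapping do the work in both pool flavours.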

I already implemented this (all tests pass) and will push it if you agree :)

I know that you guys are (understandably) very cautious when it comes to changes in core code, but I think the parallel scheduling could really benefit from this change / fix.

Motivation and Context

Our dependency trees sometimes take on the order of hours to build, as our completeness checks require a lot of remote resources. Therefore, we make heavy use of the parallel scheduling feature. Besides this issue/PR, we plan to submit two additional PRs in the future:

  • We noticed that a list of tasks started with interface.build(many_tasks_in_a_list, ...) does not use parallel scheduling at all (because of this loop).
  • We would like to start running tasks while the dependency tree is still being built. Most of the required functionality is already there and would only need minor additions.

Have you tested this? If so, how?

Yes, the test is included and will pass once the parallel scheduling is fixed.

riga avatar Oct 28 '20 12:10 riga

This seems like a really interesting improvement; is there any chance of this making it into luigi? In addition, I would also be very interested in the two improvements mentioned above. Could these be included in luigi without the PR suggested here?

sognetic avatar Jun 18 '21 07:06 sognetic

I am not a maintainer, but I think it would be great to get this in. We also use parallel scheduling to cut down latency until first task execution.

lallea avatar Jun 18 '21 09:06 lallea

I essentially have all the changes I mentioned above ready in personal branches (but then work kicked in and I couldn't follow up on this anymore).

@dlstadther If you agree with the ideas above, I would open PRs.

riga avatar Jun 23 '21 10:06 riga

I'm good with the submission of a fix to resolve this issue. Note that I don't use the Luigi project for work anymore, so I can only review as an outsider now. I'll do my best to be responsive, but turnaround might be slow.

Also, parallel scheduling is optional behaviour and is not enabled by default.

dlstadther avatar Jun 26 '21 17:06 dlstadther

I committed the fix of the loop doing the completeness checks during initial scheduling.

The main change is that queue objects are no longer required to store the results of complete() calls across processes; the implementation now fully relies on AsyncResults / SyncResults for the multiprocessing.Pool / SingleProcessPool cases. AsyncResults were actually used before as well, but somewhat mixed with said queues.
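Roughly, the scheduling could then poll the result handles instead of blocking on a queue. The following is a hypothetical sketch, not the committed code; gather_results, check_complete, and the poll interval are made-up names for illustration:

```python
import time

def gather_results(pool, tasks, check_complete, poll_interval=0.1):
    """Submit one completeness check per task and poll the result handles."""
    pending = {task: pool.apply_async(check_complete, (task,)) for task in tasks}
    results = {}
    while pending:
        for task, result in list(pending.items()):
            if result.ready():
                del pending[task]
                try:
                    results[task] = result.get()
                except Exception as exc:
                    # e.g. an unpicklable task now surfaces here instead of
                    # hanging forever on queue.get()
                    results[task] = exc
        if pending:
            time.sleep(poll_interval)
    return results
```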

IMHO, this simplifies the code quite a lot and makes it possible to add the two additional features I mentioned in the PR description (in future PRs), but I understand that changes to the core require a lot of scrutiny. @dlstadther Any idea who we could include for additional review?

The tests pass, but for some reason the coverage drops by ~8% and I fail to see where / how exactly ...

riga avatar Jul 05 '21 19:07 riga

@riga; sorry, I've been a bit busy lately and haven't gotten to review this yet. I'll try to get to it in the next week.

As for others to review, that's a bit challenging. Unsure if @honnix or other Spotify folks can review.

dlstadther avatar Jul 17 '21 13:07 dlstadther

With the usual disclaimer that I am not a maintainer: I am familiar with the code and would like this PR to get in, so I'll contribute a review. Overall it looks good. The polling itches a bit, since it might add up to measurable CPU usage for long tasks, but I don't have a suggestion for avoiding it that wouldn't add complexity. Luigi is built on a concept of polling, so I guess we should not be too picky. :-)

I'll add a few nitpicks inline.

lallea avatar Jul 18 '21 15:07 lallea

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

stale[bot] avatar Jan 09 '22 01:01 stale[bot]