Remove cancelled, resumed, and long-running states
- Mutually exclusive with #6699
- Mutually exclusive with #6716
- Closes #6682
- Closes #6689
- Closes #6693
- Closes #6685
- Closes #6708
- Closes #6709
- Closes #6877
- Out of scope: #6705
This is an even more aggressive redesign than #6699 and #6716.
Remove the long-running state. A seceded task is distinguished from an executing one exclusively by being in the WorkerState.long_running set instead of the executing set.
Remove the cancelled state. Instead, a cancelled task simply transitions to other states, while remaining in the executing, long_running, or in_flight_tasks sets. A task will not transition to forgotten for as long as it is in one of the three above sets.
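For illustration, here is a minimal, self-contained sketch of the membership-based bookkeeping described above. All names (MiniWorkerState, secede, may_forget) are hypothetical stand-ins, not the real distributed.worker_state_machine API.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class MiniWorkerState:
    executing: set[str] = field(default_factory=set)
    long_running: set[str] = field(default_factory=set)
    in_flight_tasks: set[str] = field(default_factory=set)

    def secede(self, key: str) -> None:
        # A seceded task no longer has a dedicated TaskState state; it is
        # simply moved from the executing set to the long_running set.
        self.executing.discard(key)
        self.long_running.add(key)

    def may_forget(self, key: str) -> bool:
        # A task must not transition to forgotten while an Execute or
        # GatherDep coroutine may still be running for it, i.e. while it
        # is in any of the three sets above.
        return not (
            key in self.executing
            or key in self.long_running
            or key in self.in_flight_tasks
        )
```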
Remove the resumed state. Instead (see the sketch after this list):
- If a task is recommended to transition to `ready`, but it is already in either the `executing` or the `long_running` sets, it transitions back to `executing` instead
- If a task is recommended to transition to `fetch`, but it is already in the `in_flight_tasks` set, it transitions back to `flight` instead
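A hedged sketch of those two branches, with plain sets standing in for the real WorkerState collections (the function name and signature are invented for illustration):

```python
def resolve_recommendation(
    recommendation: str,
    key: str,
    executing: set[str],
    long_running: set[str],
    in_flight_tasks: set[str],
) -> str:
    if recommendation == "ready" and (key in executing or key in long_running):
        # The Execute coroutine never stopped running; flip straight back
        # to executing instead of re-scheduling the task.
        return "executing"
    if recommendation == "fetch" and key in in_flight_tasks:
        # The GatherDep coroutine never stopped running; flip straight back
        # to flight instead of queueing a second fetch.
        return "flight"
    return recommendation
```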
All end events for Execute and GatherDep (sketched below):
- work as normal exclusively if the task is still in `executing` or `flight` state respectively
- recommend a transition to `forgotten` if the task is in `released` state
- otherwise, they do nothing besides cleaning up and kicking off the next ensure_computing/ensure_communicating.
At any given moment, there may be both an Execute and a GatherDep instruction for the same task, running at the same time. If the Execute instruction finishes while the task is in flight, it will be a no-op, and vice versa. This means we no longer have to worry about mismatched end events.
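As a rough illustration of that rule (simplified, hypothetical names; not the real handlers), an Execute end event could look like this:

```python
def handle_execute_done(
    key: str,
    states: dict[str, str],
    executing: set[str],
    long_running: set[str],
) -> list[str]:
    # Cleanup happens unconditionally.
    executing.discard(key)
    long_running.discard(key)
    state = states.get(key)
    if state in ("executing", "long-running"):
        # Normal path: the task is still ours, finish it as usual.
        return [f"transition {key} to memory/error as appropriate"]
    if state == "released":
        return [f"transition {key} to forgotten"]
    # The task was cancelled and possibly rescheduled elsewhere (e.g. it is
    # now in flight): do nothing beyond cleanup and let the next
    # ensure_computing / ensure_communicating cycle take over.
    return []
```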
Remove the previous, next, and done TaskState attributes.
TODO
- Ensure all tickets above meet DoD
- Failing tests in distributed/tests/test_cancelled_state.py
@fjetter as discussed. The implementation should be complete (save for bugs). An early review would be appreciated. Note that, unlike #6716, it does not address #6705 and should not require any scheduler-side changes.
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ±0   15 suites ±0   6h 57m 7s :stopwatch: -8m 48s
3 021 tests +21   2 919 :heavy_check_mark: +11   82 :zzz: -7   20 :x: +17
22 351 runs +105   21 262 :heavy_check_mark: +70   967 :zzz: -84   122 :x: +119
For more details on these failures, see this check.
Results for commit c5acdcd6. ± Comparison against base commit 1d0701b5.
:recycle: This comment has been updated with latest results.
The reduction of TaskStateStates causes the remaining states to gain a higher degree of degeneracy.
I think this is a wild exaggeration. It causes exactly three transitions to gain a small code branch:
- transition_waiting_ready (resume executing)
- transition_generic_fetch (resume flight)
- transition_released_forgotten (do not forget if currently in GatherDep or Execute)
For instance, on main, TSS `flight` clearly encodes a task that is currently in the process of being gathered from a remote peer. Further, the scheduler's intention and the future flow of this task are clearly determined based on the result of the remote fetch. There is little to no ambiguity in any follow-up decision, e.g. "gather dep failed -> always do X".
This has not changed.
The control flow is unambiguous which leads to relatively easy code with few control branches
Just have a look at _transition_from_resumed. It alone contains a wealth of subtle issues that are very hard to spot casually (#6693).
The control flow in main is, to say the least, chaotic. In main, the scheduler can think that a task is executing, but it's actually in flight. When the task completes, the scheduler can receive unexpected termination messages and has to deal with them. Unsurprisingly, it does not always do so in the corner cases, as evidenced by the deadlock in #6689.
Also in main, the worker state needs to deal with a GatherDep or Execute finishing while the state is just about anything. This has caused many issues in the past.
the `resumed` state which, strictly speaking, should break up into `resume_to_fetch` and `resume_to_waiting` substates, depending on the `ts._next` attribute.
This is the theory, and it's all well and good. Except that the implementation doesn't do that, and there are points where ts._next can be missing or other weird things can happen. I can point them out on request. The reason for this general bugginess is that it is just so ridiculously hard to wrap one's head around the cancelled/resumed state. I myself, after spending many weeks refactoring the state machine, did not have a solid grasp of it, and only now can I state that I fully understand it.
To my best understanding, this PR does not fundamentally change the control flow of a task but merely chooses a different way to describe it.
Not true. In main, when a resumed task completes (successfully or otherwise) you have several edge cases that you don't have when a regular task does.
More philosophically: we have a (somewhat) inescapable problem, which is that Execute and GatherDep can't just be cancelled.
In main, the way to cope with it is to enter four special states, cancelled(flight), cancelled(executing), resumed(flight->waiting), and resumed(executing->fetch), plus buggy intruders like resumed(executing->missing), and a wealth of very, very special transitions for when each of these four states finishes.
In this PR, we simply say that the Execute and GatherDep asyncio tasks can just stay there, unattended, until we need to do one of three things:
- transition_waiting_ready (resume executing). Note that this is only a matter of courtesy towards the user code, as in some cases it may break when two identical tasks run at the same time in the same worker.
- transition_generic_fetch (resume flight). I strongly suspect we could get rid of this.
- transition_released_forgotten (do not forget if currently in GatherDep or Execute). This is only needed for a few things - namely, to preserve `TaskState.resource_restrictions` - and we could work around it easily by simply carrying the information through the Execute instruction to the ExecuteSuccessEvent, just like total_nbytes is carried from GatherDep to GatherDepDoneEvent (sketched below). This means that when GatherDepDone and ExecuteDoneEvent are triggered, the task may not exist at all, and I don't see a problem with it.
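A hedged sketch of that workaround, with simplified stand-in dataclasses (these are not the real Instruction/StateMachineEvent classes): the information needed at completion time travels inside the instruction/event pair, so the handler never has to look the TaskState up.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ExecuteSketch:  # stand-in for the Execute instruction
    key: str
    resource_restrictions: dict[str, float]


@dataclass
class ExecuteSuccessSketch:  # stand-in for ExecuteSuccessEvent
    key: str
    resource_restrictions: dict[str, float]  # echoed back verbatim


def finish(instruction: ExecuteSketch) -> ExecuteSuccessSketch:
    # The event carries everything needed to release the resources, even if
    # the TaskState was forgotten while the coroutine was running.
    return ExecuteSuccessSketch(instruction.key, instruction.resource_restrictions)
```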
After all, we still can't cancel a `gather_dep` or an `execute` (the latter could be cancelled, but it would not allow us to fundamentally simplify the problem, so I'll ignore any "abort thread" proposals for the sake of the argument)
Agreed, such proposals are interesting but complicated to implement and definitely out of scope.
This PR now proposes to remove the `cancelled` state which will ultimately require us to encode the information I described above in a different way. Specifically, this PR proposes to transition a task directly back to `released` and remember the information that a task is still executing by putting a task back to the `executing` dict, i.e. this PR increases the degeneracy of both the `released` state and the `executing` dict. In different words, on main, the semantic meaning of the `executing` dict was simple and unique. It included "all tasks in state `executing`".
This is false. In main, the executing set includes
- state=executing
- state=cancelled, previous=executing
- state=resumed, previous=executing, next=fetch
- plus buggy states such as state=resumed, previous=executing, next=missing
While the long_running set includes
- state=long-running
- state=cancelled, previous=long-running
- state=resumed, previous=long-running, next=fetch
- state=resumed, previous=long-running, next=missing
and the in_flight_tasks set includes
- state=flight
- state=cancelled, previous=flight
- state=resumed, previous=flight, next=waiting
It is also interesting to note that, given the above, the information encoded in the prev and next attributes is redundant and could be fully derived from the task's membership in one of the three sets. This PR, among other things, removes such redundancy.
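To make the redundancy concrete, here is a toy function (hypothetical, not the real WorkerState API) that reconstructs the old previous attribute purely from set membership:

```python
from __future__ import annotations


def previous_state(
    key: str,
    executing: set[str],
    long_running: set[str],
    in_flight_tasks: set[str],
) -> str | None:
    # The same information that ts.previous used to store is implied by
    # which of the three sets the task is sitting in.
    if key in executing:
        return "executing"
    if key in long_running:
        return "long-running"
    if key in in_flight_tasks:
        return "flight"
    return None
```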
Every time we'll interact with a released task, we'll need to check whether it is in a neutral state, it is still executing, etc.
False. There are very, very few places where this happens, because they're the places which would directly interact with the currently running asyncio tasks:
- transition_waiting_ready
- transition_generic_fetch (we probably could do without)
- transition_released_forgotten (we could do without, as discussed above)
- _handle_compute_task (do not overwrite resource_restrictions; we could clean this up by making it work like total_nbytes, as discussed above).
This PR exhibits this increase of complexity in various places. Nice examples of this are
`_execute_done_common` and `_gather_dep_done_common`, which previously were basically no-ops. The former doesn't even exist on main. Now, we've got switch statements. This is how the finally clause of `gather_dep` started and evolved, and we invested an awful amount of time to get rid of that.
You're talking about a very different time, when the event handlers were spaghettified together with the actual code running the instructions. I personally do not see any issue, today, in moving business logic away from the _transition_* methods and to the _handle_event ones.
I would even go as far as to claim that there are bugs because we're not dealing with this degeneracy properly. For instance, let's assume a task was in flight, got cancelled, and then was asked to be computed again. IIUC, we're nowhere dealing with the fact that the task is still in flight but are transitioning the task straight to executing
Yes, this is the main feature of this PR.
i.e. we could have a task simultaneously in flight and in executing. That's an entire class of inconsistency problems that originally was causing the first "wave" of deadlocks.
Could you specify what these inconsistency problems are? Again, you are talking about a time when gather_dep was changing the state machine itself. It can't anymore - only a selection of exit events can and, with this PR, they do so exclusively in two states, flight and released (and we could remove the latter use case, as explained above).
Could you come up with a list of examples of how a PR from a junior contributor could subtly cause this to become a problem, without any of the current tests tripping very explicitly about it? I can't come up with any.
At first glance, this PR reverts a lot of hard effort in making the state machine more explicit.
I disagree. It simply states that an abandoned GatherDep or Execute instruction should be a no-op as much as possible.
The verbosity of the current code was intentional to a certain degree.
This PR has no intention of reducing verbosity. However, I have already spent many, many weeks dealing with issues that were specifically hidden in the transitions from cancelled and from resumed, particularly at the intersections with other edge cases, e.g. #6685.
I don't see a need to change anything about long-running.
I personally lost count of how many PRs I've already written trying to fix the long-running state, all of which addressed bugs caused by a change at some point in transition_executing_* without replicating and testing the same change in _transition_long_running_*, or by a test for state == executing instead of state in (executing, long-running).
Long running tasks are not handled well right now
Yes, and the reason is the one above.
this change might address some of these artifacts
This change fixes all the problems of having a double state executing/long-running which must behave in the same way. With this PR, the one and only place where long-running is treated differently from executing is a single line in _ensure_computing, where it counts the tasks already running.
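A minimal sketch of that single remaining distinction (hypothetical helper, not the real _ensure_computing): seceded tasks have left the thread pool, so only the executing set counts against the thread limit.

```python
def can_start_another_task(
    executing: set[str], long_running: set[str], nthreads: int
) -> bool:
    # long_running is deliberately ignored: seceded tasks do not occupy a
    # thread, so they do not count towards the concurrency limit.
    return len(executing) < nthreads
```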
but they are not working well regardless since thread rejoining is not implemented.
This is a completely separate issue and it should be treated as out of scope. Fixing it will be neither more nor less difficult in main than it is in this PR.
...ok, I found an issue that may scupper the whole design.
In the first part of Worker.execute, there are many accesses to ts which require the task to not have changed - crucially, ts.dependencies.
In this event stream:
- ComputeTaskEvent(key=x, who_has={y: [...], z: [...]})
- FreeKeysEvent(keys=[x])
the task may not find z by the time it tries to read it from data.
This will never happen in the current PR, as the coroutine will exit with AlreadyCancelledEvent.
However, this is a big problem. In this event stream:
- ComputeTaskEvent(key=x, who_has={y: [...], z: [...]})
- FreeKeysEvent(keys=[x])
- ComputeTaskEvent(key=x, who_has={y: [...]})
then you are expected to resume the task - but z may not be there anymore, and run_spec most likely changed too.
I already encountered the same problem with resource_restrictions, and I could use the same logic for run_spec. dependencies is a lot more problematic.
I'm unsure if there's a clean way to deal with this that keeps Worker.execute as dumb as it is now (its dumbness is a huge feature) and is robust against subtle race conditions that are very hard to reproduce - a bunch of sleep(0) calls are needed in the unit tests to reproduce them.
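To make the race tangible, here is a self-contained toy reproduction (hypothetical names and a deliberately dumbed-down execute; not the real Worker.execute or test suite). The release happens during the await in the preamble, so the dependency lookup fails:

```python
import asyncio


async def toy_execute(tasks: dict[str, list[str]], data: dict[str, object], key: str):
    deps = tasks[key]                       # e.g. ["y", "z"], read early
    await asyncio.sleep(0)                  # any await point in the preamble
    # If a FreeKeysEvent for `key` was handled during the await above, its
    # dependencies may already have been dropped from data.
    return {dep: data[dep] for dep in deps}


async def main():
    tasks = {"x": ["y", "z"]}
    data = {"y": 1, "z": 2}
    fut = asyncio.ensure_future(toy_execute(tasks, data, "x"))
    await asyncio.sleep(0)                  # let toy_execute reach its await
    del data["z"]                           # simulate the release of x and z
    try:
        await fut
    except KeyError as exc:
        print("lost dependency:", exc)


asyncio.run(main())
```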
I'll give it some further brainstorming and, if I fail, fall back to #6699.
I fixed the race condition I described in my previous post. The fix is... fancy, and testing it requires some finesse. At first sight, this detracts from the key point of this PR, which is to make the code simpler and intrinsically more robust.
However, while I was dealing with this I encountered, in main, three other very subtle issues in the very same Worker.execute preamble, which require just as much finesse to write proper test cases for:
- https://github.com/dask/distributed/issues/6867
- https://github.com/dask/distributed/issues/6869
- https://github.com/dask/dask/issues/9330 (this trips `fail_hard`).
As discussed during standup, I will now pause this PR, work on the issues above, write the necessary complicated tests for them, and come back here.
https://github.com/dask/distributed/issues/6869 is an example of why I am concerned about letting multiple coroutines run for the same task, i.e. gather_dep and execute.
This can cause overlap, and our event handlers need to be rock solid to make sure arbitrary overlaps are handled properly, which I am currently not sufficiently confident about.
Again, I'm advocating for dropping this PR in favor of https://github.com/dask/distributed/pull/6699 for the time being
We've got another issue automatically fixed by this PR: #6877.
#6869 is an example of why I am concerned about letting multiple coroutines run for the same task, i.e. gather_dep and execute.
I can't understand how this PR would make #6869 any worse.
I've removed the done attribute :metal:
I've found a roadblock that I didn't consider before. When you transition from executing to released, all dependencies are immediately released as well (unless there are other dependants). However, they will remain in memory because they are referenced by the asyncio task running the user code. If, in the meantime, another task (or, somewhat less likely, the same task) lands with the same dependencies, then the dependencies will be fetched again over the network, leading to data duplication.
We could mitigate this through the weakref cache in SpillBuffer.__set__. Preventing the data transfer completely would be a lot more complicated and necessarily involve the worker state.
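For illustration only, a generic weak-reference value cache of the kind alluded to above (this is not the real SpillBuffer API): if the released value is still alive because the running coroutine holds a reference to it, a weak reference lets a later store or lookup reuse that object instead of fetching it again.

```python
from __future__ import annotations

import weakref


class WeakValueCache:
    """Toy mapping that remembers released values while something else
    still references them."""

    def __init__(self) -> None:
        self._data: dict[str, object] = {}
        self._ghosts: weakref.WeakValueDictionary[str, object] = weakref.WeakValueDictionary()

    def __setitem__(self, key: str, value: object) -> None:
        self._data[key] = value
        try:
            self._ghosts[key] = value  # only objects that support weakrefs
        except TypeError:
            pass

    def release(self, key: str) -> None:
        # The strong reference goes away; the weakref survives for as long
        # as e.g. a running Execute coroutine keeps the value alive.
        self._data.pop(key, None)

    def get_if_alive(self, key: str) -> object | None:
        # A later task with the same dependency can recover the value here
        # instead of transferring it over the network again.
        return self._ghosts.get(key)
```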
Even more complicated, the client may submit a new task with the same key but different dependencies while the previous execution is still running. I'm unsure how to behave there. (In main, the new run_spec and dependencies are completely disregarded.)
I give up. I'll revert to #6699. I still want to go on with the removal of the 'long-running' state though and I'd like to open a PR just for it. @fjetter you were against that too - can we have a high-bandwidth chat about it?
I still want to go on with the removal of the 'long-running' state though and I'd like to open a PR just for it. @fjetter you were against that too - can we have a high-bandwidth chat about it?
Sure, we can have a conversation about this. Here is the gist of it up front (no need to reply if we find some time to briefly talk about it).
TLDR: I don't mind reusing the implementation and am open to making the transition methods deal with both long-running and executing at the same time to avoid code duplication. We're already dealing with a similar situation with the ready/constrained states, and I believe we can handle it similarly.
The primary reason I prefer keeping it is instrumentation: I think the difference between executing and long-running is significant enough to warrant a distinction on monitoring dashboards (bokeh and prometheus).
There is a secondary technical concern outlined in the description of https://github.com/dask/distributed/pull/6607, where I argue that we should remove the rejoin/secede functionality and in fact drop our custom threadpool entirely. If we were to go down that route, the distinction between long-running and executing would be helpful (though not necessary).