
Remove cancelled, resumed, and long-running states

crusaderky opened this pull request 3 years ago • 9 comments

  • Mutually exclusive with #6699
  • Mutually exclusive with #6716
  • Closes #6682
  • Closes #6689
  • Closes #6693
  • Closes #6685
  • Closes #6708
  • Closes #6709
  • Closes #6877
  • Out of scope: #6705

This is an even more aggressive redesign than #6699 and #6716.

Remove the long-running state. A seceded task is distinguished from an executing one exclusively by being in the WorkerState.long_running set instead of the executing set.

Remove the cancelled state. Instead, a cancelled task simply transitions to other states, while remaining in the executing, long_running, or in_flight_tasks sets. A task will not transition to forgotten for as long as it is in one of the three above sets.

Remove the resumed state. Instead,

  • If a task is recommended to transition to ready, but it is already in either the executing or the long_running sets, it transitions back to executing instead
  • If a task is recommended to transition to fetch, but it is already in the in_flight_tasks set, it transitions back to flight instead
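
To make these two rules concrete, here is a minimal sketch of the idea. The class and method names are made up for illustration; only the three set names mirror the real WorkerState:

```python
# Hypothetical, simplified sketch - not the real WorkerState API. It only
# illustrates "resume by set membership": no cancelled/resumed/long-running
# states, just the three sets deciding where a recommended transition lands.

class MiniWorkerState:
    def __init__(self) -> None:
        self.executing: set[str] = set()        # keys with a running Execute coroutine
        self.long_running: set[str] = set()     # keys whose task called secede()
        self.in_flight_tasks: set[str] = set()  # keys with a running GatherDep coroutine

    def recommend_ready(self, key: str) -> str:
        # A still-running Execute coroutine means we "resume" the task by
        # sending it straight back to executing instead of ready.
        if key in self.executing or key in self.long_running:
            return "executing"
        return "ready"

    def recommend_fetch(self, key: str) -> str:
        # Likewise, a still-running GatherDep means we go back to flight.
        if key in self.in_flight_tasks:
            return "flight"
        return "fetch"


ws = MiniWorkerState()
ws.executing.add("x")
assert ws.recommend_ready("x") == "executing"  # resumed; no second Execute is started
assert ws.recommend_fetch("x") == "fetch"      # no GatherDep running for "x"
```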

All end events for Execute and GatherDep:

  • work as normal exclusively if the task is still in executing or flight state respectively
  • recommend a transition to forgotten if the task is in released state
  • otherwise, they do nothing besides cleaning up and kicking off the next ensure_computing/ensure_communicating.
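
A rough sketch of those three rules for the Execute side; the handler name and return values are invented, and only the set names follow WorkerState:

```python
from __future__ import annotations

# Hypothetical sketch of the end-event rules above (not the real handler).
# `executing` / `long_running` stand in for WorkerState.executing and
# WorkerState.long_running; `state` is the task's current state.

def on_execute_done(key: str, state: str, executing: set, long_running: set) -> str | None:
    executing.discard(key)      # cleanup always happens
    long_running.discard(key)
    if state == "executing":
        return "apply result"   # normal path: transition to memory or error
    if state == "released":
        return "forget"         # the task was cancelled and nothing else holds it
    # Otherwise do nothing; the next ensure_computing / ensure_communicating
    # call decides what happens next.
    return None

# Example: the task was released (cancelled) while its thread was still running.
executing, long_running = {"x"}, set()
assert on_execute_done("x", "released", executing, long_running) == "forget"
assert "x" not in executing
```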

At any given moment, there may be both an Execute and a GatherDep instruction for the same task, running at the same time. If the Execute instruction finishes while the task is in flight, it will be a no-op, and vice versa. This means we no longer have to worry about mismatched end events.

Remove the previous, next, and done TaskState attributes.

TODO

  • Ensure all tickets above meet DoD
  • Failing tests in distributed/tests/test_cancelled_state.py

crusaderky · Aug 06 '22 17:08

@fjetter as discussed. The implementation should be complete (save for bugs). An early review would be appreciated. Note that, unlike #6716, it does not address #6705 and should not require any scheduler-side changes.

crusaderky · Aug 06 '22 17:08

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0   15 suites ±0   6h 57m 7s :stopwatch: -8m 48s
3 021 tests +21   2 919 :heavy_check_mark: +11   82 :zzz: -7   20 :x: +17
22 351 runs +105   21 262 :heavy_check_mark: +70   967 :zzz: -84   122 :x: +119

For more details on these failures, see this check.

Results for commit c5acdcd6. ± Comparison against base commit 1d0701b5.

:recycle: This comment has been updated with latest results.

github-actions[bot] · Aug 06 '22 18:08

The reduction of TaskStateStates causes the remaining states to gain a higher degree of degeneracy.

I think this is a wild exaggeration. It causes exactly three transitions to gain a small code branch:

  • transition_waiting_ready (resume executing)
  • transition_generic_fetch (resume flight)
  • transition_released_forgotten (do not forget if currently in GatherDep or Execute)

For instance, on main, TSS flight clearly encodes a task that is currently in the process of being gathered from a remote peer. Further, the scheduler's intention and the future flow of this task are clearly determined based on the result of the remote fetch. There is little to no ambiguity in any follow-up decision, e.g. "gather dep failed -> always do X".

This has not changed.

The control flow is unambiguous which leads to relatively easy code with few control branches

Just have a look at _transition_from_resumed. It alone contains a wealth of subtle issues that are very hard to spot casually (#6693).

The control flow in main is, to say the least, chaotic. In main, the scheduler can think that a task is executing when it's actually in flight. When the task completes, the scheduler can receive unexpected termination messages and has to deal with them. Unsurprisingly, it does not always do so in the corner cases, as evidenced by the deadlock #6689.

Also in main, the worker state needs to deal with a GatherDep or Execute finishing while the state is just about anything. This has caused many issues in the past.

the resumed state which, strictly speaking, should break up into resume_to_fetch and resume_to_waiting substates, depending on the ts._next attribute.

This is the theory, and it's all well and good. Except that the implementation doesn't do that, and there are points where ts._next can be missing or contain other weird stuff. I can find them on request. The reason for this general bugginess is that it is just so ridiculously hard to wrap one's head around the cancelled/resumed state. I myself, after spending many weeks refactoring the state machine, did not have a solid grasp on it, and only now can I say that I fully understand it.

To my best understanding, this PR does not fundamentally change the control flow of a task but merely chooses a different way to describe it.

Not true. In main, when a resumed task completes (successfully or otherwise), you have to deal with several edge cases that you don't have when a regular task completes.

More philosophically: we have a (somewhat) inescapable problem, which is that Execute and GatherDep can't just be cancelled.

In main, the way to cope with it is to enter four special states - cancelled(flight), cancelled(executing), resumed(flight->waiting), resumed(executing->fetch) - plus buggy intruders like resumed(executing->missing), together with a wealth of very, very special transitions for when each of these four states finishes.

In this PR, we simply say that the Execute and GatherDep asyncio tasks can just stay there, unattended, until we need to do one of three things:

  • transition_waiting_ready (resume executing). Note that this is only a matter of courtesy towards the user code, as in some cases it may break when two identical tasks run at the same time in the same worker.
  • transition_generic_fetch (resume flight). I strongly suspect we could get rid of this.
  • transition_released_forgotten (do not forget if currently in GatherDep or Execute). This is only needed for a few things - namely, to preserve TaskState.resource_restrictions - and we could work around it easily by simply carrying the information through the Execute instruction to the ExecuteSuccessEvent, just like total_nbytes is carried from GatherDep to GatherDepDoneEvent. This would mean that when GatherDepDoneEvent and ExecuteDoneEvent are triggered, the task might not exist at all, and I don't see a problem with that.
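
For illustration, carrying the information through the instruction could look roughly like this; the resource_restrictions field on Execute/ExecuteSuccessEvent is an assumption of mine, with total_nbytes on the GatherDep side as the existing precedent:

```python
from dataclasses import dataclass, field

# Sketch only: the resource_restrictions fields below are invented to show the
# "carry it through the event" idea; they are not the real class definitions.

@dataclass
class Execute:
    key: str
    resource_restrictions: dict = field(default_factory=dict)  # e.g. {"GPU": 1}

@dataclass
class ExecuteSuccessEvent:
    key: str
    resource_restrictions: dict = field(default_factory=dict)  # echoed back, so resources
                                                               # can be released even if the
                                                               # TaskState is long gone

def finish(instr: Execute) -> ExecuteSuccessEvent:
    # ... run the task in a thread ...
    return ExecuteSuccessEvent(key=instr.key,
                               resource_restrictions=instr.resource_restrictions)
```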

After all, we still can't cancel a gather_dep or an execute (the latter could be cancelled, but it would not allow us to fundamentally simplify the problem, so I'll ignore any "abort thread" proposals for the sake of the argument)

Agreed, such proposals are interesting but complicated to implement and definitely out of scope.

This PR now proposes to remove the cancelled state, which will ultimately require us to encode the information I described above in a different way. Specifically, this PR proposes to transition a task directly back to released and remember the information that a task is still executing by putting the task back into the executing dict, i.e. this PR increases the degeneracy of both the released state and the executing dict. In other words, on main, the semantic meaning of the executing dict was simple and unique. It included "all tasks in state executing".

This is false. In main, the executing set includes

  • state=executing
  • state=cancelled, previous=executing
  • state=resumed, previous=executing, next=fetch
  • plus buggy states such as state=resumed, previous=executing, next=missing

While the long_running set includes

  • state=long-running
  • state=cancelled, previous=long-running
  • state=resumed, previous=long-running, next=fetch
  • state=resumed, previous=long-running, next=missing

and the in_flight_tasks set includes

  • state=flight
  • state=cancelled, previous=flight
  • state=resumed, previous=flight, next=waiting

It is also interesting to note that, given the above, the information encoded in the previous and next attributes is redundant and could be derived entirely from which of the three sets the task belongs to. This PR, among other things, removes that redundancy.
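
As an illustration of that redundancy, this is the kind of consistency check one could write against main (infer_previous is a made-up helper; the set names are as in the lists above):

```python
# Illustrative only: on main, ts.previous of a cancelled/resumed task could be
# reconstructed purely from which set its key sits in.

def infer_previous(key, executing, long_running, in_flight_tasks):
    if key in executing:
        return "executing"
    if key in long_running:
        return "long-running"
    if key in in_flight_tasks:
        return "flight"
    return None

# e.g. a task with state=cancelled, previous=flight must sit in in_flight_tasks:
assert infer_previous("x", set(), set(), {"x"}) == "flight"
```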

Every time we'll interact with a released task, we'll need to check whether it is in a neutral state, it is still executing, etc.

False. There are very, very few places where this happens, because they're the places which would directly interact with the currently running asyncio tasks:

  • transition_waiting_ready
  • transition_generic_fetch (we probably could do without)
  • transition_released_forgotten (we could do without, as discussed above)
  • _handle_compute_task (do not overwrite resource_restrictions; we could clean it up making it work like total_nbytes, as discussed above).

This PR exhibits this increase of complexity in various places. Nice examples of this are _execute_done_common and _gather_dep_done_common, which previously were basically no-ops (the former doesn't even exist on main). Now we've got switch statements. This is how the finally clause of gather_dep started and evolved, and we invested an awful amount of time getting rid of that.

You're talking about a very different time, when the event handlers were spaghettified together with the actual code running the instructions. I personally do not see any issue, today, in moving business logic away from the _transition_* methods and into the _handle_event ones.

I would even go as far as claim that there are bugs because we're not dealing with this degeneracy properly. For instance, let's assume a task was in flight, got cancelled and then asked to be computed again. IIUC, we're nowhere dealing with the fact that the task is still in flight but are transitioning the task straight to executing

Yes, this is the main feature of this PR.

i.e. we could have a task simultaneously in flight and in executing. That's an entire class of inconsistency problems that originally was causing the first "wave" of deadlocks.

Could you spell out what these inconsistency problems are? Again, you are talking about a time when gather_dep was changing the state machine itself. It can't anymore - only a selection of exit events can and, with this PR, they do so exclusively in two states, flight and released (and we could remove the latter use case, as explained above).

Could you come up with a list of examples of how a PR from a junior contributor could subtly cause this to become a problem, without any of the current tests tripping very explicitly about it? I can't come up with any.

At first glance, this PR reverts a lot of hard effort in making the state machine more explicit.

I disagree. It simply states that an abandoned GatherDep or Execute instruction should be a no-op as much as possible.

The verbosity of the current code was intentional to a certain degree.

This PR has no intention of reducing verbosity. However, I already spent many, many weeks dealing with issues that were specifically hidden in the transitions from cancelled and from resumed, particularly at their intersections with other edge cases, e.g. #6685.

I don't see a need to change anything about long-running.

I personally lost count of how many PRs I have already written trying to fix the long-running state, all of which were needed because someone changed transition_executing_* at some point while forgetting to replicate and test the same change in _transition_long_running_*, or added a check for state == executing instead of state in (executing, long-running).
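
A caricature of that bug class, with made-up function names, just to show the pattern:

```python
# Not real distributed code: a guard written for executing only silently drops
# seceded tasks, because their state is long-running rather than executing.

def should_report_done(state: str) -> bool:
    return state == "executing"                    # bug: long-running falls through

def should_report_done_fixed(state: str) -> bool:
    return state in ("executing", "long-running")  # what every such check must do on main

assert not should_report_done("long-running")      # the seceded task is ignored
assert should_report_done_fixed("long-running")
```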

Long running tasks are not handled well right now

Yes, and the reason is the one above.

this change might address some of these artifacts

This change fixes all the problems caused by having two states, executing and long-running, which must behave in the same way. With this PR, the one and only place where long-running is treated differently from executing is a single line in _ensure_computing, where it counts the tasks already running.

but they are not working well regardless since thread rejoining is not implemented.

This is a completely separate issue and it should be treated as out of scope. Fixing it will be neither more nor less difficult in main than it is in this PR.

crusaderky · Aug 08 '22 13:08

...ok, I found an issue that may scupper the whole design. In the first part of Worker.execute, there are many accesses to ts which require the task to not have changed - crucially, ts.dependencies.

In this events stream:

  • ComputeTaskEvent(key=x, who_has={y: [...], z: [...]})
  • FreeKeysEvent(keys=[x])

the task may not find z by the time it tries to read it from data. This will never happen in the current PR, as the coroutine will exit with AlreadyCancelledEvent.

However, this is a big problem: In this events stream:

  • ComputeTaskEvent(key=x, who_has={y: [...], z: [...]})
  • FreeKeysEvent(keys=[x])
  • ComputeTaskEvent(key=x, who_has={y: [...]})

then you are expected to resume the task - but z may not be there anymore, and run_spec most likely changed too.

I already encountered the same problem with resource_restrictions, and I could use the same logic for run_spec. dependencies is a lot more problematic.
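
Here is a condensed, stand-alone illustration of the hazard; data and deps_at_submit_time are stand-ins for Worker.data and the dependency list captured by the execute preamble:

```python
# Condensed illustration of the race above - not real Worker.execute code.

data = {"y": 1, "z": 2}           # stand-in for Worker.data
deps_at_submit_time = ["y", "z"]  # captured when the first ComputeTaskEvent arrived

# FreeKeysEvent(keys=["x"]) releases x's dependencies; z has no other dependents:
del data["z"]

# ComputeTaskEvent(key="x", who_has={"y": [...]}) resumes the old coroutine,
# which still believes it needs z:
try:
    args = [data[k] for k in deps_at_submit_time]
except KeyError as e:
    print(f"dependency {e} vanished between cancellation and resumption")
```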

I'm unsure if there's a clean way to deal with this that keeps Worker.execute as dumb as it is now (which is a huge feature) and is robust against subtle race conditions that are very hard to reproduce - a bunch of sleep(0) calls in the unit tests are needed to trigger them. I'll give it some further brainstorming and, if I fail, fall back to #6699.

crusaderky · Aug 08 '22 16:08

I fixed the race condition I described in my previous post. The fix is... fancy, and testing it requires some finesse. At first sight, this detracts from the key point of this PR, which is to make the code simpler and intrinsically more robust.

However, while I was dealing with this I encountered, in main, three other very subtle issues in the very same Worker.execute preamble, which require just as much finesse to write proper test cases for:

  • https://github.com/dask/distributed/issues/6867
  • https://github.com/dask/distributed/issues/6869
  • https://github.com/dask/dask/issues/9330 (this trips fail_hard).

As discussed during standup, I will now pause this PR, work on the issues above, write the necessary complicated tests for them, and come back here.

crusaderky · Aug 10 '22 15:08

https://github.com/dask/distributed/issues/6869 is an example of why I am concerned about letting multiple coroutines run for the same task, i.e. gather_dep and execute.

This can cause overlaps, and our event handlers need to be rock solid to make sure arbitrary overlaps are handled properly, which I am currently not sufficiently confident about.

Again, I'm advocating for dropping this PR in favor of https://github.com/dask/distributed/pull/6699 for the time being

fjetter · Aug 11 '22 10:08

We've got another issue automatically fixed by this PR: #6877.

crusaderky · Aug 11 '22 18:08

#6869 is an example of why I am concerned about letting multiple coroutines run for the same task, i.e. gather_dep and execute.

I can't understand how this PR would make #6869 any worse.

crusaderky · Aug 11 '22 20:08

I've removed the done attribute :metal:

crusaderky · Aug 12 '22 16:08

I've found a roadblock that I didn't consider before. When a task transitions from executing to released, all of its dependencies are immediately released as well (unless there are other dependents). However, they will remain in memory because they are referenced by the asyncio task running the user code. If, in the meantime, another task (or, somewhat less likely, the same task) lands with the same dependencies, then the dependencies will be fetched again over the network, leading to data duplication.

We could mitigate this through the weakref cache in SpillBuffer.__set__. Preventing the data transfer completely would be a lot more complicated and would necessarily involve the worker state. Even more complicated, the client may submit a new task with the same key but different dependencies while the previous execution is still running, and I'm unsure how we should behave there (in main, the new run_spec and dependencies are completely disregarded).
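
A rough sketch of what the weakref-based mitigation could look like as a stand-alone cache; the names are invented and this glosses over how SpillBuffer would actually hook into it:

```python
import weakref

# Sketch only: before fetching a dependency over the network, check whether a
# still-running task already holds a live reference to the same value.

class Chunk:                      # stand-in for task output; builtins such as
    def __init__(self, payload):  # list/dict cannot be weakly referenced
        self.payload = payload

recently_seen: "weakref.WeakValueDictionary[str, Chunk]" = weakref.WeakValueDictionary()

def store(key: str, value: Chunk) -> None:
    recently_seen[key] = value    # does not keep the value alive by itself

def get_or_fetch(key: str, fetch):
    value = recently_seen.get(key)
    if value is not None:         # some running task still references it: reuse
        return value
    return fetch(key)             # otherwise pay for the network transfer

c = Chunk([1, 2, 3])
store("y", c)
assert get_or_fetch("y", lambda k: Chunk(None)) is c  # reused, no transfer needed
```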

I give up. I'll revert to #6699. I still want to go on with the removal of the 'long-running' state though and I'd like to open a PR just for it. @fjetter you were against that too - can we have a high-bandwidth chat about it?

crusaderky · Aug 15 '22 11:08

I still want to go on with the removal of the 'long-running' state though and I'd like to open a PR just for it. @fjetter you were against that too - can we have a high-bandwidth chat about it?

Sure, we can have a conversation about this. Here's the gist of it up front (no need to reply if we find some time to talk about it briefly).

TLDR: I don't mind reusing the implementation and am open to making the transition methods deal with both long-running and executing at the same time to avoid code duplication. We're already dealing with a similar situation with the ready/constrained states, and I believe we can handle it similarly. The primary reason why I prefer keeping it is instrumentation, since I think the difference between executing and long-running is significant enough to warrant a distinction on monitoring dashboards (bokeh and prometheus). There is a secondary technical concern, outlined in the description of https://github.com/dask/distributed/pull/6607, where I argue that we should remove the rejoin/secede functionality and in fact drop our custom threadpool entirely. If we were to go down that route, the distinction between long-running and executing would be helpful (though not necessary).

fjetter · Aug 15 '22 13:08