Withhold root tasks [no co assignment]
This PR withholds root tasks on the scheduler in a global priority queue. Non-root tasks are unaffected.
Workers are only sent as many root tasks as they have threads, by default. This factor can be configured via distributed.scheduler.worker-saturation (for example, 1.5 would send workers 1.5x as many tasks as they have threads). Setting this config value to inf completely disables scheduler-side queuing and retains the current scheduling behavior ~(minus co-assignment)~.
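As a rough illustration of the saturation factor, here is a minimal sketch of how a per-worker cap on root tasks could be derived from nthreads and worker-saturation. The helper names and the rounding rule (round up, never below one slot) are assumptions for illustration, not the PR's actual code.

```python
import math

def root_task_cap(nthreads: int, saturation: float) -> float:
    """Max root tasks a worker may hold at once (hypothetical helper).

    Assumed rounding: round up, and always allow at least one task.
    """
    if math.isinf(saturation):
        # saturation = inf: queuing is disabled, nothing is withheld
        return math.inf
    return max(math.ceil(nthreads * saturation), 1)

def free_root_slots(nthreads: int, n_processing: int, saturation: float) -> float:
    """How many more root tasks could be sent to this worker right now."""
    return max(root_task_cap(nthreads, saturation) - n_processing, 0)

print(root_task_cap(4, 1.0))            # 4
print(root_task_cap(4, 1.5))            # 6
print(free_root_slots(4, 2, 1.0))       # 2
print(root_task_cap(4, float("inf")))   # inf
```

Rounding matters for small workers: under this assumption, 3 threads at saturation 1.5 gives a cap of 5, not 4.5.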
This disregards root task co-assignment. Benchmarking will determine whether fixing root task overproduction is enough of a gain to be worth giving up (flawed) co-assignment. Root task assignment here is typically worst-possible-case: neighboring tasks will usually all be assigned to different workers.
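To see why co-assignment is lost, the sketch below drains a single global priority queue onto workers, filling one free slot per worker per pass. The round-robin policy and the drain_queue helper are simplifying assumptions; the scheduler's real worker choice differs, but the sketch reproduces the effect: consecutive root tasks land on different workers.

```python
import heapq
import math

def drain_queue(queued, workers, saturation=1.0):
    """Hand out queued root tasks in priority order (hypothetical round-robin policy).

    queued: iterable of (priority, key); workers: dict of worker name -> nthreads.
    Returns (keys assigned per worker, keys still queued in priority order).
    """
    heap = list(queued)
    heapq.heapify(heap)
    assigned = {name: [] for name in workers}
    made_progress = True
    while heap and made_progress:
        made_progress = False
        # One pass: each worker with a free slot takes the highest-priority task
        for name, nthreads in workers.items():
            if math.isinf(saturation):
                cap = math.inf
            else:
                cap = max(math.ceil(nthreads * saturation), 1)
            if heap and len(assigned[name]) < cap:
                _, key = heapq.heappop(heap)
                assigned[name].append(key)
                made_progress = True
    still_queued = [key for _, key in sorted(heap)]
    return assigned, still_queued

assigned, still_queued = drain_queue([(i, f"x{i}") for i in range(6)], {"a": 2, "b": 2})
print(assigned)       # {'a': ['x0', 'x2'], 'b': ['x1', 'x3']}
print(still_queued)   # ['x4', 'x5']
```

Neighbors x0 and x1 (likely inputs to the same downstream task) end up on different workers, which is the worst case for data locality.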
~I also could/will easily add back co-assignment when distributed.scheduler.worker-saturation is inf~ EDIT: done. With that, this PR would be entirely feature-flaggable: we could merge it with the default set to inf and see zero change in scheduling out of the box.
Closes #6560
Supersedes https://github.com/dask/distributed/pull/6584, which did the same, but for all tasks (even non-root). It also co-mingled unrunnable tasks (due to restrictions) and queued root tasks, which seemed unwise.
- [ ] Tests added / passed
- [x] Passes pre-commit run --all-files
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ±0, 15 suites ±0, 7h 1m 15s :stopwatch: +18m 18s

| | Total | :heavy_check_mark: Passed | :zzz: Skipped | :x: Failed |
|---|---|---|---|---|
| Tests | 3 071 (+18) | 2 984 (+15) | 85 (+1) | 2 (+2) |
| Runs | 22 729 (+144) | 21 740 (+137) | 986 (+4) | 3 (+3) |
For more details on these failures, see this check.
Results for commit 093d7dcb. ± Comparison against base commit 817ead3a.
This community issue is likely impacted by this PR: https://dask.discourse.group/t/slow-down-transfer-between-workers/1012/2
@fjetter I've substantially refactored decide_worker and a lot of the transitions around it, hopefully addressing https://github.com/dask/distributed/pull/6614#pullrequestreview-1081757348.
The downside is that this makes the diff larger and even harder to read. It might be easier to just look at that commit and its message, though I'm not sure: d47e80dc359b6edcd17eb2686570b6582075f7f2.
The major changes are:
- decide_worker is split into decide_worker_rootish_queuing_disabled, decide_worker_rootish_queuing_enabled, and decide_worker_non_rootish, which are the three cases that used to all live in that one function.
- Thanks to that, decide_worker no longer makes recommendations. It just returns a worker, or None. Depending on which of those methods is used, the caller is responsible for recommending the key to queued or no-worker.
- All tasks that are "ready" (deps in memory) should only be transitioned to processing. That transition (primarily transition_waiting_processing) will then decide which decide_worker_* method to use on them, possibly sending them to a worker if one works, otherwise sending them to queued or no-worker as appropriate.
  - I thought about adding a ready state to act as this dispatch. It might make things a little easier to understand, but the decision about whether or not a task could go ready -> processing comes down to whether decide_worker_* returns None or not. The problem is that, if it doesn't return None, we don't have a way to pass that WorkerState object on as an argument to a transition_ready_processing function, and I definitely don't want to run decide_worker twice. Spending more time with it though, I think I actually like using processing as the dispatch. I'm pretty happy with this structure.
- I had to refactor the no-worker state a little. Now, both queued and no-worker are "ready" states: their tasks are not waiting on any dependencies, they're just waiting on workers. Therefore, they're both reachable only via transition_waiting_processing. And once tasks are in those states, they don't have to go back through waiting; they can go straight to processing as soon as a worker becomes available.
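A minimal sketch of the dispatch described in the list above: each picker only returns a worker or None, and the caller maps None to queued or no-worker. FakeWorker, the picker names, and the "least occupied worker" rule are hypothetical simplifications, not the PR's decide_worker_* functions (which also weigh dependencies, restrictions and co-assignment).

```python
import math
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class FakeWorker:
    """Hypothetical stand-in for WorkerState: a name, threads, and processing keys."""
    name: str
    nthreads: int
    processing: set = field(default_factory=set)

def _occupancy(ws: FakeWorker) -> float:
    return len(ws.processing) / ws.nthreads

def pick_rootish_queuing_enabled(workers, saturation) -> Optional[FakeWorker]:
    # Only workers below their saturation cap are eligible; otherwise withhold (None)
    open_workers = [
        ws for ws in workers
        if len(ws.processing) < max(math.ceil(ws.nthreads * saturation), 1)
    ]
    return min(open_workers, key=_occupancy, default=None)

def pick_without_queuing(workers) -> Optional[FakeWorker]:
    # Stands in for both the queuing-disabled and the non-root-ish paths
    return min(workers, key=_occupancy, default=None)

def dispatch_ready_task(key, rootish, workers, saturation) -> str:
    """Return the state a task whose dependencies are in memory ends up in."""
    if rootish and not math.isinf(saturation):
        ws = pick_rootish_queuing_enabled(workers, saturation)
        fallback = "queued"      # withheld on the scheduler until a slot frees up
    else:
        ws = pick_without_queuing(workers)
        fallback = "no-worker"   # no eligible worker at all
    if ws is None:
        return fallback
    ws.processing.add(key)
    return "processing"

workers = [FakeWorker("a", 1), FakeWorker("b", 1)]
print([dispatch_ready_task(f"x{i}", True, workers, 1.0) for i in range(3)])
# ['processing', 'processing', 'queued']
print(dispatch_ready_task("y", False, [], 1.0))  # no-worker
```

Because the pickers never make recommendations themselves, the single caller is the only place that decides between processing, queued and no-worker.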
The fact that I could entirely remove transition_no_worker_memory without any tests failing is... interesting?
Its current existence implies I should also write transition_queued_memory, but I'd rather not since it doesn't seem like either of those transitions should actually be possible. The only ways I can imagine this happening:
- A client.scatter within a task, using a key that's currently in no-worker
- An unexpected worker completed task situation. But since a no-worker task should not be running in the first place, and we no longer allow worker reconnection, I don't see how this should be possible. If a worker exists that could run it, it should be in processing, not no-worker. So how could we have an open connection to an eligible worker, but not have it running there? My guess would be https://github.com/dask/distributed/issues/6390
With https://github.com/dask/distributed/pull/6946 / https://github.com/dask/distributed/issues/6874 and offline discussion today, it seems like making stimulus_task_finished ignore task completions from any worker other than ts.processing_on might be broadly valuable (https://github.com/dask/distributed/pull/6884#issuecomment-1217824082)?
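A sketch of the proposed guard, assuming a simplified task record with a state and a processing_on field. Task and handle_task_finished are hypothetical; the real stimulus_task_finished does much more than this.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Task:
    """Hypothetical, heavily simplified stand-in for the scheduler's TaskState."""
    key: str
    state: str
    processing_on: Optional[str] = None  # address of the worker the task was sent to

def handle_task_finished(ts: Task, worker: str) -> bool:
    """Accept a completion only from the worker the task is processing on."""
    if ts.state != "processing" or ts.processing_on != worker:
        # Stale or unexpected report (e.g. for a queued/no-worker task): ignore it
        return False
    ts.state = "memory"
    ts.processing_on = None
    return True

t = Task("x", "processing", processing_on="tcp://w1")
print(handle_task_finished(t, "tcp://w2"), t.state)  # False processing
print(handle_task_finished(t, "tcp://w1"), t.state)  # True memory
```

With a guard like this, a no-worker or queued task can never jump straight to memory from a worker report, which would make transitions such as transition_no_worker_memory unreachable from that path.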
Thanks @gjoseph92! I'll have a look asap. I suggest keeping the refactoring split off in its own commit for now; we can decide whether it becomes a follow-up PR or stays here.
> The fact that I could entirely remove transition_no_worker_memory without any tests failing is... interesting?

Interesting, but not that surprising. The only way I see this happening is by scattering a key that also has resource or host constraints, e.g.
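```python
import asyncio
from distributed import Client, Scheduler, Worker

async def main():
    async with Scheduler(dashboard_address=":0") as s:
        async with Worker(s.address), Client(s.address, asynchronous=True) as c:
            # No worker offers resource "A", so "foo" stays in no-worker until the scatter overwrites it
            fut = c.submit(sum, [1, 2], key="foo", resources={"A": 1})
            await asyncio.sleep(1)
            await c.scatter({"foo": "bar"})

asyncio.run(main())
```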
I think that's a rather questionable application. To protect against this we'd need to forbid scatter from overriding already-known keys. I suggest leaving it as is (or filing an issue) and moving on.
Not going to add transition_queued_memory; instead, I think we should remove transition_no_worker_memory in a separate PR https://github.com/dask/distributed/issues/6960.
Weird that test_graph_execution_width has failed once on macOS:
```
>       assert max(Refcount.log) == s.total_nthreads
E       AssertionError: assert 6 == 8
```
I'm tempted to switch the == to <=, but something also seems wrong if the scheduler is under-saturating workers.
It makes me wonder if the macOS machines are running so slowly, they're not able to saturate all 8 threads? (Probably they don't even have 8 cores?) If the worker is really, really slow to start the next root task, it's possible the data-consuming downstream task (which will start first, since it's higher priority) could complete before the root task is even able to allocate a new Refcount object. This test is making a timing assumption that tasks in both threads start and finish at approximately the same time. If one thread was effectively blocked on the other thread, you could imagine getting less than maximum parallelism.
Additionally, the logs have both Event loop was unresponsive in Scheduler for 7.33s (repeated for all Workers too) and full garbage collections took 28% CPU time recently (threshold: 10%) (10min after the scheduler closed?!).
So I think changing to <= is reasonable, but I also don't like the possibility that opens up to accidentally under-saturate workers and have tests still pass.
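The timing argument can be reproduced without dask. This sketch mimics the test's idea of logging how many objects are alive at once (LiveCounter is a hypothetical analogue of Refcount, not the test's code) and runs it on a thread pool. Only the upper bound is guaranteed; whether the peak actually reaches nthreads depends on how promptly each thread picks up work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class LiveCounter:
    """Counts objects alive at the same time and logs every change (hypothetical Refcount analogue)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.current = 0
        self.log = []  # value of `current` after every change

    def acquire(self) -> None:
        with self._lock:
            self.current += 1
            self.log.append(self.current)

    def release(self) -> None:
        with self._lock:
            self.current -= 1
            self.log.append(self.current)

def peak_parallelism(nthreads: int, ntasks: int, work_s: float = 0.01) -> int:
    counter = LiveCounter()

    def task(_: int) -> None:
        counter.acquire()     # "allocate" the object when the task starts
        time.sleep(work_s)    # a slow or starved thread shrinks the overlap
        counter.release()     # the downstream consumer frees it

    with ThreadPoolExecutor(max_workers=nthreads) as pool:
        list(pool.map(task, range(ntasks)))
    return max(counter.log)

peak = peak_parallelism(nthreads=8, ntasks=64)
assert 1 <= peak <= 8  # asserting == 8 would depend on scheduling luck
```

An upper-bound assertion like this cannot catch under-saturation, so it would need to be paired with a looser lower bound if that regression matters.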
I'm leaving the review of the transition machinery and tests to @fjetter.
Besides rounding, my perplexity entirely revolves around the is_rootish method.
- Why do we need it to begin with? What would happen if you just short-circuited it to return True? I understand that you'd get worker-saturation * nthreads processing tasks per worker at any given time. From the worker's perspective, it would cap the length of the ready heap to (worker-saturation - 1) * nthreads tasks. Wouldn't it be desirable? What does the extra complexity of differentiating between rootish and non-rootish tasks give us?
- There is a comment # TODO short-circuit to True if not ts.dependencies? which seems pretty important to me. Why was it not done? It would mean that actual root tasks would always be throttled, even if the number of chunks is less than the number of threads on the cluster. How would it impact performance?
- The method introduces some hardcoded factors which seem pretty important to me (2, 5, 5). Where do they come from? Are they the outcome of performance testing with different numbers?
- The numerical thresholds will cause a task group to "click" into the rootish state as soon as you reduce the chunk size, increase the array/dataframe size, or reduce the cluster size past a very specific threshold. That threshold is very hard to spot (I might say virtually impossible, unless you are a power user who is aware of this setting) and depends heavily on the interaction between user application and infrastructure. Which circles back to point 1: do we actually need thresholds to begin with?
[EDIT] short-circuiting the method to return True would also mean that the worker would be a lot less eager to gather dependencies for tasks that would end up nowhere near the top of the ready queue; I suspect this would have massive memory-wise benefits in some workloads.
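To make the "click" concrete, here is a sketch of a rootish-style classifier built from the three factors mentioned (2, 5, 5). How those factors are combined is an assumption here (more than 2x total threads in the group, fewer than 5 dependency groups, fewer than 5 dependency tasks in total), and GroupInfo is a hypothetical summary type, not the scheduler's TaskGroup.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class GroupInfo:
    """Hypothetical summary of a task group (not the scheduler's TaskGroup)."""
    size: int                            # number of tasks in the group
    dependency_group_sizes: tuple = ()   # size of each group this group depends on
    has_restrictions: bool = False       # resource / worker / host restrictions

def looks_rootish(tg: GroupInfo, total_nthreads: int) -> bool:
    """Assumed combination of the 2, 5, 5 factors."""
    if tg.has_restrictions:
        return False  # restricted tasks are never treated as root-ish here
    return (
        tg.size > 2 * total_nthreads                 # wide compared to the cluster
        and len(tg.dependency_group_sizes) < 5       # few dependency groups
        and sum(tg.dependency_group_sizes) < 5       # few dependency tasks overall
    )

print(looks_rootish(GroupInfo(size=16), total_nthreads=8))  # False
print(looks_rootish(GroupInfo(size=17), total_nthreads=8))  # True
```

Under these assumptions, adding a single chunk (16 to 17 tasks on an 8-thread cluster) flips the classification, which is the cliff the comment describes.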
> Why do we need it to begin with?

That was my suggestion and is a prerequisite to allow breaking up decide_worker into multiple pieces. See also https://github.com/dask/distributed/issues/6922 for a discussion about how this could be utilized in the future.

> There is a comment # TODO short-circuit to True if not ts.dependencies? which seems pretty important to me.

See also https://github.com/dask/distributed/pull/6974

> The method introduces some hardcoded factors which seem pretty important to me (2, 5, 5). Where do they come from? Are they the outcome of performance testing with different numbers?

These are not new. See https://github.com/dask/distributed/pull/4967 for some history.
@crusaderky I share your concern about the classification logic of root tasks. That was developed last year and I would like to make it more solid (https://github.com/dask/distributed/issues/6922). I would suggest splitting this conversation off from the actual change proposed in this PR.
I agree we should aim to get this in main quickly and then further iterate. I can see two avenues:
- get it in as-is (post minor tweaks), and then run performance benchmarks vs a branch where is_rootish simply returns true. This potentially means getting in main a lot of code to then remove it shortly afterwards.
- run perf benchmarks now, before merging, to prove that the is_rootish heuristic is indeed needed, albeit it may be tweaked in the future. To clarify I don't propose to benchmark wildly different tweaks to is_rootish; I would just like a battery of tests with different use cases showing
  - main
  - this PR with worker-saturation: inf (no regression vs main - just for safety)
  - this PR with worker-saturation: 1.2
  - this PR with worker-saturation: 1.2, but with return True at the top of is_rootish
@fjetter I believe all comments have been addressed.
For tests, I went with just test_graph_execution_width. I removed the process memory test. I liked the simplicity of your test suggestion, but test_graph_execution_width is slightly more thorough towards one edge case.
> I agree we should aim to get this in main quickly and then further iterate. I can see two avenues:
> - get it in as-is (post minor tweaks), and then run performance benchmarks vs a branch where is_rootish simply returns true. This potentially means getting in main a lot of code to then remove it shortly afterwards.
> - run perf benchmarks now, before merging, to prove that the is_rootish heuristic is indeed needed, albeit it may be tweaked in the future. To clarify I don't propose to benchmark wildly different tweaks to is_rootish; I would just like a battery of tests with different use cases showing
>   - main
>   - this PR with worker-saturation: inf (no regression vs main - just for safety)
>   - this PR with worker-saturation: 1.2
>   - this PR with worker-saturation: 1.2, but with return True at the top of is_rootish

We discussed this early on, before we even started implementation. We agreed to merge this behind the feature flag since this will not change the behavior compared to main. The goal is to set a default parameter for this value asap by running benchmarks. If we are not happy with the performance or cannot find a value that is a sane default, we may even rip this entire thing out again.
@gjoseph92 (and everyone else involved here), thank you! How do I test it out ;)?
```python
import dask

with dask.config.set({"distributed.scheduler.worker-saturation": 1.5}):
    result.compute()
```
Is this right? What range of values should I provide? inf as an upper bound is not very useful.
There are many different ways to set dask config, and depending on how your clusters are deployed (local vs dask-gateway/pangeo vs dask-cloudprovider vs coiled, etc.), the way to set that config will vary.
Often though, the easiest can be to set an environment variable on the cluster. (Pangeo / dask-gateway docs, coiled docs, saturn docs, dask-cloudprovider seems to support env_vars=.) Note that the variable needs to be set before the scheduler starts; once the scheduler has started, setting it will have no effect.
```shell
$ DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 dask-scheduler
```
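The environment variable name follows dask's config convention: a DASK_ prefix, double underscores for nesting, upper case, and hyphens written as underscores. This sketch shows that mapping plus how a raw string value could be parsed. The helper names are hypothetical, and the literal_eval-with-fallback parsing is my understanding of what dask.config does with DASK_* variables, so check dask.config.collect_env before relying on it.

```python
import ast

def env_var_for(config_key: str) -> str:
    """e.g. distributed.scheduler.worker-saturation -> DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION"""
    return "DASK_" + config_key.replace(".", "__").replace("-", "_").upper()

def parse_env_value(raw: str):
    """Environment values are always strings; try to read them as Python literals."""
    try:
        return ast.literal_eval(raw)   # "1.0" -> 1.0
    except (SyntaxError, ValueError):
        return raw                     # anything else stays a plain string

print(env_var_for("distributed.scheduler.worker-saturation"))
# DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION
print(parse_env_value("1.0"))  # 1.0
```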
For a local cluster, when creating your cluster, you can just do:
```python
import dask
import distributed

with dask.config.set({"distributed.scheduler.worker-saturation": 1.0}):
    client = distributed.Client(n_workers=..., threads_per_worker=...)
```
If you can't get the config to work, it's possible to change the setting on a live cluster. You could also use this to try different settings without re-creating the cluster. Only run this while the scheduler is idle (no tasks). Otherwise, you'll probably break your cluster.
```python
# enable queuing (new behavior)
client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", 1.0))
# disable queuing (old behavior)
client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", float("inf")))
```
> What range of values should I provide: inf as an upper-bound is not very useful

I would try the 1.0 - 2.0 range. I would expect 1.0 to usually be what you want. We are doing some benchmarking, and hopefully will figure out what a good value is across the board, and remove the need/ability to set this value in the future.
@dcherian and anyone who tries this, please report back with your findings, regardless of what they are! We would really like to hear how this works on real-world uses.
Just checking in here, @dcherian any luck trying things out? Happy to help out
Great to see this merged (and exciting to see Deepak's results too)!
Now that we have 2022.9.2 on the LEAP hub (https://github.com/2i2c-org/infrastructure/pull/1769), I'm trying this out there.
Unfortunately I can't seem to set the worker saturation option successfully. :confused:
Setting via the gateway cluster options manager isn't working - if I do this
```python
options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}
gc = g.new_cluster(cluster_options=options)
```
the cluster starts as expected, but if I do this
```python
options.environment = {"MALLOC_TRIM_THRESHOLD_": "0", "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION": 1.1}
gc = g.new_cluster(cluster_options=options)
```
then it hangs indefinitely on cluster creation.
I'm not quite sure where or when I'm supposed to run this:
```shell
$ DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 dask-scheduler
```
It creates a new scheduler? Is that different from the cluster?
I also tried client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", 1.1)), but when I did the check suggested in the Pangeo Cloud docs, I just got an empty dict back.