
Withhold root tasks [no co assignment]

gjoseph92 opened this pull request 3 years ago

This PR withholds root tasks on the scheduler in a global priority queue. Non-root tasks are unaffected.

Workers are only sent as many root tasks as they have threads, by default. This factor can be configured via distributed.scheduler.worker-saturation (1.5 would send workers 1.5x as many tasks as they have threads, for example). Setting this config value to inf completely disables scheduler-side queuing and retains the current scheduling behavior ~(minus co-assignment)~.
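As a rough illustration of that cap (a sketch only; in particular the ceil rounding here is an assumption, since the exact rounding rule was still being discussed in review):

import math

def root_task_slots(nthreads: int, worker_saturation: float) -> float:
    # How many root tasks the scheduler will hand a single worker at once.
    # worker-saturation = inf disables queuing entirely (old behavior).
    if math.isinf(worker_saturation):
        return math.inf
    # Otherwise cap at roughly worker_saturation * nthreads per worker.
    return max(math.ceil(worker_saturation * nthreads), 1)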

This disregards root task co-assignment. Benchmarking will determine whether fixing root task overproduction is enough of a gain to be worth giving up (flawed) co-assignment. Root task assignment here is typically worst-possible-case: neighboring tasks will usually all be assigned to different workers.

~I also could/will easily add back co-assignment when distributed.scheduler.worker-saturation is inf~ EDIT: done. With that, this PR would be entirely feature-flaggable—we could merge it with the default set to inf and see zero change in scheduling out of the box.

Closes #6560

Supersedes https://github.com/dask/distributed/pull/6584, which did the same, but for all tasks (even non-root). It also co-mingled unrunnable tasks (due to restrictions) and queued root tasks, which seemed unwise.

  • [ ] Tests added / passed
  • [x] Passes pre-commit run --all-files

gjoseph92 avatar Jun 22 '22 23:06 gjoseph92

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0   15 suites ±0   7h 1m 15s :stopwatch: +18m 18s
3 071 tests +18   2 984 :heavy_check_mark: +15   85 :zzz: +1   2 :x: +2
22 729 runs +144   21 740 :heavy_check_mark: +137   986 :zzz: +4   3 :x: +3

For more details on these failures, see this check.

Results for commit 093d7dcb. ± Comparison against base commit 817ead3a.

:recycle: This comment has been updated with latest results.

github-actions[bot] avatar Jun 23 '22 02:06 github-actions[bot]

This community issue is likely impacted by this PR: https://dask.discourse.group/t/slow-down-transfer-between-workers/1012/2

crusaderky avatar Aug 24 '22 11:08 crusaderky

@fjetter I've substantially refactored decide_worker and a lot of the transitions around it, hopefully addressing https://github.com/dask/distributed/pull/6614#pullrequestreview-1081757348.

The downside is that this makes the diff larger and even harder to read now. It might be easier to just look at that commit and its message, though I'm not sure: d47e80dc359b6edcd17eb2686570b6582075f7f2.

The major changes are:

  • decide_worker is split into decide_worker_rootish_queuing_disabled, decide_worker_rootish_queuing_enabled, and decide_worker_non_rootish, which are the three cases that used to be all in that one function.
  • Thanks to that, decide_worker no longer makes recommendations. It just returns a worker, or None. Depending on which of those methods is used, the caller is responsible for recommending the key to queued or no-worker.
  • All tasks that are "ready" (deps in memory) should only be transitioned to processing. That transition (primarily transition_waiting_processing) then decides which decide_worker_* method to use on them, sending them to a worker if one is available, and otherwise to queued or no-worker as appropriate. (A rough sketch of this dispatch follows the list.)
    • I thought about adding a ready state to act as this dispatch. It might make things a little easier to understand, but the decision about whether or not a task could go ready->processing comes down to whether decide_worker_* returns None or not. The problem is that, if it doesn't return None, we don't have a way to pass that WorkerState object on as an argument to a transition_ready_processing function, and I definitely don't want to run decide_worker twice.

      Spending more time with it though, I think I actually like using processing as the dispatch. I'm pretty happy with this structure.

  • I had to refactor the no-worker state a little. Now, both queued and no-worker are "ready" states: their tasks are not waiting on any dependencies, they're just waiting on workers. Therefore, they're both reachable only via transition_waiting_processing. And once tasks are in those states, they don't have to go back through waiting—they can go straight to processing as soon as a worker becomes available.
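To make the dispatch concrete, here is a minimal sketch, not the actual scheduler code: the three decide_worker_* names mirror this PR, but the stub signatures, stub bodies, and the recommendation format are simplified placeholders.

import math
from typing import Optional

WORKER_SATURATION = 1.0  # distributed.scheduler.worker-saturation

def decide_worker_rootish_queuing_disabled(key: str) -> Optional[str]: ...
def decide_worker_rootish_queuing_enabled() -> Optional[str]: ...
def decide_worker_non_rootish(key: str) -> Optional[str]: ...

def transition_waiting_processing(key: str, is_rootish: bool) -> dict:
    # Pick a worker for a ready task, or recommend a waiting state.
    if is_rootish and not math.isinf(WORKER_SATURATION):
        ws = decide_worker_rootish_queuing_enabled()
        fallback = "queued"      # no open slot right now: hold the task on the scheduler
    elif is_rootish:
        ws = decide_worker_rootish_queuing_disabled(key)
        fallback = "no-worker"   # queuing disabled, e.g. no workers running at all
    else:
        ws = decide_worker_non_rootish(key)
        fallback = "no-worker"   # restrictions can't currently be satisfied

    if ws is None:
        return {key: fallback}
    # A worker was chosen: the caller sends the task there.
    return {key: ("processing", ws)}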

gjoseph92 avatar Aug 24 '22 23:08 gjoseph92

The fact that I could entirely remove transition_no_worker_memory without any tests failing is... interesting?

Its current existence implies I should also write transition_queued_memory, but I'd rather not since it doesn't seem like either of those transitions should actually be possible. The only ways I can imagine this happening:

  • A client.scatter within a task, using a key that's currently in no-worker
  • An unexpected "worker completed task" situation. But since a no-worker task should not be running in the first place, and we no longer allow worker reconnection, I don't see how this could happen. If a worker exists that could run it, it should be in processing, not no-worker. So how could we have an open connection to an eligible worker, but not have it running there? My guess would be https://github.com/dask/distributed/issues/6390

Given https://github.com/dask/distributed/pull/6946 / https://github.com/dask/distributed/issues/6874 and offline discussion today, it seems like making stimulus_task_finished ignore task completions from workers other than ts.processing_on might be broadly valuable: https://github.com/dask/distributed/pull/6884#issuecomment-1217824082
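A sketch of that idea (an assumption, not the actual handler; TaskState and handle_task_finished here are simplified stand-ins): only accept a completion message from the worker the scheduler believes is running the task.

from typing import Optional

class TaskState:
    def __init__(self, key: str, processing_on: Optional[str] = None):
        self.key = key
        self.processing_on = processing_on  # address of the worker running it, if any

def handle_task_finished(tasks: dict, key: str, worker: str) -> dict:
    ts = tasks.get(key)
    if ts is None or ts.processing_on != worker:
        # Stale or unexpected completion (e.g. the task is queued or in
        # no-worker): ignore it rather than moving it straight to memory.
        return {}
    return {key: "memory"}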

gjoseph92 avatar Aug 25 '22 00:08 gjoseph92

Thanks @gjoseph92! I'll have a look asap. I suggest keeping the refactoring split off in its own commit for now; then we can decide whether it should be a follow-up PR or whether we want to keep it like this.

The fact that I could entirely remove transition_no_worker_memory without any tests failing is... interesting?

Interesting, but not that surprising. The only way I see this happening is by scattering a key that also has resource or host constraints, e.g.

from time import sleep
from distributed import Client, Scheduler, Worker

with Scheduler() as s:
    with Worker(s.address) as w:
        with Client(s.address) as c:
            # "foo" can never run: no worker provides resource "A", so it sits in no-worker
            c.submit(func, key="foo", resources={"A": 1})
            sleep(1)
            # scatter then overrides the already-known key "foo" with data
            c.scatter({"foo": "bar"})

I think that's a rather questionable application. To protect against this we'd need to forbid scatter from overriding already-known keys. I suggest leaving it as is / filing an issue and moving on.

fjetter avatar Aug 25 '22 13:08 fjetter

Not going to add transition_queued_memory; instead, I think we should remove transition_no_worker_memory in a separate PR https://github.com/dask/distributed/issues/6960.

gjoseph92 avatar Aug 25 '22 18:08 gjoseph92

Weird that test_graph_execution_width has failed once on macOS:

>       assert max(Refcount.log) == s.total_nthreads
E       AssertionError: assert 6 == 8
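For context, the assertion comes from an instance-counting pattern roughly like the sketch below (not the verbatim test): each root task allocates one instance, so max(Refcount.log) is the peak number of root tasks that were alive at the same time.

import threading

class Refcount:
    count = 0
    lock = threading.Lock()
    log = []

    def __init__(self):
        # One instance per running root task; record how many are alive right now.
        with Refcount.lock:
            type(self).count += 1
            self.log.append(self.count)

    def __del__(self):
        with Refcount.lock:
            type(self).count -= 1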

I'm tempted to switch the == to <=, but something also seems wrong if the scheduler is under-saturating workers.

It makes me wonder if the macOS machines are running so slowly, they're not able to saturate all 8 threads? (Probably they don't even have 8 cores?) If the worker is really, really slow to start the next root task, it's possible the data-consuming downstream task (which will start first, since it's higher priority) could complete before the root task is even able to allocate a new Refcount object. This test is making a timing assumption that tasks in both threads start and finish at approximately the same time. If one thread was effectively blocked on the other thread, you could imagine getting less than maximum parallelism.

Additionally, the logs have both Event loop was unresponsive in Scheduler for 7.33s (repeated for all Workers too) and full garbage collections took 28% CPU time recently (threshold: 10%) (10min after the scheduler closed?!).

So I think changing to <= is reasonable, but I also don't like the possibility that opens up to accidentally under-saturate workers and have tests still pass.

gjoseph92 avatar Aug 30 '22 01:08 gjoseph92

I'm leaving the review of the transition machinery and tests to @fjetter. Besides rounding, my remaining questions all revolve around the is_rootish method.

  1. Why do we need it to begin with? What would happen if you just short-circuited it to return True? I understand that you'd get worker-saturation * nthreads processing tasks per worker at any given time; from the worker's perspective, it would cap the length of the ready heap at (worker-saturation - 1) * nthreads tasks. Wouldn't that be desirable? What does the extra complexity of differentiating between rootish and non-rootish tasks give us?
  2. There is a comment # TODO short-circuit to True if not ts.dependencies? which seems pretty important to me. Why was it not done? It would mean that actual root tasks would always be throttled, even if the number of chunks is less than the number of threads on the cluster. How would it impact performance?
  3. The method introduces some hardcoded factors which seem pretty important to me (2, 5, 5). Where do they come from? Are they the outcome of performance testing with different numbers?
  4. The numerical thresholds will cause a task group to "click" into rootish state as soon as you reduce the chunk size, increase the array/dataframe size, or shrink the cluster past a very specific threshold - one that is very hard to spot (I might say virtually impossible, if you aren't a power user who is aware of this setting) and is heavily dependent on the interaction between the user application and the infrastructure. Which circles back to point 1 - do we actually need thresholds to begin with?

[EDIT] Short-circuiting the method to return True would also mean that the worker would be a lot less eager to gather dependencies for tasks that would end up nowhere near the top of the ready queue; I suspect this would have massive benefits memory-wise in some workloads.

crusaderky avatar Aug 30 '22 12:08 crusaderky

Why do we need it to begin with?

That was my suggestion and is a prerequisite for breaking up decide_worker into multiple pieces. See also https://github.com/dask/distributed/issues/6922 for a discussion of how this could be utilized in the future.

There is a comment # TODO short-circuit to True if not ts.dependencies? which seems pretty important to me.

see also https://github.com/dask/distributed/pull/6974

The method introduces some hardcoded factors which seem pretty important to me (2, 5, 5). Where do they come from? Are they the outcome of performance testing with different numbers?

These are not new. See https://github.com/dask/distributed/pull/4967 for some history
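For reference, the heuristic those factors appear in looks roughly like this (a paraphrase, not the exact scheduler code; the argument names are illustrative stand-ins for the TaskState/TaskGroup attributes it actually inspects):

def is_rootish(
    group_size: int,           # tasks in this task group
    n_dependency_groups: int,  # task groups this group depends on
    n_dependency_tasks: int,   # total tasks across those dependency groups
    total_nthreads: int,       # threads across the whole cluster
    has_restrictions: bool,    # worker/host/resource restrictions on the task
) -> bool:
    # A task is treated as root-ish if it has no placement restrictions and
    # belongs to a group much larger than the cluster with few dependencies.
    if has_restrictions:
        return False
    return (
        group_size > total_nthreads * 2
        and n_dependency_groups < 5
        and n_dependency_tasks < 5
    )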


@crusaderky I share your concern about the classification logic for root tasks. It was developed last year and I would like to make it more solid (https://github.com/dask/distributed/issues/6922). I suggest splitting this conversation off from the actual change proposed in this PR.

fjetter avatar Aug 30 '22 12:08 fjetter

I agree we should aim to get this in main quickly and then further iterate. I can see two avenues:

  1. get it in as-is (post minor tweaks), and then run performance benchmarks against a branch where is_rootish simply returns True. This potentially means getting a lot of code into main only to remove it shortly afterwards.
  2. run perf benchmarks now, before merging, to prove that the is_rootish heuristic is indeed needed, even if it may be tweaked in the future. To clarify, I don't propose benchmarking wildly different tweaks to is_rootish; I would just like a battery of tests with different use cases comparing:
  • main
  • this PR with worker-saturation: inf (no regression vs main - just for safety)
  • this PR with worker-saturation: 1.2
  • this PR with worker-saturation: 1.2, but with return True at the top of is_rootish

crusaderky avatar Aug 30 '22 13:08 crusaderky

@fjetter I believe all comments have been addressed.

For tests, I went with just test_graph_execution_width. I removed the process memory test. I liked the simplicity of your test suggestion, but test_graph_execution_width is slightly more thorough for one edge case.

gjoseph92 avatar Aug 31 '22 06:08 gjoseph92

I agree we should aim to get this in main quickly and then further iterate. I can see two avenues:

  1. get it in as-is (post minor tweaks), and then run performance benchmarks against a branch where is_rootish simply returns True. This potentially means getting a lot of code into main only to remove it shortly afterwards.
  2. run perf benchmarks now, before merging, to prove that the is_rootish heuristic is indeed needed, even if it may be tweaked in the future. To clarify, I don't propose benchmarking wildly different tweaks to is_rootish; I would just like a battery of tests with different use cases comparing:
  • main
  • this PR with worker-saturation: inf (no regression vs main - just for safety)
  • this PR with worker-saturation: 1.2
  • this PR with worker-saturation: 1.2, but with return True at the top of is_rootish

We discussed this early on, before we even started implementation. We agreed to merge this behind the feature flag since this will not change the behavior compared to main. The goal is to set a default parameter for this value asap by running benchmarks. If we are not happy with the performance or cannot find a value that is a sane default, we may even rip this entire thing out again.

fjetter avatar Aug 31 '22 10:08 fjetter

@gjoseph92 (and everyone else involved here), thank you! How do I test it out ;)?

with dask.config.set({"distributed.scheduler.worker-saturation": 1.5}):
    result.compute()

Is this right? What range of values should I provide: inf as an upper-bound is not very useful.

dcherian avatar Sep 15 '22 15:09 dcherian

There are many different ways to set dask config, and depending on how your clusters are deployed (local vs dask-gateway/pangeo vs dask-cloudprovider vs coiled, etc.), the way to set that config will vary.

Often, though, the easiest approach is to set an environment variable on the cluster. (Pangeo / dask-gateway docs, coiled docs, saturn docs, dask-cloudprovider seems to support env_vars=.) Note that the variable needs to be set before the scheduler starts—once the scheduler has started, setting it will have no effect.

$ DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 dask-scheduler

For a local cluster, when creating your cluster, you can just do:

import dask
import distributed

with dask.config.set({"distributed.scheduler.worker-saturation": 1.0}):
    client = distributed.Client(n_workers=..., threads_per_worker=...)

If you can't get the config to work, it's possible to change the setting on a live cluster. You could also use this to try different settings without re-creating the cluster. Only run this while the scheduler is idle (no tasks). Otherwise, you'll probably break your cluster.

# enable queuing (new behavior)
client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", 1.0))

# disable queuing (old behavior)
client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", float("inf")))

What range of values should I provide: inf as an upper-bound is not very useful

I would try the 1.0 - 2.0 range. I would expect 1.0 to usually be what you want. We are doing some benchmarking, and hopefully will figure out what a good value is across the board, and remove the need/ability to set this value in the future.


@dcherian and anyone who tries this, please report back with your findings, regardless of what they are! We would really like to hear how this works on real-world uses.

gjoseph92 avatar Sep 15 '22 17:09 gjoseph92

Just checking in here, @dcherian any luck trying things out? Happy to help out

jrbourbeau avatar Sep 23 '22 19:09 jrbourbeau

Great to see this merged (and exciting to see Deepak's results too)!

Now that we have 2022.9.2 on the LEAP hub (https://github.com/2i2c-org/infrastructure/pull/1769), I'm trying this out there.

Unfortunately I can't seem to set the worker saturation option successfully. :confused:

Setting via the gateway cluster options manager isn't working - if I do this

options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}
gc = g.new_cluster(cluster_options=options)

the cluster starts as expected, but if I do this

options.environment = {"MALLOC_TRIM_THRESHOLD_": "0", "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION": 1.1}}
gc = g.new_cluster(cluster_options=options)

then it hangs indefinitely on cluster creation.

I'm not quite sure where or when I'm supposed to run this: $ DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 dask-scheduler - does it create a new scheduler? Is that different from the cluster?

I also tried client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", 1.1)), but when I did the check suggested in the pangeo cloud docs I just got an empty dict back.
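For reference, the value actually in effect on the scheduler can be read back with the same attribute the run_on_scheduler snippets above set (a one-liner sketch, assuming the client is connected):

client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.WORKER_SATURATION)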

TomNicholas avatar Oct 17 '22 20:10 TomNicholas