Use Task class instead of tuple
This is an early version that will close https://github.com/dask/dask/issues/9969
It introduces a new Task class (name is subject to change) and a couple of other related subclasses that should replace the tuple as a representation of runnable tasks.
The benefits of this are outlined in https://github.com/dask/dask/issues/9969 and are primarily focused on reducing overhead during serialization and parsing of results. An important result is also that we can trivially cache functions (and arguments, if we wish) to avoid problems like https://github.com/dask/distributed/issues/8767, where users erroneously provide expensive-to-pickle functions (which also happens frequently in our own code and in downstream projects like xarray).
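For illustration only, here is a minimal self-contained sketch of both ideas. The `Task` class and `dumps_function_cached` helper below are stand-ins invented for this example, not the actual implementation in this PR:

```python
import pickle
from functools import lru_cache
from operator import add


@lru_cache(maxsize=None)
def dumps_function_cached(func):
    # Serialize a callable only once per unique function object; every task
    # that reuses the function reuses the cached bytes.
    return pickle.dumps(func)


class Task:
    """Stand-in for the proposed Task class (name and API subject to change)."""

    def __init__(self, key, func, *args):
        self.key = key
        self.func = func
        self.args = args

    def serialized_func(self):
        return dumps_function_cached(self.func)


# Legacy representation: a runnable task is a bare tuple of (callable, *args)
legacy_task = (add, "x", 2)

# New representation: an explicit object whose callable and arguments can be
# introspected, serialized, and cached without re-parsing nested tuples
new_task = Task("y", add, "x", 2)
assert new_task.serialized_func() is new_task.serialized_func()
```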
This approach allows us to convert the legacy dsk graph to the new representation with full backwards compatibility. Old graphs can be migrated, and new ones can be written directly using the new representation, which will ultimately reduce overhead.
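As a rough sketch of what that migration path could look like (the converter here is a toy with an invented name; the real conversion logic lives in the sibling dask PR linked below):

```python
from operator import add, mul


class Task:
    """Stand-in for the proposed Task class, as in the sketch above."""

    def __init__(self, key, func, *args):
        self.key, self.func, self.args = key, func, args


def convert_legacy_graph(dsk):
    # Wrap every legacy tuple task in a Task object; plain data and key
    # references are left untouched, so existing graphs keep working.
    out = {}
    for key, value in dsk.items():
        if type(value) is tuple and value and callable(value[0]):
            out[key] = Task(key, value[0], *value[1:])
        else:
            out[key] = value
    return out


dsk_legacy = {"x": 1, "y": (add, "x", 2), "z": (mul, "y", "y")}
dsk_new = convert_legacy_graph(dsk_legacy)
assert isinstance(dsk_new["z"], Task) and dsk_new["x"] == 1
```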
I will follow up with measurements shortly.
Sibling PR in dask https://github.com/dask/dask/pull/11248
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
25 files ±0   25 suites ±0   10h 21m 22s :stopwatch: +1m 27s
4 123 tests -9   4 006 :white_check_mark: -9   110 :zzz: ±0   7 :x: +1
47 622 runs -109   45 521 :white_check_mark: -108   2 087 :zzz: -8   14 :x: +8
For more details on these failures, see this check.
Results for commit 65ed5d5f. ± Comparison against base commit 48509b35.
This pull request removes 11 and adds 2 tests. Note that renamed tests count towards both.
distributed.tests.test_client ‑ test_persist_get
distributed.tests.test_client ‑ test_recreate_error_array
distributed.tests.test_client ‑ test_recreate_error_collection
distributed.tests.test_client ‑ test_recreate_error_delayed
distributed.tests.test_client ‑ test_recreate_error_futures
distributed.tests.test_client ‑ test_recreate_task_array
distributed.tests.test_client ‑ test_recreate_task_collection
distributed.tests.test_client ‑ test_recreate_task_delayed
distributed.tests.test_client ‑ test_recreate_task_futures
distributed.tests.test_utils ‑ test_maybe_complex
…
distributed.tests.test_client ‑ test_persist_get[False]
distributed.tests.test_client ‑ test_persist_get[True]
:recycle: This comment has been updated with latest results.
There may be implications for some of the dashboard components, the "pew pew pew" plot comes to mind. I see this is still a draft, let me know when it's in a reviewable state and I'll look over the dashboard code to see if anything needs changing there 🙂.
I'd actually be surprised if that was affected since we don't change the scheduler's internal metadata (like dependencies, transfers, where the tasks are executed...). But who knows. I'll probably stumble over 50 small weird things trying to get CI green :)
I think this is slowly coming together.
I still see some unexpected failures in distributed/shuffle/tests/test_merge.py::test_merge. They indicate "shuffle failed" errors that I cannot see in any test report.
The other failures (so far) appear to be unrelated
Test failures are all over the place. I don't think anything is related.
I triggered another A/B just to make sure
https://github.com/coiled/benchmarks/actions/runs/10679723905
The results of the A/B run are interesting. I will copy the test cases with some signal here
There are a couple of notable improvements already. This shows test_swap_axes_in_memory, which uses P2P rechunking. P2P rechunking already uses the new task spec, so the parsing is much cheaper; we're up to 25% faster.
However, there are also a couple of cases where P2P is now slower by 5-10%. For example
There are also some notable regressions around Client.map (already using the new spec)
and some other array code (test_doublediff)
There are some notable gains with test_filter_then_average, up to 15%, but these tests have been known to be a little flaky.
So overall a bit of a mixed bag. I'm not very discouraged, considering that we haven't migrated any other low-level code to this system, but I will poke at the performance regressions a bit to see if there are some easy wins.
The Client.map perf regression was trivial and should be fixed by now. I had only migrated the complex version with kwargs but not the case without. Therefore, we went from "basically doing nothing" to "let's parse this thing once", which was showing up on those tests that basically measure overhead.
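For context, these are the two Client.map call shapes in question. This is just an illustrative snippet assuming a running local cluster; the regression was in how the submission path built the tasks, not in this user-facing API:

```python
from distributed import Client


def inc(x, offset=0):
    return x + offset


client = Client()  # local cluster just for illustration

# Plain case (no extra kwargs): initially not migrated, so its tuples had to be
# parsed into the new spec on submission, which showed up in overhead-bound tests.
plain = client.map(inc, range(10))

# kwargs case: this path had already been migrated to the new spec.
with_kwargs = client.map(inc, range(10), offset=1)

print(sum(client.gather(plain) + client.gather(with_kwargs)))
client.close()
```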
I'll have to look into this a little more, I think
I repeated the benchmarks and got
Wall time
Peak Memory (W)
The only thing I'm not surprised about is that Scheduler Memory also increases (which is expected at this stage)
I suspect that the memory increase, particularly for the P2P cases, is caused by the cache building up, but I haven't checked yet. In absolute values, the increase is much less severe than the above suggests.
So I briefly looked into test_tiles_to_rows (the one with the really bad memory result). I ran CPU and memory profiles but cannot find anything wrong with it.
The caches are populated but hold less than 10 MB. The memray profile doesn't show anything out of the ordinary. The accumulated memory is the shards we're shuffling, not anything related to the spec itself.
I'll check again what happens if I disable the caching entirely, but I can't imagine how this would change anything about the lifetime of the shards.
I changed the CI configuration to run against main again. https://github.com/dask/dask/pull/11248 should be fine to merge, and afterwards we'll retrigger CI.
needs https://github.com/dask/dask/pull/11429
Also this https://github.com/dask/dask/pull/11431
The mindeps builds are sad. Everything else seems unrelated
Update: test_merge failures seem systemic (and thus related).
I've been trying to reproduce but without any luck so far
I think the test_merge failures are actually unrelated. The exception is
def validate_data(self, data: pd.DataFrame) -> None:
> if set(data.columns) != set(self.meta.columns):
E AttributeError: 'tuple' object has no attribute 'columns'
which indicates that data is likely a key instead of the data... I've seen this before and I think this is a dask-expr problem. I added another verification step here to confirm this. I'll keep digging
yeah, so the exception is pretty much what I expected
if not isinstance(data, pd.DataFrame):
> raise TypeError(f"Expected {data=} to be a DataFrame, got {type(data)}.")
E TypeError: Expected data=('assign-3d7cfa7cea412465799bea6cfac1b512', 1) to be a DataFrame, got <class 'tuple'>.
Ah, this is the build with dask-expr disabled. Now I can reproduce!
The distributed/shuffle/tests/test_graph.py::test_multiple_linear failure is also related; it's also a legacy-only problem.
https://github.com/dask/dask/pull/11445/ is hopefully the last one
Is there anything left to do here?