Serialize some lists using Dispatch
- Closes #6368
- Related to: #6940
- [ ] Tests added / passed
- [x] Passes `pre-commit run --all-files`
Using the reproducer from @adbreind in #6368 as a starting point:
```python
from time import time

import numpy as np

from distributed import Client, LocalCluster


def run_test():
    runtime = []

    def foo():
        return [1.5] * 1_000_000

    # with LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='8GiB') as cluster:
    for i in range(5):
        with Client() as client:
            s = time()
            res = client.submit(foo).result()
            runtime.append(time() - s)

    print(f"Run time (in seconds) for 5 runs is: {runtime}, and mean runtime: {np.mean(runtime)} seconds")


if __name__ == "__main__":
    run_test()
```
On current `main`, I get:

```
Run time (in seconds) for 5 runs is: [13.804176807403564, 13.784174680709839, 13.835507869720459, 13.706598997116089, 13.749552011489868], and mean runtime: 13.776002073287964 seconds
```

While on this branch, I get:

```
Run time (in seconds) for 5 runs is: [0.15462398529052734, 0.1298370361328125, 0.1314990520477295, 0.13030385971069336, 0.12935090065002441], and mean runtime: 0.13512296676635743 seconds
```
What is happening? When serializing collections, we prefer to recurse into the collection and serialize each object in it separately with pickle. This decision was motivated by the Blockwise-IO work described by @rjzamora. While it makes sense there, it has the unfortunate consequence of making it expensive to serialize collections in general.
Here we create a `Dispatch()` method for lists that converts a list to a NumPy array, which can then be serialized, and we add `infer_if_recurse_to_serialize_list`. Now that lists can be serialized either recursively with pickle or with `dask_serialize`, the decision about whether to `iterate_collection` is offloaded to `infer_if_recurse_to_serialize_list`.
We also need to handle the case where a `Serialize` object must itself be serialized; for that we add an `iterate_collection` attribute.
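To make the mechanism concrete, here is a minimal, hypothetical sketch of the idea. The `Dispatch` registration and the body of `infer_if_recurse_to_serialize_list` below are illustrative assumptions, not the exact code in this PR:

```python
# Illustrative sketch only -- the registration and heuristic below are assumptions,
# not the exact implementation in this PR.
import numpy as np
from dask.utils import Dispatch

# Dispatch on the container type: a homogeneous numeric list is converted to a
# NumPy array so it can be serialized as a single frame instead of element-by-element.
list_to_serializable = Dispatch("list_to_serializable")


@list_to_serializable.register(list)
def _list_to_array(lst):
    return np.asarray(lst)


def infer_if_recurse_to_serialize_list(lst):
    # Hypothetical heuristic: keep the old behaviour (recurse and serialize each
    # element separately) for small or heterogeneous lists, where per-element
    # custom serializers may still matter.
    return len(lst) < 1000 or not all(isinstance(x, (int, float)) for x in lst)


if __name__ == "__main__":
    data = [1.5] * 1_000_000
    if infer_if_recurse_to_serialize_list(data):
        print("recurse: serialize each element separately")
    else:
        arr = list_to_serializable(data)  # -> np.ndarray, handled as a single frame
        print(type(arr), arr.shape)
```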
Unit Test Results

See the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

- 15 files ±0, 15 suites ±0, 6h 20m 2s :stopwatch: +4m 47s
- 3 117 tests +16: 3 031 :heavy_check_mark: +17, 84 :zzz: -1, 2 :x: ±0
- 23 066 runs +112: 22 163 :heavy_check_mark: +115, 899 :zzz: -5, 4 :x: +2

For more details on these failures, see this check.

Results for commit 886108db, compared against base commit 1fd07f03.
cc: @madsbk -- Wondering if you would mind taking a look at this PR. Also interested in your thoughts on dropping the requirement to use pickle with `protocol=4`. xref: https://github.com/rapidsai/dask-cuda/issues/746
cc: @ian-r-rose
@madsbk -- Can we drop the requirement to use `protocol=4` now that `distributed` requires `python>=3.8`?

> @madsbk -- Can we drop the requirement to use `protocol=4` now that `distributed` requires `python>=3.8`?

Yes
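For background (my own illustration, not code from this PR): Python 3.8 is the first version where pickle protocol 5 is always available, so the protocol can simply be left at `pickle.HIGHEST_PROTOCOL`, and out-of-band buffers become usable, rather than pinning to `protocol=4`. A minimal sketch:

```python
import pickle

import numpy as np

# Sketch only: shows why Python >= 3.8 removes the need to pin protocol=4.
# Protocol 5 (always available on 3.8+) supports out-of-band buffers, so large
# payloads such as NumPy arrays can be pickled without an extra in-band copy.
buffers = []
payload = pickle.dumps(
    np.ones(1_000_000),
    protocol=pickle.HIGHEST_PROTOCOL,  # 5 on Python 3.8+
    buffer_callback=buffers.append,    # collect out-of-band PickleBuffer objects
)
restored = pickle.loads(payload, buffers=buffers)
print(len(buffers), restored.shape)
```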