Serialize some lists using Dispatch

Open hayesgb opened this issue 3 years ago • 6 comments

  • Closes #6368

  • Related to: #6940

  • [ ] Tests added / passed

  • [x] Passes pre-commit run --all-files

Using the reproducer from @adbreind in #6368 as a starting point:

from distributed import Client, LocalCluster
from time import time
import numpy as np


def run_test():
    runtime = []
    def foo():
        return [1.5] * 1_000_000

    # with LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='8GiB') as cluster:
    for i in range(5):
        with Client() as client:
            s = time()
            res = client.submit(foo).result()
            runtime.append(time() - s)
    print(f"Run time (in seconds) for 5 runs is: {runtime}, and mean runtime:  {np.mean(runtime)} seconds")

if __name__ == "__main__":
    run_test()

On current main, I get: Run time (in seconds) for 5 runs is: [13.804176807403564, 13.784174680709839, 13.835507869720459, 13.706598997116089, 13.749552011489868], and mean runtime: 13.776002073287964 seconds

While on this branch, I get: Run time (in seconds) for 5 runs is: [0.15462398529052734, 0.1298370361328125, 0.1314990520477295, 0.13030385971069336, 0.12935090065002441], and mean runtime: 0.13512296676635743 seconds

What is happening? When serializing collections, we prefer to use pickle and recurse into the collections, serializing each object in the collection separately. This decision was motivated by Blockwise-IO work as described by @rjzamora. While it makes sense, it also has the unfortunate consequence of making it expensive to serialize collections in general.
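As a rough illustration of why recursing is costly, the sketch below compares pickling a list once against pickling each element separately. It uses only the standard library; `serialize_whole` and `serialize_per_element` are hypothetical names made up for this example and are not functions in distributed.

```python
import pickle


def serialize_whole(items):
    # One pickle call, one frame, regardless of list length.
    return [pickle.dumps(items, protocol=4)]


def serialize_per_element(items):
    # One pickle call and one frame per element, which is roughly
    # what recursing into a collection amounts to.
    return [pickle.dumps(x, protocol=4) for x in items]


data = [1.5] * 1000
whole = serialize_whole(data)
per_element = serialize_per_element(data)

# The frame count grows with the list length only in the recursive case.
print(len(whole), len(per_element))
```

With a million elements, as in the reproducer, the per-element path produces a million small frames, each with its own overhead.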

Here we register a Dispatch handler for lists that converts a list to a numpy array, which can then be serialized. We also add infer_if_recurse_to_serialize_list: now that lists can be serialized either recursively using pickle or with dask_serialize, we offload the decision about whether to iterate_collection to infer_if_recurse_to_serialize_list.

We also need to handle the case where a Serialize object must itself be serialized. To handle this, we add an iterate_collection attribute.
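The idea can be sketched with the standard library alone. This is not the code in this PR: `functools.singledispatch` stands in for Dispatch, `array.array` stands in for a numpy array, and `should_recurse` is a hypothetical heuristic playing the role of infer_if_recurse_to_serialize_list. The header layout is also an assumption made for the example.

```python
import pickle
from array import array
from functools import singledispatch


def should_recurse(items):
    # Hypothetical heuristic: pack only non-empty lists of plain floats;
    # recurse into everything else.
    return not (len(items) > 0 and all(type(x) is float for x in items))


@singledispatch
def serialize(obj):
    # Fallback for any other type: pickle the object as a single frame.
    return {"kind": "pickle"}, [pickle.dumps(obj, protocol=4)]


@serialize.register(list)
def _serialize_list(items):
    if should_recurse(items):
        # Heterogeneous list: serialize each element separately.
        frames = [pickle.dumps(x, protocol=4) for x in items]
        return {"kind": "recursed-list"}, frames
    # Homogeneous floats: pack into one contiguous buffer.
    return {"kind": "packed-floats"}, [array("d", items).tobytes()]


def deserialize(header, frames):
    kind = header["kind"]
    if kind == "packed-floats":
        out = array("d")
        out.frombytes(frames[0])
        return out.tolist()
    if kind == "recursed-list":
        return [pickle.loads(f) for f in frames]
    return pickle.loads(frames[0])
```

In this sketch a list such as `[1.5] * 1_000_000` travels as a single frame, while a mixed list such as `[1, "a", 2.5]` still gets one frame per element.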

hayesgb avatar Sep 10 '22 00:09 hayesgb

Can one of the admins verify this patch?

Admins can comment ok to test to allow this one PR to run or add to allowlist to allow all future PRs from the same author to run.

GPUtester avatar Sep 10 '22 00:09 GPUtester

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0    15 suites ±0    6h 20m 2s :stopwatch: +4m 47s

3 117 tests +16    3 031 :heavy_check_mark: +17    84 :zzz: -1    2 :x: ±0

23 066 runs +112    22 163 :heavy_check_mark: +115    899 :zzz: -5    4 :x: +2

For more details on these failures, see this check.

Results for commit 886108db. ± Comparison against base commit 1fd07f03.

:recycle: This comment has been updated with latest results.

github-actions[bot] avatar Sep 10 '22 01:09 github-actions[bot]

cc: @madsbk -- Would you mind taking a look at this PR? I'm also interested in your thoughts on dropping the requirement to use pickle with protocol=4. xref: https://github.com/rapidsai/dask-cuda/issues/746

hayesgb avatar Sep 12 '22 01:09 hayesgb

cc: @ian-r-rose

hayesgb avatar Sep 12 '22 01:09 hayesgb

@madsbk -- Can we drop the requirement to use protocol=4 now that distributed requires python>=3.8?

hayesgb avatar Sep 14 '22 18:09 hayesgb

> @madsbk -- Can we drop the requirement to use protocol=4 now that distributed requires python>=3.8?

Yes

madsbk avatar Sep 15 '22 06:09 madsbk
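For context on the protocol question: pickle protocol 5 is available from Python 3.8 onward and supports out-of-band buffers. The snippet below is a minimal standard-library sketch of that feature, not a description of how distributed uses it; the variable names are illustrative.

```python
import pickle

data = bytearray(b"x" * 1_000_000)

# With protocol 5, a buffer_callback collects large buffers out-of-band
# instead of copying them into the pickle stream.
buffers = []
payload = pickle.dumps(
    pickle.PickleBuffer(data), protocol=5, buffer_callback=buffers.append
)

# The same buffers must be supplied again when loading.
restored = pickle.loads(payload, buffers=buffers)

# The payload stays small because the megabyte of data was not copied in.
print(len(payload), len(buffers))
```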