
The buffer of embedded numpy variables is deep-copied in client->scheduler comms

Open crusaderky opened this issue 1 year ago • 2 comments


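import distributed
import numpy as np

if __name__ == "__main__":
    # Two in-process workers that still talk to each other over TCP
    with distributed.Client(n_workers=2, processes=False, protocol="tcp") as client:
        # has_what() maps worker address -> keys; unpacking yields the two addresses
        a, b = client.has_what()
        # Create the array on worker a, then force a transfer to worker b
        x = client.submit(np.random.random, 1024, key="x", workers=[a])
        y = client.submit(lambda x: x, x, key="y", workers=[b])
        y.result()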

When the buffer reaches distributed.protocol.serialize.pickle_loads, buffers[0] is a bytes object. This causes pickle_loads to deep-copy the buffer in order to honour the writeable flag of the original.

To verify, add at the top of pickle_loads:

print(header["writeable"], [ensure_memoryview(b).readonly for b in buffers])
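The reason a copy is unavoidable in this situation can be sketched with the standard library alone. This is only an illustration, not the actual body of pickle_loads: match_writeability is a hypothetical helper, and the flag list stands in for header["writeable"]. A bytes object exposes a read-only buffer, so the only way to hand a writeable buffer to the unpickler is to copy it into a new mutable one.

```python
def match_writeability(writeable_flags, buffers):
    """Hypothetical helper: return memoryviews that are writeable where requested."""
    out = []
    for want_writeable, buf in zip(writeable_flags, buffers):
        mv = memoryview(buf)
        if want_writeable and mv.readonly:
            # A read-only buffer (e.g. bytes) cannot be made writeable in place,
            # so it has to be copied into a new mutable buffer.
            mv = memoryview(bytearray(mv))
        out.append(mv)
    return out


payload = bytes(8192)  # stands in for a frame received as a bytes object

# Original was writeable: the read-only frame must be deep-copied.
copied, = match_writeability([True], [payload])
# Original was read-only: the frame can be reused as-is.
shared, = match_writeability([False], [payload])

print(copied.readonly, copied.obj is payload)
print(shared.readonly, shared.obj is payload)
```

The first case corresponds to what the print statement above reports as a True flag paired with a read-only buffer; the second case is the one where no copy is needed.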

What's causing me a migraine is:

  • if you replace
        x = client.submit(np.random.random, 1024, key="x", workers=[a])

with

        x = client.submit(lambda: np.random.random(1024), key="x", workers=[a])

then the numpy object is no longer deserialized by distributed.protocol.serialize.pickle_loads; it is instead processed by distributed.protocol.numpy.deserialize_numpy_array, which receives a writeable buffer.

  • if you replace the submit API with dask.array:
x = da.random.random(1024).persist(workers=[a])
y = x.map_blocks(lambda x: x).persist(workers=[b])
y.compute()

then we are again using distributed.protocol.serialize.pickle_loads, which receives a read-only buffer; this time, however, the writeable flag is False, so no deep copy happens.

crusaderky, Apr 03 '24 14:04

Ok, found the difference. The deep copy is NOT triggered by the transfer of the task output from a to b; it's triggered by the random seed that's sent from the client to the scheduler. :facepalm:

crusaderky, Apr 03 '24 14:04

This issue applies to all embedded variables that are sent from the client to the scheduler, e.g.

x = client.submit(lambda x: x + 1, np.random.random(1024), key="x", workers=[a])

crusaderky, Apr 03 '24 14:04