The buffer of embedded numpy variables is deep-copied in client->scheduler comms
```python
import distributed
import numpy as np

if __name__ == "__main__":
    # processes=False keeps everything in one process, while protocol="tcp"
    # still forces real (de)serialization of all comms
    with distributed.Client(n_workers=2, processes=False, protocol="tcp") as client:
        a, b = client.has_what()  # the addresses of the two workers
        x = client.submit(np.random.random, 1024, key="x", workers=[a])
        y = client.submit(lambda x: x, x, key="y", workers=[b])
        y.result()
```
When the buffer reaches `distributed.protocol.serialize.pickle_loads`, `buffers[0]` is a `bytes` object. This causes `pickle_loads` to deep-copy the buffer in order to honour the `writeable` flag of the original.

To verify, add at the top of `pickle_loads`:

```python
print(header["writeable"], [ensure_memoryview(b).readonly for b in buffers])
```
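For reference, the copy decision boils down to something like this (a minimal sketch of the behaviour described above, not the verbatim `pickle_loads` implementation):

```python
import pickle

def pickle_loads_sketch(header, frames):
    pickled, *buffers = frames
    out = []
    for writeable, buf in zip(header["writeable"], buffers):
        mv = memoryview(buf)
        if writeable and mv.readonly:
            # a bytes frame is read-only; the only way to honour
            # writeable=True is to deep-copy it into a bytearray
            mv = memoryview(bytearray(mv))
        elif not writeable and not mv.readonly:
            # the opposite mismatch is cheap to fix without a copy
            mv = mv.toreadonly()
        out.append(mv)
    return pickle.loads(pickled, buffers=out)
```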
What's causing me a migraine is:
- if you replace

  ```python
  x = client.submit(np.random.random, 1024, key="x", workers=[a])
  ```

  with

  ```python
  x = client.submit(lambda: np.random.random(1024), key="x", workers=[a])
  ```

  then the numpy object is no longer deserialized by `distributed.protocol.serialize.pickle_loads`; it is instead processed by `distributed.protocol.numpy.deserialize_numpy_array`, which receives a writeable buffer (see the serializer check right after this list)
- if you replace the submit API with dask.array:

  ```python
  import dask.array as da

  x = da.random.random(1024).persist(workers=[a])
  y = x.map_blocks(lambda x: x).persist(workers=[b])
  y.compute()
  ```

  then we are again going through `distributed.protocol.serialize.pickle_loads`, which receives a read-only buffer, but this time the `writeable` flag is False, so no deep copy happens
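A quick way to see which serialization path each object takes, without spinning up a cluster (a sketch; I'm assuming the `"serializer"` entry in the header, which is what the versions I've looked at report):

```python
import numpy as np
from distributed.protocol import serialize

# the array itself is handled by the dedicated numpy serializer
header, frames = serialize(np.random.random(1024))
print(header.get("serializer"))  # expected: 'dask'

# the bound method np.random.random falls back to pickle, dragging the
# global RandomState (and its state buffer) along with it
header, frames = serialize(np.random.random)
print(header.get("serializer"))  # expected: 'pickle'
```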
Ok, found the difference. The deep copy is NOT tripped by the transfer of the task output from a to b; it's the random seed that's sent from the client to the scheduler that trips it. :facepalm:
This issue applies to all embedded variables that are sent from the client to the scheduler, e.g.

```python
x = client.submit(lambda x: x + 1, np.random.random(1024), key="x", workers=[a])
```
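You can see the embedded buffer with plain pickle, no distributed involved (the exact size is an assumption based on the 624-word MT19937 state):

```python
import pickle
import numpy as np

# pickling the bound method np.random.random embeds the global RandomState;
# with protocol 5 its state array travels as an out-of-band buffer, which is
# exactly what later reaches pickle_loads on the scheduler
buffers = []
payload = pickle.dumps(np.random.random, protocol=5, buffer_callback=buffers.append)
print([(memoryview(b).nbytes, memoryview(b).readonly) for b in buffers])
# expected: one writeable buffer of ~2.5 kB (the Mersenne Twister state)
```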