map_reduce with scattered dictionary of arrays
What happened: When running map_partitions on a dataframe, the applied function uses a model that was scattered as a dictionary of arrays. The values in the model end up being strings with the same value as the key, instead of the arrays.
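For context, a minimal sketch of the scatter call involved (variable names follow the example below; the `Client` setup is assumed):

```python
import numpy as np
from dask.distributed import Client

client = Client(n_workers=1)
arr = np.random.random((2, 3))

# Scattering a dict scatters each value individually and returns
# a dict of Futures with the same keys.
scattered = client.scatter({0: arr, 1: arr})
# Expected inside the mapped function: model[0] is the array.
# Observed: model[0] is a string instead of the array.
```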
What you expected to happen: The model should arrive in the applied function unchanged.
Minimal Complete Verifiable Example:
```python
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd


def f(data, model):
    # The scattered model may arrive as a Future; resolve it first.
    if hasattr(model, 'result'):
        model = model.result()
    A = model[0]
    # Here I expect A to be an array.
    assert not isinstance(A, str), 'Fails here'
    S = data.copy()
    return S


class Dummy:
    # Can be accessed in the same way as the dictionary, but does
    # not fail.
    def __init__(self, data):
        self.data = data

    def __getitem__(self, x):
        return self.data[x]


def main(work_around):
    # Also fails with more workers.
    client = Client(n_workers=1)
    arr = np.random.random((2, 3))
    model = {0: arr, 1: arr}
    if work_around:
        model = Dummy(model)
    model = client.scatter(model)
    data = np.random.random((1000, 2))
    data = pd.DataFrame(data, columns=['A', 'B'])
    data = dd.from_pandas(data, npartitions=100)
    output = data.map_partitions(f, model)
    output.compute()
    client.close()


if __name__ == '__main__':
    main(True)
    print('Finished with the workaround.')
    main(False)
```
Anything else we need to know?: If, instead of the dictionary, one passes an object wrapping the same data (the Dummy class above), it works.
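For completeness, the workaround path from the example above in isolation (reusing `client`, `arr`, `data`, `Dummy`, and `f` from the MCVE); it rewraps the same dictionary, presumably so that `client.scatter` treats it as a single opaque object rather than scattering each value element-wise:

```python
# Workaround from the MCVE, isolated: wrap the dict before scattering.
model = client.scatter(Dummy({0: arr, 1: arr}))
output = data.map_partitions(f, model)
output.compute()  # f sees the arrays as expected
```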
Environment:
- Dask version: 2022.9.1
- Python version: 3.10.6
- Operating System: Linux
- Install method (conda, pip, source): conda