map_partitions with scattered dictionary of arrays

Open marberi opened this issue 3 years ago • 0 comments

What happened: When running map_partitions on a dataframe, the applied function uses a model that was scattered as a dictionary of different (x)arrays. Inside the function, the values of the model end up being strings with the same value as the key, instead of the arrays.

What you expected to happen: The model should be unchanged.

Minimal Complete Verifiable Example:


import time
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd

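def f(data, model):
    # The scattered model may arrive as a future; resolve it first.
    if hasattr(model, 'result'):
        model = model.result()

    A = model[0]

    # Here I expect A to be an array.
    assert not isinstance(A, str), 'Fails here'

    # Return an unchanged copy of the partition.
    return data.copy()
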
class Dummy:
    # Can be accessed in the same way as the dictionary, but does
    # not fail.
    def __init__(self, data):
        self.data = data

    def __getitem__(self, x):
        return self.data[x]

def main(work_around):
    # Also fails with more workers.
    client = Client(n_workers=1)

    arr = np.random.random((2,3))
    model = {0: arr, 1: arr} 
    if work_around:
        model = Dummy(model)

    model = client.scatter(model)
    

    data = np.random.random((1000, 2))
    data = pd.DataFrame(data, columns=['A', 'B'])
    data = dd.from_pandas(data, npartitions=100)
    output = data.map_partitions(f, model)

    output.compute()

if __name__ == '__main__':
    main(True)
    print('Finished with the workaround.')

    main(False)

Anything else we need to know?: If one passes an object holding the same data instead of a dictionary, it works.
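A minimal sketch of why the wrapper might behave differently, using plain Python only. It assumes that `client.scatter` treats a dict as a container (returning a dict of futures that reuses the keys) and treats any other object as a single item; this has not been verified against the distributed source. `ModelWrapper` is a hypothetical name that mirrors `Dummy` above, and the lists stand in for the arrays.

```python
class ModelWrapper:
    """Hypothetical stand-in for Dummy: holds a dict but is not a dict."""

    def __init__(self, data):
        self.data = data

    def __getitem__(self, key):
        # Delegate indexing to the wrapped dictionary.
        return self.data[key]


# Plain lists stand in for the (x-)arrays of the real model.
model = {0: [0.1, 0.2, 0.3], 1: [0.4, 0.5, 0.6]}
wrapped = ModelWrapper(model)

# The raw model is a dict, so container-aware code may split it per key.
is_plain_dict = isinstance(model, dict)

# The wrapper is an opaque object, so it would be handled as one item
# (assumption about scatter's behaviour, see the text above).
wrapper_is_dict = isinstance(wrapped, dict)

# Indexing the wrapper still returns the original value.
same_lookup = wrapped[0] is model[0]
```

If that assumption holds, the wrapper works simply because it hides the dictionary from the scatter step, while `f` can still index it the same way.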

Environment:

  • Dask version: 2022.9.1
  • Python version: 3.10.6
  • Operating System: Linux
  • Install method (conda, pip, source): conda

marberi, Sep 20 '22 08:09