
Adding dask_memusage.install introduces "ValueError: Inputs contain futures that were created by another client."

Open CAROLZXYZXY opened this issue 4 years ago • 2 comments

Thank you for the wonderful tool!

I would like to profile the peak memory of my dask application. It runs successfully without dask_memusage, but after I add dask_memusage.install, it raises "ValueError: Inputs contain futures that were created by another client." I am using dask-memusage v1.1 and dask-core v2021.3.0.

Here is the relevant chunk of my code:

import gc
import time

import dask
import dask_memusage
from dask import delayed
from dask.distributed import Client, LocalCluster
from sklearn.neighbors import NearestNeighbors

from utility import get_batch_index

CLUSTER_KWARGS = {
    'n_workers': 4,
    'threads_per_worker': 1,
    'processes': False,
    'memory_limit': '8GB',
}

cluster = LocalCluster(**CLUSTER_KWARGS)
dask_memusage.install(cluster.scheduler, 'memory_stats/memusage.csv')
client = Client(cluster)

def kNN_graph(X, key_index, ref_index, n_neighbors=10):
    gc.collect()
    nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(X[ref_index[0]:ref_index[1], :])
    distance, indices = nbrs.kneighbors(X[key_index[0]:key_index[1], :])
    return (distance, indices)


contamination = 0.1  # percentage of outliers
n_train = args.n_train  # number of training points
n_test = 1000  # number of testing points
n_features = args.dim
    
# Generate sample data
X_train, y_train, X_test, y_test = \
    generate_data(n_train=n_train,
                  n_test=n_test,
                  n_features=n_features,
                  contamination=contamination,
                  random_state=42)

k = 5
batch_size = 5000
n_samples = n_train


start = time.time()
batch_index = get_batch_index(n_samples=n_samples, batch_size=batch_size)
n_batches = len(batch_index)

# save the intermediate results
full_list = []

# scatter the data
future_X = client.scatter(X_train)

delayed_knn = delayed(kNN_graph)

for i, index_A in enumerate(batch_index):
    for j, index_B in enumerate(batch_index):
        full_list.append(delayed_knn(future_X, index_A, index_B, k))
        
(full_list,) = dask.compute(full_list)  # dask.compute returns a tuple

CAROLZXYZXY avatar Apr 24 '21 21:04 CAROLZXYZXY

Hi,

Sorry I missed this. I notice you have both a cluster and a client; I suspect you want only one or the other.

itamarst avatar Oct 10 '21 20:10 itamarst

I will try to clarify the documentation with examples for both cases.

itamarst avatar Oct 10 '21 20:10 itamarst