dask-memusage
Adding dask_memusage.install causes "ValueError: Inputs contain futures that were created by another client."
Thank you for the wonderful tool!
I would like to profile the peak memory usage of my Dask application. It runs successfully without dask_memusage, but after I add `dask_memusage.install`, it raises "ValueError: Inputs contain futures that were created by another client." I am using dask-memusage v1.1 and dask-core v2021.3.0.
Here is the relevant part of my code:
```python
import gc
import time

import dask
import dask_memusage
from dask import delayed
from dask.distributed import Client, LocalCluster
from sklearn.neighbors import NearestNeighbors

from utility import get_batch_index  # local helper module (not shown)

CLUSTER_KWARGS = {
    'n_workers': 4,
    'threads_per_worker': 1,
    'processes': False,
    'memory_limit': '8GB',
}

cluster = LocalCluster(**CLUSTER_KWARGS)
dask_memusage.install(cluster.scheduler, 'memory_stats/memusage.csv')


def kNN_graph(X, key_index, ref_index, n_neighbors=10):
    # Fit on the reference batch, then query neighbours for the key batch.
    gc.collect()
    nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(X[ref_index[0]:ref_index[1], :])
    distance, indices = nbrs.kneighbors(X[key_index[0]:key_index[1], :])
    return (distance, indices)


contamination = 0.1  # percentage of outliers
n_train = args.n_train  # number of training points (args is parsed elsewhere, not shown)
n_test = 1000  # number of testing points
n_features = args.dim

# Generate sample data (generate_data is defined/imported elsewhere, not shown)
X_train, y_train, X_test, y_test = \
    generate_data(n_train=n_train,
                  n_test=n_test,
                  n_features=n_features,
                  contamination=contamination,
                  random_state=42)

k = 5
batch_size = 5000
n_samples = n_train
start = time.time()
batch_index = get_batch_index(n_samples=n_samples, batch_size=batch_size)
n_batches = len(batch_index)

# save the intermediate results
full_list = []
# scatter the data (`client` is created elsewhere in the script, not shown)
future_X = client.scatter(X_train)
delayed_knn = delayed(kNN_graph)
# one kNN task per (key batch, reference batch) pair
for i, index_A in enumerate(batch_index):
    for j, index_B in enumerate(batch_index):
        full_list.append(delayed_knn(future_X, index_A, index_B, k))
full_list = dask.compute(full_list)
```
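`get_batch_index` comes from the reporter's own `utility` module, which isn't shown. To reproduce the snippet, a hypothetical stand-in could look like the following. It assumes the helper returns consecutive `(start, end)` pairs, which is how `kNN_graph` slices `X`.

```python
def get_batch_index(n_samples, batch_size):
    """Hypothetical stand-in: split range(n_samples) into [start, end) pairs."""
    index = []
    for start in range(0, n_samples, batch_size):
        # The last batch may be shorter than batch_size.
        end = min(start + batch_size, n_samples)
        index.append((start, end))
    return index


if __name__ == "__main__":
    print(get_batch_index(n_samples=12000, batch_size=5000))
```

With `batch_size = 5000` and, say, 12,000 training points this yields three batches, so the nested loop builds 3 × 3 = 9 `kNN_graph` tasks.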
Hi,
Sorry I missed this. I notice you have both a cluster and a client; I suspect you want only one or the other.
I will try to clarify the documentation with examples for both cases.
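For reference, here is a minimal sketch of keeping everything on one explicit client. It assumes (this is not confirmed in the thread) that the error appears because the data is scattered through `client` while `dask.compute` picks up a different default client; dask-memusage may open its own client to the scheduler to poll worker memory, and that client could become the default. Calling `client.compute` and `client.gather` on the client that owns `future_X` avoids relying on the default. `block_sum` is a hypothetical stand-in for `kNN_graph`, and the `dask_memusage.install` call is left as a comment so the sketch runs without dask-memusage installed.

```python
import numpy as np
from dask import delayed
from dask.distributed import Client, LocalCluster


def block_sum(X, start, end):
    # Hypothetical stand-in for kNN_graph: any task that reads a slice of X.
    return float(X[start:end].sum())


def main():
    # Threads-only cluster, mirroring the original CLUSTER_KWARGS.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
    # dask_memusage.install(cluster.scheduler, "memusage.csv") would go here.
    client = Client(cluster)
    try:
        X = np.arange(20.0).reshape(10, 2)
        future_X = client.scatter(X)  # future_X belongs to `client`
        tasks = [delayed(block_sum)(future_X, s, s + 5) for s in (0, 5)]
        # Submit through the same client that created future_X,
        # instead of letting dask.compute choose a default client.
        results = client.gather(client.compute(tasks))
    finally:
        client.close()
        cluster.close()
    return results


if __name__ == "__main__":
    print(main())
```

The design choice here is simply to never depend on whichever client happens to be the process-wide default once more than one client exists.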