dask-cuda
Import failure in worker process
This came up as I was playing around with https://github.com/rapidsai/dask-cuda/blob/branch-0.16/dask_cuda/benchmarks/local_cudf_merge.py. I started seeing this import failure:
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 335, in deserialize
return loads(header, frames)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 71, in pickle_loads
return pickle.loads(x, buffers=buffers)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
return pickle.loads(x, buffers=buffers)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/series.py", line 16, in <module>
from cudf import _lib as libcudf
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/__init__.py", line 11, in <module>
from cudf import core, datasets, testing
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/__init__.py", line 4, in <module>
from cudf.core.dataframe import DataFrame, from_pandas, merge
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/dataframe.py", line 36, in <module>
from cudf.core.series import Series
ImportError: cannot import name 'Series' from partially initialized module 'cudf.core.series' (most likely due to a circular import) (/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/series.py)
distributed.utils - ERROR - cannot import name 'Series' from partially initialized module 'cudf.core.series' (most likely due to a circular import) (/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/series.py)
I have simplified the reproducer down to https://gist.github.com/manopapad/aad22af7ec2206c7f078b71dc042ed56.
I suspect this has to do with doing imports inside a function that constructs dask graphs (e.g. here https://github.com/rapidsai/dask-cuda/blob/branch-0.16/dask_cuda/benchmarks/local_cudf_merge.py#L26-L32), then shipping those graphs to worker processes that haven't executed those imports. Just a hunch though, someone familiar with the internals of dask would know better.
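For concreteness, here is a minimal sketch of the pattern described above (hypothetical names, GPU machine assumed; this is not the benchmark code itself): cudf is imported only inside the function that builds the collection, so a worker may first touch cudf while deserializing the shipped graph rather than at startup.
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

def build_random_ddf(chunk_size, npartitions):
    # Local imports, mirroring the style of the benchmark's data generator
    import cudf
    import dask_cudf

    df = cudf.DataFrame({"key": range(chunk_size), "payload": range(chunk_size)})
    return dask_cudf.from_cudf(df, npartitions=npartitions)

if __name__ == "__main__":
    client = Client(LocalCUDACluster())
    ddf = build_random_ddf(10_000, 4)
    ddf.persist()  # the graph, with its embedded cudf partitions, is shipped to the workers
    wait(ddf)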
I think your explanation for the issue is right. However, I haven't seen that happen before; what arguments do you pass to the local_cudf_merge.py script to reproduce this?
This came up as I was making changes to local_cudf_merge.py, here is the diff that triggers the issue:
--- ./pkgs/dask-cuda-0.16.0a201013-py38_96/lib/python3.8/site-packages/dask_cuda/benchmarks/local_cudf_merge.py 2020-10-13 00:45:17.000000000 -0700
+++ ./envs/legate3/lib/python3.8/site-packages/dask_cuda/benchmarks/local_cudf_merge.py 2020-10-17 00:34:12.575661164 -0700
@@ -161,10 +161,12 @@
     # Generate random Dask dataframes
     ddf_base = get_random_ddf(
         args.chunk_size, n_workers, args.frac_match, "build", args
-    ).persist()
+    )
+    ddf_base.persist()
     ddf_other = get_random_ddf(
         args.chunk_size, n_workers, args.frac_match, "other", args
-    ).persist()
+    )
+    ddf_other.persist()
     wait(ddf_base)
     wait(ddf_other)
     client.wait_for_workers(n_workers)
With these changes, I can trigger the import error consistently with:
python local_cudf_merge.py -d 0
I understand that the edited code is not equivalent to the original (it is waiting on the wrong thing), but I believe it is still a valid use of the dask API (certainly the shortened version https://gist.github.com/manopapad/aad22af7ec2206c7f078b71dc042ed56 is a valid use), so it shouldn't be crashing like this.
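As an aside, the semantic difference between the two versions boils down to the following sketch (plain dask.dataframe used here just for illustration): persist() returns a new, persisted collection and leaves the original untouched, so the edited code ends up waiting on a still-lazy collection.
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, wait

if __name__ == "__main__":
    client = Client()
    ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)

    # Original benchmark: keep the returned, persisted collection and wait on it.
    persisted = ddf.persist()
    wait(persisted)

    # Edited version: the persisted collection is discarded and wait() is called
    # on the original lazy collection, which holds no futures to wait on.
    ddf.persist()
    wait(ddf)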
I reported this here because it occurred in an example that uses dask-cuda, but if you believe this is a general issue with dask-distributed I'll be happy to move it over to that bug tracker.
I don't believe this is an issue with dask-cuda, nor that there's anything we can do about it on our side. It seems this happens during serialization; I'm not sure whether it would need to be fixed in distributed or in cuDF itself. Adding @jakirkham here, who's more familiar with serialization than I am; maybe he'll have a better idea of what should be done.
I'm not sure of the current state of things here. It seems like PR https://github.com/rapidsai/dask-cuda/pull/416 went in earlier, making some changes to the benchmarks. Did that address this issue?
No, #416 is unrelated; this is not an issue with the benchmarks as they currently stand. The bottom line is that this snippet:
https://gist.github.com/manopapad/aad22af7ec2206c7f078b71dc042ed56
causes this crash:
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 335, in deserialize
return loads(header, frames)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 71, in pickle_loads
return pickle.loads(x, buffers=buffers)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 73, in loads
return pickle.loads(x, buffers=buffers)
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/series.py", line 16, in <module>
from cudf import _lib as libcudf
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/__init__.py", line 11, in <module>
from cudf import core, datasets, testing
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/__init__.py", line 4, in <module>
from cudf.core.dataframe import DataFrame, from_pandas, merge
File "/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/dataframe.py", line 36, in <module>
from cudf.core.series import Series
ImportError: cannot import name 'Series' from partially initialized module 'cudf.core.series' (most likely due to a circular import) (/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/series.py)
distributed.utils - ERROR - cannot import name 'Series' from partially initialized module 'cudf.core.series' (most likely due to a circular import) (/gpfs/fs1/mpapadakis/miniconda3/envs/legate3/lib/python3.8/site-packages/cudf/core/series.py)
And we're not sure if this actually has anything to do with dask-cuda or cudf, or if it's a pure dask-distributed problem. My money is on this being a race condition between executing tasks on remote workers and importing of prerequisite modules, but I don't know much about the internals of dask-distributed.
Interesting, I've not seen a partially initialized module issue before.
What happens if from cudf import Series is added to the top of the script?
Doing from cudf import Series, or even just import cudf, at the top of the script fixes the crash. With just import cupy at the top, the crash remains.
Thanks for checking. Sounds like that may act as a short-term workaround.
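For anyone hitting this in the meantime, the reported workaround amounts to something like this sketch (the client.run preload is an extra, purely hypothetical precaution, not something verified in this thread):
import cudf  # noqa: F401 -- eager top-of-script import; this is the reported workaround

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

def _preload_cudf():
    import cudf  # noqa: F401

if __name__ == "__main__":
    client = Client(LocalCUDACluster())
    client.run(_preload_cudf)  # hypothetical extra step: force the import on every worker too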
As we already import cudf here and Dask already imports that module here, I'm not sure that we are missing any imports.
It is interesting that this script seems to use pickle instead of dask or, better yet, cuda serialization. Sounds like something might not be run or configured correctly. For instance, are we running this script from within the cuDF or Distributed repos? That might cause these kinds of import issues.
As to configuration issues, I'm not totally sure what that might be, but I do notice that InfiniBand is enabled. Did you build the ucx package with InfiniBand support? We don't do this by default, as it is sensitive to the exact Mellanox OFED version used, which makes it hard to distribute effectively with that enabled. Also, it looks like we are using only 1 worker? I wouldn't expect that to be particularly insightful from a benchmarking standpoint. Have we tried 2 or more workers? Also, one doesn't typically need to set CUDA_VISIBLE_DEVICES as was done here, since Dask-CUDA does this for us; I would try dropping that. There may be other issues I'm overlooking (others please feel free to jump in), but that should help get us started.
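A sketch of that configuration advice (illustrative only): skip the CUDA_VISIBLE_DEVICES environment variable and, if more workers are wanted, pass the device list to LocalCUDACluster so Dask-CUDA can pin one GPU per worker itself.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Two workers, one per GPU; Dask-CUDA sets CUDA_VISIBLE_DEVICES per worker.
    cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1")
    client = Client(cluster)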
I think this was meant to be the simplest possible reproducer. AFAIK, @manopapad has been working with @quasiben on benchmarking, so I trust that they have built and configured UCX properly and we're safe to skip this part of the conversation, but feel free to correct me here.
My point is more that, based on the error, things don't appear to be working as expected. The use of pickle is a symptom of that, along with the partial import error. We don't see this in other workflows like TPCx-BB, so I'm left wondering if we have a deployment and/or configuration issue.
I think this is due to a very particular use of import cudf and persist, and that occurs just the same with protocol="tcp", so we can completely remove UCX complexity from this issue to simplify things.
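Concretely, dropping UCX from the reproducer just means starting the cluster over plain TCP, e.g. (sketch):
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # protocol="tcp" instead of protocol="ucx" (no enable_infiniband/enable_nvlink flags)
    cluster = LocalCUDACluster(protocol="tcp")
    client = Client(cluster)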
Yeah that would help if we can simplify the reproducer and confirm that.
For instance are we running this script within cuDF or Distributed repos? That might cause us these kinds of import issues.
I'm using a conda installation of rapids 0.16 from rapids-nightly.
I think this was meant to be the simplest possible reproducer.
That's what I was going for. However, the crash still happens with protocol="tcp" and with no explicit configuration of n_workers:
https://gist.github.com/manopapad/37db3e31e08cc74621c05ab724713f7c
For completeness, here is the full output for this tcp version: https://gist.github.com/manopapad/a0aa012f37a2997a76d3f7f051386f0e and for the original ucx version: https://gist.github.com/manopapad/c3c7f09161f41d3cf24bc70e39334c44