[BUG] Sync Client constructor hangs when connecting to an async LocalCUDACluster
Describe the bug
Connecting a synchronous Client (asynchronous=False) to an asynchronous LocalCUDACluster (asynchronous=True) hangs. This means an async LocalCUDACluster cannot be used with RAPIDS libraries that expect a sync client, e.g., blazingsql. (cc @felipeblazing @kkraus14)
Demo:
import asyncio, cudf, dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

async def main():
    async with await LocalCUDACluster(asynchronous=True, dashboard_address=None) as cluster_async:
        print('making sync client..')  ### last message to get printed
        with Client(address=cluster_async, asynchronous=False) as client_sync:
            print('exiting client..')
        print('exiting cluster..')
    print('cleaned up.')

asyncio.run(main())
=>
making sync client..
-------------------------
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f72a2bccf90>>, <Task finished coro=<Worker.heartbeat() done, defined at /conda/envs/rapids/lib/python3.7/site-packages/distributed/worker.py:929> exception=OSError('Timed out during handshake while connecting to tcp://127.0.0.1:33281 after 10 s')>)
Traceback (most recent call last):
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/comm/core.py", line 319, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/conda/envs/rapids/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
Steps/Code to reproduce bug
See above
Expected behavior
Code to terminate without exceptions
Environment overview (please complete the following information)
Ubuntu w/ CUDA 10.2 -> Docker Ubuntu 18.04 -> conda RAPIDS 0.17
Additional context
This is about LocalCUDACluster. When the cluster is started separately (dask-scheduler / dask-cuda-worker + connecting by address), initial testing makes it look fine to mix sync + async clients, at least in separate processes.
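For reference, a minimal sketch of that separate-process setup (the scheduler host/port below are placeholders, not from an actual run):

# Terminal 1: dask-scheduler
# Terminal 2: dask-cuda-worker tcp://<scheduler-host>:8786
import asyncio
from dask.distributed import Client

SCHEDULER = 'tcp://<scheduler-host>:8786'  # placeholder scheduler address

def inc(x):
    return x + 1

def use_sync_client():
    # Synchronous client, e.g. for a library that expects one
    with Client(address=SCHEDULER, asynchronous=False) as client:
        print(client.submit(inc, 1).result())

async def use_async_client():
    # Asynchronous client pointed at the same scheduler
    async with Client(address=SCHEDULER, asynchronous=True) as client:
        print(await client.submit(inc, 1))

use_sync_client()
asyncio.run(use_async_client())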
Oops, this should be in dask-cuda, sorry!
Transferred it.
@lmeyerov does the same snippet exhibit different behavior with LocalCluster as opposed to LocalCUDACluster?
The context manager statement is incorrect:
async with await LocalCUDACluster(...)
should be
async with LocalCUDACluster(...)
I'm not sure that's true of async clusters. Either way, reran with the edit, same thing:
import asyncio, cudf, dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

async def main():
    async with LocalCUDACluster(asynchronous=True, dashboard_address=None) as cluster_async:
        print('making sync client..')
        with Client(address=cluster_async, asynchronous=False) as client_sync:
            print('exiting client..')
        print('exiting cluster..')
    print('cleaned up.')

asyncio.run(main())
=>
>>> asyncio.run(main())
making sync client..
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f6171233710>>, <Task finished coro=<Worker.heartbeat() done, defined at /conda/envs/rapids/lib/python3.7/site-packages/distributed/worker.py:929> exception=OSError('Timed out during handshake while connecting to tcp://127.0.0.1:42571 after 10 s')>)
Traceback (most recent call last):
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/comm/core.py", line 319, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/conda/envs/rapids/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
@kkraus14 It's a known bug in LocalCluster - I dug this one up a few days ago, where @mrocklin noted it mattered less for dask's LocalCluster implementation since the utility is less clear in CPU land: https://github.com/dask/distributed/issues/3789 (And yes, the issue is still open: a timeout, instead of either working or a handled exception)
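For completeness, the same snippet with distributed's plain LocalCluster swapped in looks like this; I haven't re-run this exact variant, but per the issue linked above it should hit the same timeout:

import asyncio
from dask.distributed import LocalCluster, Client

async def main():
    # Same shape as the repro above, just LocalCluster instead of LocalCUDACluster
    async with LocalCluster(asynchronous=True, dashboard_address=None) as cluster_async:
        print('making sync client..')
        with Client(address=cluster_async, asynchronous=False) as client_sync:
            print('exiting client..')
        print('exiting cluster..')
    print('cleaned up.')

asyncio.run(main())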
Can you describe your use case a bit more, and why you might need both blazingsql and an additional async client?
Good news: when using a remote cluster, we can have both sync + async clients hit the same cluster. So this is really scoped down to LocalCUDACluster not supporting mixed client use.
RE: use case, sure thing. Modern Python + software in general is increasingly async. While dask_cudf supports async schedulers/clients (mostly minor docs / API-inconsistency issues, as dask is already async internally AFAICT), the problem is that bsql currently requires a sync client/cluster, with no roadmap for adding async. (It does allow future returns, but we couldn't get that to be reliable, and it still requires a sync client for ingest.) As more user libs land on top of RAPIDS, I expect most new immature libs to also be sync. In our case, we're doing an async cudf/dask_cudf pipeline where one step is bsql... so it was failing. It's unfortunate that we lose the convenience of using LocalCUDACluster for the pipeline, but as we knew we needed to move to a central GPU scheduler service at some point anyway, here we are.
We did experiment with things like running both a sync + an async CUDA cluster, but that was too much of a GPU memory hit, and sharing data across the pipeline seems problematic..
We're working on a tech demo of what's enabled by this, so happy to contribute a mini-tutorial around the solution.
Edit: this is a likely wontfix, so it may be better to address in the interim (= next 6 months?) via docs.
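For concreteness, the interim pattern we're converging on looks roughly like this (a sketch only: run_sql_step is a hypothetical stand-in for the sync-only bsql step, and the scheduler address is a placeholder for the central GPU scheduler service):

import asyncio
from dask.distributed import Client

SCHEDULER = 'tcp://<scheduler-host>:8786'  # placeholder remote scheduler address

def run_sql_step(scheduler_address):
    # Hypothetical stand-in for the sync-only step (e.g. bsql ingest): it
    # builds its own synchronous client against the shared remote scheduler.
    with Client(address=scheduler_address, asynchronous=False) as client_sync:
        ...  # hand client_sync to the sync-only library here
    return 'sql step done'

async def pipeline():
    async with Client(address=SCHEDULER, asynchronous=True) as client_async:
        ...  # async cudf/dask_cudf steps via client_async
        # Run the sync-only step in a worker thread so it doesn't block the loop
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, run_sql_step, SCHEDULER)
        print(result)

asyncio.run(pipeline())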
@lmeyerov have you tried creating the LocalCUDACluster how you normally would and then passing the client the address, as opposed to the cluster object? I.e. something like:
import asyncio, cudf, dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

async def main():
    async with LocalCUDACluster(asynchronous=True, dashboard_address=None) as cluster_async:
        print('making sync client..')
        with Client(address=cluster_async.scheduler.address, asynchronous=False) as client_sync:
            print('exiting client..')
        print('exiting cluster..')
    print('cleaned up.')

asyncio.run(main())
Clever! Interestingly, while it still times out, I wonder if it's partly due to running in Docker.
I added a print of the address (print('making sync client for cluster', cluster_async.scheduler.address)), which gave tcp://127.0.0.1:41789. Specifying LocalCUDACluster(interface="eth0", ...) => tcp://172.17.0.2:33121, and "lo" => tcp://127.0.0.1:41789. However, both still timed out on client connect.
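For reference, the variant I tried looked roughly like this (the "lo" run is the same snippet with interface="lo"); both still timed out as described:

import asyncio
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

async def main():
    # Bind the scheduler to the Docker bridge interface ("lo" behaved the same)
    async with LocalCUDACluster(asynchronous=True, dashboard_address=None,
                                interface="eth0") as cluster_async:
        print('making sync client for cluster', cluster_async.scheduler.address)
        with Client(address=cluster_async.scheduler.address, asynchronous=False) as client_sync:
            print('exiting client..')
        print('exiting cluster..')
    print('cleaned up.')

asyncio.run(main())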
Our stack now has it (mostly) working via a containerized standalone dask-cuda-worker (https://github.com/rapidsai/dask-cuda/issues/481#issuecomment-754421287), so we can narrow this down to being about LocalCUDACluster. If someone can repro in a non-Docker env, we'll know whether it's Docker-specific or general..
This issue has been marked stale due to no recent activity in the past 30d. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be marked rotten if there is no activity in the next 60d.
@lmeyerov any updates here? Were you able to get things working in Docker?
I don't think so -- we work around this now via a remote dask service, which does support simultaneous async+sync clients
This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.
This issue has been labeled inactive-90d due to no recent activity in the past 90 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.