dask-cuda
[ENH] Smarter portable defaults for CPU<>GPU dask interop
This got complicated enough that I wanted to move it over from Slack --
Configuring and using dask CPU with dask GPU has been confusing, and mixing the two is a reality both wrt hardware (GPU hardware is generally balanced with substantial CPU core counts) and software (a lot of code isn't GPU, despite our efforts ;-) ). It'd be good to get clarity in docs + defaults here:
Automating nthreads for cpu vs. gpu
We want a simple and portable config for our typical GPU case of launching on a single node with 1-8 GPUs and 4-128 cores, and to write size-agnostic dask + dask-cuda code over it. That means never hard-coding the above in the config nor the app, and still getting reasonable resource utilization. I suspect this is true of most users. It'd also be good to be able to predictably override this. I can't speak to multi-node clusters nor heterogeneous ones.
- Ideally, pseudo-coding, `dask-scheduler 123 & dask-cuda-worker dask-scheduler:123 &` should "just work"
  - Or there should be an explicit recommendation to do `dask-scheduler 123 & dask-cuda-worker dask-scheduler:123 & dask-worker dask-scheduler:123`
- Code should just work (~balanced) for things like `dgdf = ddf.map_partitions(cudf.from_pandas)`, and the reverse (see the sketch after this list)
- Users can optionally override the defaults at the level of worker config or tasks
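Here's a minimal sketch of that round trip, just to pin down what "balanced by default" needs to cover; the column and variable names are made up for illustration:

```python
# Minimal sketch of the CPU <-> GPU round trip we'd like to be size-agnostic.
import cudf
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(1_000)}), npartitions=8)

# CPU -> GPU: each pandas partition becomes a cudf partition
dgdf = ddf.map_partitions(cudf.from_pandas)

# GPU -> CPU: and back again
ddf2 = dgdf.map_partitions(lambda part: part.to_pandas())

print(ddf2.x.sum().compute())
```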
"Just working" means:
- Number of GPU workers (threads or processes) should default to ~= # visible CUDA devices <- current `dask-cuda-worker` behavior
- Number of CPU workers (threads) should default to ~= # cores <- `dask-worker`'s default, but not `dask-cuda-worker`'s
- Behind the scenes, if this turns into an equivalent of `dask-cuda-worker-existing && dask-worker`, or that's the recommended invocation, so be it. However, I am a bit worried there may be unforeseen buggy / awkward behavior here, like having to use Dataset Publish instead of map_partitions for the interop. (A rough sketch of that combined setup follows below.)
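For concreteness, here is a hypothetical sketch of that combined setup done by hand today; the thread count and the `CPU=1` resource label are placeholder choices, not anything dask-cuda sets up for you:

```python
# Hypothetical sketch: a LocalCUDACluster (roughly one worker per visible GPU)
# plus a hand-attached CPU worker process on the same scheduler.
import multiprocessing
import subprocess

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()                  # GPU workers: ~1 per CUDA device
client = Client(cluster)

# Equivalent of also running `dask-worker <scheduler>` alongside dask-cuda-worker
subprocess.Popen([
    "dask-worker", cluster.scheduler_address,
    "--nthreads", str(multiprocessing.cpu_count()),
    "--resources", "CPU=1",                   # illustrative resource label
])
```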
Automating resource tagging
Separately, here's some experimental discussion of using logical resources & annotations, though PyTorch experiences may not carry over to Dask-CPU <> Dask-cuDF. One idea is to auto-tag GPU/CPU workers with # physical vs. # logical units, and let app devs use those.
Ex:

    # 8 x A100
    $ dask-cuda-worker
    # => {'GPU_NODES': 1, 'GPUS_PHYSICAL': 8, 'GPUS_LOGICAL': 8000,
    #     'CPU_NODES': 1, 'CPUS_PHYSICAL': 128, 'CPUS_LOGICAL': 128000}
From there, code can use annotations based on either hard-coded physical counts or a more scale-free / size-agnostic logical style.
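For example, if workers advertised tags like the above, application code could stay scale-free. A hedged sketch, assuming hypothetical `CPUS_LOGICAL` / `GPUS_LOGICAL` resources that nothing sets today:

```python
# 'CPUS_LOGICAL' / 'GPUS_LOGICAL' are hypothetical auto-tagged resources;
# tasks would only schedule once workers actually declared them.
import cudf
import dask
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(1_000)}), npartitions=8)

with dask.annotate(resources={"CPUS_LOGICAL": 1000}):   # ~1/128th of the node
    doubled = ddf.assign(x=ddf.x * 2)

with dask.annotate(resources={"GPUS_LOGICAL": 1000}):   # ~1/8th of the GPUs
    gdf = doubled.map_partitions(cudf.from_pandas)
```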
@lmeyerov thanks for moving the discussion here. If you have time, I think devs would also appreciate some motivating use cases, or your thoughts on where mixing GPU/CPU workers is applicable. Pseudocode would also be good.
Some examples we see:
- Data ingestion: we get some annoying formats like Excel files, and more niche ones, that we run dask jobs on to turn into dataframes, and then send to dask-cudf for actual use
- Data cleaning: we do some cleaning/enrichment tasks that end up being CPU sometimes, and can be tricky to turn into dgdf b/c the whole point is to infer the metadata, and dgdf assumes it is known
- RAPIDS won't do sparse data for a while, so we need to do dask CPU for those steps
- We basically do interactive data dashboards, so viewers will load data of different sizes and every interaction kicks off pipelines of tasks on them (filter, histogram, ML, ...). We're now switching to default-dask just to keep the web server responsive. Tasks can technically be sized for pandas vs. cudf vs. dask-cudf, but that can be a lot of work, so we often just run as dask-cudf with 1 partition, and maybe the kernel is cudf. (We mentally put bsql in the dask bucket of writing any-scale.)
Maybe also helpful are some code patterns:
- We have a pretty gnarly and somewhat buggy `table_to_dgdf :: Client * DataFrameLike -> DaskCudfDataFrame` converter that handles some funny cases:
  - usual: pandas, arrow
  - blazingsql: needs a sync client, while our app is written async
  - dask: when we used LocalClusters, this was getting confused (maybe also b/c bsql), so we had to roundtrip through Shared Data, though I think that went away with a shared remote dask-cuda-worker
- We run some sort of asyncio web framework, so pretty early in handling a response we generate a Client for the authorized user (1-10ms?). For some reason bsql contexts are even heavier to start, so we experimented with caching those. We actually carry around a gpu_client and a cpu_client as part of the context because we thought we needed separate ones, but we may be getting rid of that.
- A lot of our requests are bursty and cacheable, so we had some basic LRU logic for cudf; we're investigating a good pattern for that for dask-cudf. Imagine loading a single dashboard that makes several histograms over the same overall data:
  - /dataset/123/column/345/histogram
  - /dataset/123/column/345/histogram?filter=...
  - /dataset/123/column/678/histogram

  This is currently awkward in dask in general. If memory is idling, we should cache, and allow it to be forced out. The policy is probably LRU; for sanity it's ok (for us) to auto-expire every 10 min. dask has the combo of persist -> publish for saving between requests, and cudf has unified memory... but we still need to figure out a basic LRU on top. In cudf we risk not spilling and just decorate those requests with `@lru`... but it's time to do it right; a rough sketch of one possible shape follows below.
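One possible shape for that LRU, sketched over distributed's published-dataset API; the class name, capacity, and eviction policy are illustrative, not something we have running:

```python
# Hypothetical sketch of a small LRU layer over published datasets.
from collections import OrderedDict

from dask.distributed import Client


class PublishedLRU:
    """Keep at most `capacity` persisted collections published on the cluster."""

    def __init__(self, client: Client, capacity: int = 8):
        self.client = client
        self.capacity = capacity
        self.order = OrderedDict()                 # dataset name -> None, LRU order

    def get_or_compute(self, name, build):
        """Return published dataset `name`, computing it via `build()` on a miss."""
        if name in self.client.list_datasets():
            self.order[name] = None
            self.order.move_to_end(name)           # mark as most recently used
            return self.client.get_dataset(name)
        value = self.client.persist(build())       # e.g. a dask_cudf DataFrame
        self.client.publish_dataset(**{name: value})
        self.order[name] = None
        if len(self.order) > self.capacity:        # evict least-recently-used
            oldest, _ = self.order.popitem(last=False)
            self.client.unpublish_dataset(oldest)
        return value
```

Usage would be roughly `cache.get_or_compute(f"dataset-{dataset_id}", build_dgdf)`, with `build_dgdf` standing in for whatever loads the dask-cudf frame.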
I don't have recommendations yet (this is my first attempt at this). Naively, I set up a cluster in the following manner:
Scheduler:

    dask-scheduler

CPU workers:

    dask-worker tcp://...:8786 --nprocs 1 --resources CPU=1.0 --name cpu-worker

GPU workers:

    dask-cuda-worker tcp://...:8786 --resources GPU=1 --name gpu-worker

Client:
    import dask
    import dask.dataframe
    import dask.datasets
    import dask_cudf
    import cudf
    from dask.distributed import Client, wait

    client = Client('localhost:8786')

    # CPU dataframe, pinned to workers that advertise the CPU resource
    with dask.annotate(resources={'CPU': 1}):
        ddf = dask.datasets.timeseries()
        ddf = ddf.persist()
    wait(ddf)
    assert isinstance(ddf, dask.dataframe.core.DataFrame)

    # Convert each partition to cudf on workers that advertise the GPU resource
    with dask.annotate(resources={'GPU': 1}):
        cdf = ddf.map_partitions(cudf.DataFrame)
        cdf = cdf.persist()
    assert isinstance(cdf, dask_cudf.core.DataFrame)
The above worked, but I can see this being brittle in that missing an annotation can lead to problems. I agree that automatic resource tagging for `dask-cuda-worker`s could make sense, especially in these scenarios. For `dask-worker`s, I don't think we can auto-tag workers.
Are you asking for `LocalCUDACluster` to optionally spin up additional CPU workers with tags? I could see this being very convenient but also highly prone to mistakes. Probably need to think about this more (cc @pentschev).
I'm also curious what you mean by:

> RAPIDS won't do sparse data for a while, so we need to do dask cpu for those steps

Do you mean sparse dataframes or sparse arrays? If the latter, CuPy has grown somewhat recently to meet these needs.
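For the array side, a small illustration of the SciPy-like sparse API in CuPy; the shapes and sparsity threshold here are arbitrary:

```python
# Tiny illustration of CuPy's sparse array support (cupyx.scipy.sparse).
import cupy as cp
from cupyx.scipy import sparse

dense = cp.random.random((1_000, 1_000))
dense[dense < 0.95] = 0                     # make it mostly zeros
csr = sparse.csr_matrix(dense)              # GPU CSR matrix

print(csr.nnz, csr.dot(cp.ones(1_000)).shape)
```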
Thanks. I was starting to come to that conclusion, as the current happy path, based on the Slack conversation. We're doing some reworking here to get closer to that; I'm guessing it'll take a few weeks to see how this phase ends up. There have been a lot of surprises up to here (ex: different exceptions when mixing different clients to the same worker), plus figuring out a sane dask Resource model that's concurrency friendly.
Re: sparse, sparse Series. They're typically dense when indexed by another column. Ex: `df['stick']` might be sparse, while `df[ df['type'] == 'car' ]['stick']` is dense. Related, but there's a bit more to it: esp. when doing ML, we're starting to see many-column datasets. But that's all for another ticket..
Will share feedback as it gets clearer.
Interim update: blazingsql context creation is getting wedged when combining `dask-worker --resources CPU=1` and `dask-cuda-worker --resources GPU=1` (despite the happy case of single node / single GPU / sequential test working). Will help the bsql folks track it down, and once that's unblocked, return here..
I am also curious if this could interact with cluster scaling (and things like dask-gateway). Could each type of worker need to be tracked by some key and scaled independently?
(In my case, I have a workflow with "big disk", "gpu" and "cpu" workers.)
This issue has been labeled `inactive-30d` due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled `inactive-90d` if there is no activity in the next 60 days.
Hi folks, you might be interested in @madsbk's recent work https://github.com/dask/distributed/pull/4869 allowing workers to have multiple executors.
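If I'm reading that PR right, a worker can declare named thread pools and tasks can target them by annotation, roughly as below; treat the executor names and pool sizes as illustrative assumptions rather than verified API:

```python
# Rough sketch based on dask/distributed#4869 (multiple executors per worker);
# the "gpu"/"cpu" names and pool sizes are illustrative assumptions.
from concurrent.futures import ThreadPoolExecutor

import dask
from dask.distributed import Worker


async def start_mixed_worker(scheduler_address: str) -> Worker:
    return await Worker(
        scheduler_address,
        executors={
            "gpu": ThreadPoolExecutor(1),    # serialize GPU-bound tasks
            "cpu": ThreadPoolExecutor(32),   # plenty of threads for CPU tasks
        },
    )

# Client side: route tasks to a named executor via annotation, e.g.
#     with dask.annotate(executor="cpu"):
#         ddf = ddf.map_partitions(some_cpu_function)
```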
This issue has been labeled `inactive-90d` due to no recent activity in the past 90 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.