dask not respecting worker resources

Open nmatare opened this issue 5 years ago • 11 comments

I believe this issue is related to this SO post.

I'm finding that, despite resource restrictions, the dask-scheduler will assign keys to worker nodes even when I'm specifying otherwise.

For example:

from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=0)
cluster.start_worker(ncores=1)  # note: started without resources={'CPU': 1}, so it cannot satisfy resource-restricted tasks

client = Client(cluster.scheduler.address)
fut = client.submit(lambda x=1: x+1, resources={'CPU':1})
client.who_has()  # here we expect to see nothing as no workers have resources

fut2 = client.submit(lambda x=1: x+1)
client.who_has() # yep, assigned because it doesn't need resources

import dask.dataframe as dd
import pandas.util.testing as tm
import os 
files = dd.from_pandas(tm.makeTimeSeries(1000, freq='10ms'), npartitions=4).to_csv('example*.csv')

# simulate some pipeline where data is read and transformed 
fut3 = client.compute(dd.read_csv('example*.csv').to_delayed(), resources={'CPU':1})
client.who_has()  
# now we see that the scheduler has placed the delayed keys 
# onto the worker even though it has no resources to compute.
#  "('from-delayed-238ec9c6404d8e52399becf66971834c', 2)": ('tcp://127.0.0.1:44429',),
#  "('pandas_read_text-read-block-from-delayed-238ec9c6404d8e52399becf66971834c', 2)": (),
[os.remove(f) for f in files]

Likewise, I'm finding similar behavior with methods like read_parquet.

dask.__version__
'1.2.2'

nmatare avatar Jul 11 '19 03:07 nmatare
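
As a point of reference, workers only satisfy resource-restricted tasks if they were started with those resources advertised. Below is a minimal sketch of a LocalCluster whose worker does advertise a 'CPU' resource (the resources= keyword is forwarded to the workers; treat this as illustrative rather than authoritative):

from dask.distributed import LocalCluster, Client

# resources= is forwarded to the workers the cluster starts, so each
# worker advertises one unit of a "CPU" resource (the name is an
# arbitrary accounting label).
cluster = LocalCluster(n_workers=1, threads_per_worker=1,
                       resources={'CPU': 1})
client = Client(cluster)

# This task can now be scheduled, because a worker with a free "CPU"
# resource exists.
fut = client.submit(lambda x=1: x + 1, resources={'CPU': 1})
print(fut.result())  # 2

From the command line the rough equivalent is dask-worker <scheduler-address> --resources "CPU=1".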

@nmatare, thanks for the reproducible report!

I just tried with dask>=2.0 -- is it possible for you to upgrade?

quasiben avatar Jul 11 '19 21:07 quasiben

Sure. Here's the report with dask 2.1.0.

from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=0)
cluster.scale(n=1)  

client = Client(cluster.scheduler.address)
client.scheduler_info()['workers']  # note: the worker was started without resources={'CPU': 1}

fut = client.submit(lambda x=1: x+1, resources={'CPU':1})
client.who_has()  # here we expect to see nothing as no workers have resources

fut2 = client.submit(lambda x=1: x+1)
client.who_has() # yep, assigned because it doesn't need resources

import dask.dataframe as dd
import pandas.util.testing as tm
import os 
files = dd.from_pandas(tm.makeTimeSeries(1000, freq='10ms'), npartitions=4).to_csv('example*.csv')

# simulate some pipeline where data is read and transformed 
fut3 = client.compute(dd.read_csv('example*.csv').to_delayed(), resources={'CPU':1})
client.who_has()  
# now we see that the scheduler has placed the delayed keys 
# onto the worker even though it has no resources to compute.
#  "('from-delayed-238ec9c6404d8e52399becf66971834c', 2)": ('tcp://127.0.0.1:44429',),
#  "('pandas_read_text-read-block-from-delayed-238ec9c6404d8e52399becf66971834c', 2)": (),
[os.remove(f) for f in files]
import dask
dask.__version__
# 2.1.0

I'm getting the same results as with version '1.2.2'.

nmatare avatar Jul 11 '19 21:07 nmatare

The resources keyword only affects the final result tasks by default. There isn't a great way to restrict the entire computation today.

mrocklin avatar Jul 11 '19 21:07 mrocklin
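
For anyone hitting this in that era of dask, the closest thing to a workaround I'm aware of was to pass resources= as a dict mapping a tuple of the collection's keys to a resource dict, rather than a plain resource dict, so the restriction targets those specific tasks instead of only the final keys. A rough, unverified sketch of that pattern (the scheduler address is a placeholder):

import dask.dataframe as dd
from dask.distributed import Client

client = Client('tcp://127.0.0.1:8786')  # placeholder scheduler address

ddf = dd.read_csv('example*.csv')

# Map the tuple of this collection's top-level keys to the resource
# requirement, so those partitions (not just the final result keys)
# should only be scheduled on workers advertising a "CPU" resource.
futures = client.compute(
    ddf.to_delayed(),
    resources={tuple(ddf.__dask_keys__()): {'CPU': 1}},
)

# Caveat: task fusion can rename low-level keys (e.g. the fused
# "pandas_read_text-read-block-..." keys above), so restrictions tied
# to named keys may still not cover every task in the graph.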

Hmm. From what we've put together, the downstream effect for us is the following situation:

We have several workers with special hardware and others without. The workers without the hardware do their processing and then transfer their results to the hardware workers over a distributed Sub/Pub channel. When we trigger a computation, the scheduler places some keys onto the nodes with the special hardware, which is exactly what we don't want to happen. Those nodes also run other, non-Python code, so their memory fills up quickly. The keys placed on them then get stuck: the workers pause at high memory and stop accepting new tasks. This effectively bricks the cluster, because the other workers can never fetch those keys to finish their computation. We could increase the memory on these nodes so we never hit the limits, but that would be an inefficient use of our resources.

I imagine that subsetting the cluster with workers= would produce similar results? Are resources tracked at the hidden-attribute level (i.e., __dask_keys__)? Could I inject the resources into each task before triggering the final call to compute?

nmatare avatar Jul 11 '19 22:07 nmatare
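
On the question of injecting resources into each task: much later dask releases (well past the versions in this thread) added dask.annotate, which attaches a resource requirement to every task built inside the context manager instead of only the final keys. A minimal sketch, assuming the cluster's workers advertise a 'CPU' resource and that the scheduler address is a placeholder; note that low-level graph optimizations can drop annotations, which is why optimize_graph=False is passed:

import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client('tcp://127.0.0.1:8786')  # placeholder scheduler address

# Every task created inside this block carries the resource annotation,
# including the low-level read tasks.
with dask.annotate(resources={'CPU': 1}):
    ddf = dd.read_csv('example*.csv')

# optimize_graph=False keeps the optimizer from fusing tasks and
# dropping the annotations along the way.
result = ddf.compute(optimize_graph=False)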

I'm a bit confused, I have a similar issue but not sure if it's identical.

My pipeline is roughly:

  (read_parquet)       (some_delayed_func)
        |                      |
 (to_delayed-1..n)             |
        |                      |
        \                      |
         ------ [delayed(to_delayed-1, some_delayed_func),
                 delayed(to_delayed-2, some_delayed_func),
                 ...
                 delayed(to_delayed-n, some_delayed_func)]
                               |
                         (from_delayed)
                               |
                          (to_parquet)

I call to_parquet with compute=False and get back the write_metadata Delayed object. I then call its .compute method with the resources keyword set to a dict with a single key, which is a tuple of the keys of all the delayed tasks in that list before from_delayed (I dug those keys out using .__dask_graph__().layers).

It seems I'm setting the right keys, because looking at the tasks in the dashboard the resources are listed correctly: image

However, the scheduler simply over-assigns these tasks (!!) image

Is this the same issue as described here? Can I somehow constrain those intermediate tasks so that no more than one of them is assigned to a single worker?

syagev avatar Jan 15 '20 12:01 syagev
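
To make the intent concrete, here is a rough sketch of the setup being described, with a hypothetical heavy_step standing in for the per-partition work: each worker advertises a single 'core', and each intermediate task declares that it needs one 'core', so at most one of them should be executing on any worker at a time (even if the scheduler queues several on the same worker). This is a sketch under those assumptions, not a verified recipe:

import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# One "core" resource per worker, one thread per worker.
cluster = LocalCluster(processes=True, threads_per_worker=1,
                       resources={'core': 1})
client = Client(cluster)

@dask.delayed
def heavy_step(df):
    # hypothetical per-partition work that should not run concurrently
    # with another heavy_step on the same worker
    return df

parts = dd.read_csv('example*.csv').to_delayed()
processed = [heavy_step(p) for p in parts]

# Declare that each intermediate task consumes one "core"; the keys are
# taken from the delayed objects rather than dug out of __dask_graph__().
futures = client.compute(
    processed,
    resources={tuple(p.key for p in processed): {'core': 1}},
)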

@syagev how many processes do you have per worker?

And we'd be grateful if you're able to provide a reproducible example of these resources not being honored.

TomAugspurger avatar Jan 15 '20 12:01 TomAugspurger

The cluster is spawned this way:

LocalCluster(processes=True, threads_per_worker=1, resources={'core': 1},
             local_directory=tempfile.gettempdir(), scheduler_port=8786)

But just to make sure I understand - there shouldn't be a case where a worker shows "Consumed" > "Total" right?

I'll do my best to generate a repro. Thanks!

syagev avatar Jan 15 '20 12:01 syagev

Thanks, that setup looks correct.

there shouldn't be a case where a worker shows "Consumed" > "Total" right?

That seems logical to me, but I've never used resources personally :)

TomAugspurger avatar Jan 15 '20 13:01 TomAugspurger

I've also started seeing the same behavior as @syagev (starting maybe within the last month or so?):

image

Our workflow runs into this case fairly reliably, but I'm not quite sure how to come up with a reproducible example (ours runs in k8s with many other tasks in flight at the same time). It's possible that the dashboard information was incorrect before as well, but recently workers that show over-provisioned resources like this deadlock and never finish any of their tasks.

bnaul avatar Jan 26 '20 01:01 bnaul

I did check with htop, and resources are respected in my case: even though the worker's "Processing" tab displays multiple "ongoing" tasks, only the top task in the worker's task list is actually being processed.

I have Consumed > Total as well. I think this is normal, since "Consumed" accounts for all tasks that have moved through the worker:

  • Some may have moved to another worker, so "Consumed" > "Finished tasks".
  • There can be more "Processing" tasks than "Consumed" tasks.
  • "Total" seems to mean "most held at a single time". That is still a bit confusing to me, though; this is just my interpretation.

nuKs avatar Apr 21 '21 22:04 nuKs
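
As a quick way to double-check this without htop, the client can report both what the scheduler has assigned and what is actually executing. A small sketch (the scheduler address is a placeholder):

from dask.distributed import Client

client = Client('tcp://127.0.0.1:8786')  # placeholder scheduler address

# Keys the scheduler has assigned to each worker; with resources in
# play this list can be longer than what is actually running.
print(client.processing())

# Active frames for tasks executing right now; with a single
# "core"-style resource per worker this should show at most one task
# per worker, even when the dashboard lists several as "Processing".
print(client.call_stack())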

My experience here is the same as above: Consumed > Total, but if you look at the call stacks, only the top task is actually being computed, which is good news.

image image

My conclusion is that the resources behavior is working properly, but the 'Consumed' label is just a bit confusing. My intuition was that dask would label the other tasks as 'waiting' rather than 'processing', since all resources are occupied. Hope this helps others.

bw4sz avatar Aug 05 '22 20:08 bw4sz