
Scheduler behaves badly when adaptively adding workers to meet resource demand

Open mattilyra opened this issue 6 years ago • 29 comments

There is an issue with how the scheduler assigns tasks from the unrunnable queue to workers that meet the resource requirements when they join the scheduler.

The use case is some long running complex task where some tasks require an expensive resource, say GPUs, but those resources are only provided (through Adaptive) once the tasks requiring those resources are ready to be run. Say we come to a point in the computation where 5 tasks could be run, if GPUs were available. Managing the scale_up behaviour through adaptive is fairly straightforward and allows adding new compute nodes (for instance on AWS) with the required resources. The problem appears when the first of the new workers connects.

Scheduler.add_worker will go through the list of unrunnable tasks and check if there are workers that meet the requirements. Since only the first worker has connected so far, there is only one worker that meets the requirements (some number of other workers may be booting up and joining shortly, but that hasn't happened yet).

        for ts in list(self.unrunnable):
            valid = self.valid_workers(ts)
            if valid is True or ws in valid:
                recommendations[ts.key] = 'waiting'

The task goes through released -> waiting and then waiting -> processing. transition_waiting_processing again calls valid_workers to get a list of workers where the task(s) can be run (this list still contains just a single worker because the other ones haven't connected yet).

The end result of all of this is that the worker that happens to connect first and has the resource required by the tasks gets all the tasks dumped onto it, while all the other workers, which potentially connect just seconds later, get nothing and are shut down by the scheduler because they are idling.
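A toy model of the behaviour described above may help. This is not the scheduler code; `assign_on_worker_join` and the worker names are made up for illustration. It only encodes the one property that matters here: placement considers the workers connected at the moment the tasks are released.

```python
def assign_on_worker_join(unrunnable, connected_workers, required):
    """Toy model: place every unrunnable task on some currently valid worker."""
    # Only workers that are connected right now and offer the resource count.
    valid = [w for w, res in connected_workers.items() if res.get(required, 0) > 0]
    placement = {}
    for i, task in enumerate(unrunnable):
        if valid:
            # Round-robin over the valid workers; a lone worker gets everything.
            placement[task] = valid[i % len(valid)]
    return placement


tasks = [f"train-{i}" for i in range(5)]

# Only the first GPU worker has connected when the tasks are released.
early = assign_on_worker_join(tasks, {"gpu-worker-0": {"GPU": 1}}, "GPU")
print(early)
```

All five tasks end up on `gpu-worker-0`; workers that connect afterwards are never consulted by this step.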

In short, it appears to be the case that the purpose of the resource_requirements is to act as a hint of required peak performance (memory, GPU, whatever) from the workers, and not to be a dynamically changing resource allocation. Is this the case, and is there any interest in changing that? The resources available and resources consumed are taken into account in transition_waiting_processing, but only on the worker_state, not for the scheduler in general.

If this is not the intended behaviour and should be fixed, I'm more than happy to work on this.

mattilyra avatar Mar 22 '18 16:03 mattilyra

If this is not the intended behaviour and should be fixed, I'm more than happy to work on this.

This is not the intended behaviour and should be fixed. Help would be welcome!

I agree with your understanding of the situation and in general I'm glad to hear from someone who is comfortable working within the scheduler code.

My first instinct on how to solve this problem would be to look at improving work stealing with resources. The first worker should get all of the tasks if it's the only one around, but it should also share them once others arrive.

Doing this well is likely to require some thought. The current work stealing system is probably not as clean as it could be. We'll also want to ensure that this doesn't cost scheduler time in the common case when resources are not in use.
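To make the "share them once others arrive" idea concrete, here is a toy rebalancing step. It is a sketch only and does not reflect the logic in distributed's stealing extension; `rebalance` and the worker names are hypothetical, and it assumes every worker in `queues` already satisfies the tasks' resource requirements.

```python
def rebalance(queues):
    """Move queued tasks from the busiest worker to the idlest until balanced.

    `queues` maps worker name -> list of queued task keys; modified in place.
    """
    while True:
        busiest = max(queues, key=lambda w: len(queues[w]))
        idlest = min(queues, key=lambda w: len(queues[w]))
        # Stop once no move can reduce the imbalance any further.
        if len(queues[busiest]) - len(queues[idlest]) <= 1:
            return queues
        queues[idlest].append(queues[busiest].pop())


# The first worker received everything; two more workers join later.
queues = {"gpu-0": [f"task-{i}" for i in range(6)]}
queues["gpu-1"] = []
queues["gpu-2"] = []
rebalance(queues)
print({w: len(q) for w, q in queues.items()})
```

A real implementation would also have to avoid moving tasks that are already executing and to keep the cost negligible when resources are not in use.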

mrocklin avatar Mar 22 '18 16:03 mrocklin

That was my first thought as well, so I merged the branch by quakberto (see #1389) with master and tested it.

The problem with relying on work stealing to sort this out is twofold:

  1. The scheduler keeps scaling up and down, which keeps the scheduler CPU busy, and the work stealing code is rarely, if ever, executed. This is partially related to the custom should_scale_up code I have, which tries to account for resource starvation not only for unrunnable tasks but more generally, in the hope that tasks would be moved (stolen) if more machines with the resources were available. The scheduler ends up killing the n-1 new workers because they are idling. I haven't totally cracked why exactly the tasks aren't getting stolen. This is also going to be highly dependent on the actual workload and seems somewhat brittle, especially if adaptive is meant to allow people to run highly customised loads.

  2. The work stealing only happens once the scheduler gets some info back from the workers regarding the running times (or occupancy) of the tasks. For tasks that take some time to run, say training ML models, this lag will be long enough for the scheduler to again get rid of the workers before they get a chance to steal anything. The changes from quakberto are still required but they're not enough on their own.


My suggestion would be to instead change Scheduler.valid_workers to do virtual accounting of resources required/supplied. Virtual because the resources haven't actually been taken yet; there is just a proposal to do so in the future. Additionally, Scheduler.consume_resources and Scheduler.release_resources would need to be modified to account for the resources available in the cluster (self.resources).
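A minimal sketch of what such virtual accounting could look like, using plain dictionaries rather than the scheduler's own state. The function names and data layout here are assumptions for illustration and are not taken from the branch mentioned below.

```python
def valid_workers_with_accounting(required, workers, reserved):
    """Return workers that can still fit `required` after virtual reservations.

    required: {resource: amount} needed by one task
    workers:  {worker: {resource: total amount supplied}}
    reserved: {worker: {resource: amount already promised to other tasks}}
    """
    valid = []
    for worker, supplied in workers.items():
        promised = reserved.get(worker, {})
        if all(supplied.get(r, 0) - promised.get(r, 0) >= amount
               for r, amount in required.items()):
            valid.append(worker)
    return valid


def reserve(required, worker, reserved):
    """Record a proposed assignment without touching the real resources."""
    slot = reserved.setdefault(worker, {})
    for r, amount in required.items():
        slot[r] = slot.get(r, 0) + amount


workers = {"gpu-0": {"GPU": 1}}   # only one GPU worker has connected so far
reserved = {}
runnable, still_unrunnable = [], []
for task in ["t0", "t1", "t2"]:
    candidates = valid_workers_with_accounting({"GPU": 1}, workers, reserved)
    if candidates:
        reserve({"GPU": 1}, candidates[0], reserved)
        runnable.append(task)
    else:
        still_unrunnable.append(task)
print(runnable, still_unrunnable)
```

With a single one-GPU worker connected, only `t0` becomes runnable; `t1` and `t2` stay unrunnable and remain available for workers that join later.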

I've started a branch (like 5 seconds ago) with these changes and am currently testing it (https://github.com/mattilyra/distributed/tree/resource_accounting_for_valid_workers).

mattilyra avatar Mar 22 '18 17:03 mattilyra

The scheduler keeps scaling up and down which keeps the scheduler CPU busy and the work stealing code is rarely, if ever, executed

This is a problem all on its own. The scale_up and scale_down code should either be very fast, or we should execute them in a separate thread. Blocking the scheduler is generally not recommended.

The work stealing only happens once the scheduler gets some info back from the workers regarding the running times

I'm inclined to handle this separately. Maybe we make it easy for schedulers to persist this information between sessions.

mrocklin avatar Mar 22 '18 17:03 mrocklin

In general I think it would be better to fix this by making the scheduler aware of the resource constraints in the first place so that it acts fairly, or at least as fairly as it can, instead of dumping all work on one node and then relying on others to maybe steal it in the future.

Keeping tasks in the unrunnable state while no resources are available to be actively used seems like a reasonable thing to do.

mattilyra avatar Mar 22 '18 17:03 mattilyra

The scheduler used to operate like this, keeping tasks that could be run but weren't in the scheduler until some worker was available that had resources to run them soon. We switched away from this in an effort to improve performance. It turns out to be useful to give the workers plenty to do so that they can start communicating for dependencies earlier, have a backlog of tasks to push in if other tasks take a while to get to it, etc. This was generally a success and so we've thrown out all of the old machinery to keep tasks in the scheduler. The unrunnable collection is a very small version of this that's still around. I don't think that we should rely on it in this way. For example, you might have to check on all of unrunnable every time a task completed rather than every time a new worker was added. This might affect our constant-time guarantees.

mrocklin avatar Mar 22 '18 17:03 mrocklin

For example, you might have to check on all of unrunnable every time a task completed rather than every time a new worker was added. This might affect our constant-time guarantees.

Good point.

Is there anything in the scheduler that keeps track of workers launched but not yet connected? That could perhaps be used as an indicator to hold back on pushing all the work onto the first worker that connects.

mattilyra avatar Mar 22 '18 17:03 mattilyra

I agree that this would help with planning but no, the scheduler has no such information. Actually the scheduler has no knowledge of how to launch workers. This tends to be handled by a Cluster object.

mrocklin avatar Mar 22 '18 17:03 mrocklin

It occurs to me that if the scheduler was given knowledge of the target number of instances (e.g. through cluster.scale_up telling it), then it could allocate tasks to putative instances before they are created; then when they join, the details of the assigned tasks could be given to them. Of course, in the meantime their tasks should remain stealable.

rbubley avatar Mar 22 '18 20:03 rbubley

This needs to work even if people are giving us resources arbitrarily on their own. Generally Dask survives difficult situations by reacting intelligently to events as they come in, rather than planning for events that it thinks should occur.


mrocklin avatar Mar 22 '18 21:03 mrocklin

Sorry if I sound like I'm shooting down lots of ideas. Generally speaking we always try to do the best we can with the information we have at any given point, and then react incrementally as new information comes to light. A lot of the scheduler is based on this (including most of our validation checks) which is why I'm probably more resistant to some of these things than one might expect.

I do genuinely think that improving load balancing / work stealing is probably the right move here. It will work even if things go awry when allocating more workers.

mrocklin avatar Mar 22 '18 21:03 mrocklin

This needs to work even if people are giving us resources arbitrarily on their own.

This is exactly the use case I have. Let's say the work load I have has 15 tasks that require a space laser, because space lasers are expensive to run I only want to run the machine(s) when the tasks that require those resources are actually ready to be executed.

The current logic will allow the "space laser" work to be stolen from the first worker that connects iff the tasks are quick to run. If the tasks take longer than one tick of the scheduler IOLoop, the scheduler will kill the n-1 new "space laser" workers, because it sees them idling before anyone gets a chance to even look at what could be stolen. The task stealing will only potentially happen once the first "space laser" task has finished and the scheduler gets information about the occupancy of those tasks. If the tasks take a long time to run, say an hour, this is a huge waste of resources. If the tasks are quick to run, then the existing logic more or less works.

I can think of a number of solutions - "long running" below is anything that takes longer than one tick of the scheduler IOLoop:

  1. Allow the scheduler to preemptively schedule tasks on workers that have been launched but haven't connected yet. Perhaps the Cluster instance could call scheduler.add_worker once the new workers are launched. The WorkerState.info could have a variable telling that the worker is currently spinning up and will be available in X seconds, X would need to be adjusted by users depending on what actual hardware the cluster is deployed on. It would make sense to have X be adjustable per worker/host as different machines may require different boot up times. If the worker does not connect within a timeout period then the tasks are moved back into unrunnable. This partially violates the act in the now principle @mrocklin pointed out above, depending on what exactly constitutes now - you could argue that now includes the information about workers that have been launched but haven't connected yet.

  2. Explicitly check for stealable tasks (if the extension exists) in retire_workers or add_worker or both. This won't help the long running tasks on its own, as the scheduler, or more specifically stealing will have stale estimates of task occupancy. The task occupancy is communicated back to the scheduler only when the task completes, not live while the task is running.

    2.1. The worker <-> scheduler comms should therefore also be changed so that task durations are communicated back to the scheduler while the work is being done. This should probably only happen in cases where the tasks are long running, i.e. the worker should send task duration updates every 5-10 seconds. - Could this info be added to the heartbeat? There would likely be negative consequences for the constant time guarantees in handling the heartbeat on the scheduler.

  3. Allow tasks to be marked as always stealable. Add an optional flag to TaskState that allows users to mark tasks as stealable regardless of what the logic in stealing is. This may result in tasks constantly being moved back and forth between workers unless the user takes very special care of matching resources with tasks. Might complicate using distributed to such an extent that it actually invalidates the point of having the flag.

  4. Change the semantics of loose_restrictions slightly so that when loose_restrictions=False (restrictions are strict) the scheduler will not assign tasks to workers if sufficient resources are not available. This would require a minimal change to scheduler.valid_workers to do virtual accounting of resources consumed for the potential task assignments, and actual accounting in add_resources and remove_resources.
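Option 1 above could be modelled roughly as follows. This is a standalone sketch, not scheduler code: the `PendingWorkers` class and its method names are hypothetical, and times are passed in explicitly so the behaviour is easy to follow. A caller would move tasks assigned to an expired worker back into unrunnable.

```python
import time


class PendingWorkers:
    """Toy registry of workers that were launched but have not connected."""

    def __init__(self):
        self._expected = {}  # worker name -> deadline (monotonic seconds)

    def launched(self, name, boot_seconds, now=None):
        """Register a worker that should connect within `boot_seconds`."""
        now = time.monotonic() if now is None else now
        self._expected[name] = now + boot_seconds

    def connected(self, name):
        """The worker joined, so it is no longer merely expected."""
        self._expected.pop(name, None)

    def expire(self, now=None):
        """Drop and return workers that missed their deadline."""
        now = time.monotonic() if now is None else now
        late = [n for n, deadline in self._expected.items() if deadline <= now]
        for name in late:
            del self._expected[name]
        return late

    def names(self):
        return list(self._expected)


pending = PendingWorkers()
pending.launched("gpu-1", boot_seconds=60, now=0)
pending.launched("gpu-2", boot_seconds=120, now=0)  # slower machine type
pending.connected("gpu-1")          # joined in time
late = pending.expire(now=90)       # gpu-2 still has 30 seconds left
print(late, pending.names())
```

The per-worker `boot_seconds` corresponds to the adjustable X in option 1.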

mattilyra avatar Mar 23 '18 09:03 mattilyra

Here are two additional possible solutions that I see:

  1. Maintain durable information about task duration somewhere so that schedulers can share information between instantiations. That way we're only wasteful the first time we encounter a new function type, but efficient on subsequent computations at the same institution. This is potentially troublesome because we'll have to have some shared durable resource, which may not be easily accessible on all systems. I can imagine it being quite easy on systems with a shared file system (like most HPC systems) but difficult on more ephemeral systems that tear things down every time.

  2. When we place a task into unknown_durations we also set up a call back to check in after a couple seconds. That callback sees if the task is still in unknown durations and, if so defines a known duration like 5s and kicks off work stealing for that task type if applicable. This seems easy to implement, relatively innocuous, and pretty fool-proof.
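The second idea can be sketched with a small standalone class. The names below (`DurationEstimates`, `check_in`) are illustrative assumptions rather than scheduler attributes; in the scheduler the check-in would be scheduled on the event loop a couple of seconds after the task starts, which is left out here.

```python
DEFAULT_DURATION = 5.0  # seconds; the "like 5s" guess from the comment above


class DurationEstimates:
    """Toy stand-in for per-prefix task duration bookkeeping."""

    def __init__(self):
        self.known = {}        # prefix -> measured or assumed duration
        self.unknown = set()   # prefixes running with no measurement yet

    def task_started(self, prefix):
        """Return True if a check-in callback should be scheduled."""
        if prefix in self.known or prefix in self.unknown:
            return False       # at most one callback per task prefix
        self.unknown.add(prefix)
        return True

    def task_finished(self, prefix, duration):
        self.unknown.discard(prefix)
        self.known[prefix] = duration

    def check_in(self, prefix):
        """Callback body: assume a duration if none has been reported yet."""
        if prefix in self.unknown:
            self.unknown.discard(prefix)
            self.known[prefix] = DEFAULT_DURATION
            return True        # caller could now trigger work stealing
        return False


est = DurationEstimates()
first = est.task_started("train_model")    # schedules the check-in
second = est.task_started("train_model")   # same prefix, nothing new to schedule
fired = est.check_in("train_model")        # task still running after the delay
print(first, second, fired, est.known)
```

If the task finishes before the callback fires, `check_in` does nothing and the measured duration is kept.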

mrocklin avatar Mar 23 '18 12:03 mrocklin

One option would be to add a prefetch arg to the scheduler to limit the number of tasks that can be pushed onto workers preemptively. Setting it to one would basically change the scheduler to just be a job queue. This makes sense for workloads where most tasks are costly, for instance large ML training pipelines where the algorithm training time dominates everything else.

I don't know if that's a workload dask should accommodate for?
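As a rough illustration of the prefetch idea, with `dispatch` and the `prefetch` parameter being hypothetical names rather than an existing scheduler option:

```python
from collections import deque


def dispatch(queue, in_flight, prefetch):
    """Hand out queued tasks so that no worker holds more than `prefetch`.

    queue:     deque of task keys still held by the scheduler
    in_flight: {worker: list of task keys already sent to that worker}
    """
    for worker, held in in_flight.items():
        while queue and len(held) < prefetch:
            held.append(queue.popleft())
    return in_flight


queue = deque(f"fit-{i}" for i in range(5))
in_flight = {"gpu-0": []}
dispatch(queue, in_flight, prefetch=1)   # gpu-0 takes one task, four stay queued

in_flight["gpu-1"] = []                  # a second worker connects later
dispatch(queue, in_flight, prefetch=1)   # it immediately gets a task as well
print(in_flight, list(queue))
```

With `prefetch=1` the late worker is used as soon as it connects, at the cost of the backlog that workers normally rely on to overlap communication with computation.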


mattilyra avatar Mar 23 '18 12:03 mattilyra

Are there reasons why load balancing / work stealing couldn't be made to handle these workloads as well? People certainly use Dask as-is today for these sorts of things. The issue you bring up about the scheduler needing some information before work stealing can kick in is a real one, but I believe surmountable.

Adding queuing systems onto the scheduler would require non-trivial amounts of work. Someone would need both to construct this logic and to ensure that it played nicely with all other parts of Dask (resources, resilience, long-running tasks, tasks that create tasks, etc.)

mrocklin avatar Mar 23 '18 12:03 mrocklin

Or, more specifically, after maintaining this for a while I'm inclined to introduce as few additional systems as possible. If we're able to solve this problem by adding a callback that adds an estimate of time when tasks have unknown durations then I suggest that we do that. I don't anticipate that it will have any complications with other systems and will require almost no additional maintenance.

mrocklin avatar Mar 23 '18 12:03 mrocklin

Any update on this @mattilyra ?

mrocklin avatar Mar 30 '18 15:03 mrocklin

I've been testing the above branch and it seems to be stable. I still need to write tests before a PR.

Also if you have some example workloads you'd like me to test let me know.


At the moment the callback to check_task_duration is added in transition_waiting_processing, as that was the only place I could reliably detect when the task starts to be in the processing state and then add the callback once and only once per task prefix.

mattilyra avatar Mar 30 '18 15:03 mattilyra

I'm not sure that I have the same issue but I thought I'll post here before opening a new issue.

Our situation is that we are adding new workers but they are not assigned any tasks.

Every task runs about 60 seconds and is completely independent from the others. So I would assume that they should be worth stealing.

The info panel shows the following:

image

Is there anything to be done to improve this situation?

cpaulik avatar Jun 14 '18 07:06 cpaulik

I think I figured out my problem after taking a look at the code. I was using dask_key_name on my delayed functions. After removing that the occupancy numbers and the task distribution are much more reasonable

cpaulik avatar Jun 14 '18 08:06 cpaulik

I find myself in a situation that I believe requires something along these lines, but I can open a new issue if desired.

My situation: I am running a pipeline, using a task-graph assembled with delayed objects, that has some tasks that use openmp threading. To enforce acceptable behavior I have defined an arbitrary resource on the workers to prevent too many tasks executing on any given worker. I am using dask-jobqueue so it is a homogenous pool of workers for now. I am seeing the same behavior mentioned above i.e. if the cluster job scheduler starts a couple of workers before the others they are given all the work and no load-balancing occurs when the others come online. This may be aggravated by the fact that some of the earlier tasks in the pipeline can take more than an hour, so any computation of the communication ratio won't help me.

I am wondering what the best way to implement a solution to this is. I would be happy with a basic redistribution of work every minute (or 10) if something more comprehensive is not feasible. Would the resource definition on the workers prevent this being implemented easily? I see that the scheduler's behavior can be modified by a plugin class... would this be the place to start? Or would the above-mentioned callback be the place to attack this problem? Is it testing that is preventing this current issue moving forward, or a desire for a more comprehensive solution?

Any help appreciated. Thanks.

leej3 avatar Jan 31 '19 14:01 leej3

Hm, from what I understand here, adaptive seems to be broken when using resources option in dask-worker?

@leej3 could you provide a minimal reproducible example?

guillaumeeb avatar Feb 04 '19 21:02 guillaumeeb

Hi @guillaumeeb. Thanks for the help.

I believe the resources to be set appropriately using dask jobqueue following the "Providing additional arguments to the dask-workers" section of the docs. To start, here is a reproducible example with just LocalCluster though.

from dask.distributed import Client, LocalCluster
from dask import delayed
import time
@delayed
def do_work(task_time=0.5):
    import os
    import time
    import threading
    time.sleep(task_time)
    return (os.getpid() , threading.current_thread().ident)

def run_staggered_workers(worker_kwargs,task_time=10,stagger = True):
    """
    Run 10 tasks. With stagger set to true, the worker pool is scaled up after
    the task graph is sent to the scheduler for execution.
    """
    n_workers = worker_kwargs.pop('n_workers')
    if stagger:
        cluster = LocalCluster(**worker_kwargs, n_workers = 1)
    else:
        cluster = LocalCluster(**worker_kwargs, n_workers = n_workers)
    client = Client(cluster)


    #Pause after initial worker startup.
    time.sleep(1)
    
    # Run the tasks
    tasks = [do_work(task_time=task_time) for x in range(10)]
    result = client.compute(tasks, resources = worker_kwargs['resources'])
    
    if stagger:
        # Scale up worker pool
        cluster.scale_up(n_workers)

    output = client.gather(result)
    client.close()
    return output
    

With no resources specified

With task time of 2 seconds, and n_workers = n_tasks I expect the job to finish after 2 or 3 task times because a communication ratio must be computed by the scheduler. This appears to be true (with some extra time for overhead):

start = time.time()
worker_kwargs = {'processes' : True,
                 'threads_per_worker' : 1,
                 'n_workers' : 10,
                 'resources' : None}
output = run_staggered_workers(worker_kwargs, task_time = 1)
end = time.time()
print(end - start, "seconds")
4.981343030929565 seconds
# The process and thread ids used demonstrate that multiple workers executed the tasks:
output
[(96575, 123145573806080),
 (96578, 123145482354688),
 (96387, 123145362042880),
 (96579, 123145490063360),
 (96581, 123145560764416),
 (96573, 123145329250304),
 (96572, 123145478262784),
 (96576, 123145584574464),
 (96580, 123145451167744),
 (96387, 123145362042880)]

With resources specified

When resources are specified for the jobs, job redistribution does not occur:

start = time.time()
worker_kwargs = {'processes' : True,
                 'threads_per_worker' : 1,
                 'n_workers' : 10,
                 'resources' : {'foo' : 1}}
output = run_staggered_workers(worker_kwargs, task_time = 1)
end = time.time()
print(end - start, "seconds")
11.634655237197876 seconds

The process and thread ids used demonstrate that only one worker executed all the tasks:

output
[(96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016),
 (96796, 123145410646016)]

leej3 avatar Feb 05 '19 11:02 leej3

Thanks a lot for the stand-alone snippet @leej3! I can reproduce the behaviour.

lesteve avatar Feb 06 '19 10:02 lesteve

Reading @leej3's post more closely, it is not using .adapt so maybe this is a slightly different problem ...

lesteve avatar Feb 06 '19 11:02 lesteve

Sorry, I missed the .scale in @leej3's post, so I guess this is the same problem as with .adapt. Here is a slightly simpler snippet to reproduce the problem:

import time
import os
import threading
import pprint
import webbrowser

from dask.distributed import Client, LocalCluster


def do_work(task_time=0.5):
    time.sleep(task_time)
    return (os.getpid(), threading.current_thread().ident)


cluster = LocalCluster(n_workers=1, threads_per_worker=3, resources={'foo': 1})
client = Client(cluster)
dashboard_port = client.scheduler_info()['services']['bokeh']
print('dashboard:', dashboard_port)
# uncomment next two lines if you want to open the dashboard. sleep 5s to give
# time to the tab to load
# webbrowser.open(f'http://localhost:{dashboard_port}/status')
# time.sleep(5)
t0 = time.time()
futures = [client.submit(do_work, pure=False, resources={'foo': 1})
           for i in range(20)]
cluster.scale(2)
output = client.gather(futures)
pprint.pprint(output)
print(time.time() - t0)

Output:

[(11660, 139795962357504),
 (11660, 139795962357504),
 (11660, 139795481229056),
 (11660, 139795962357504),
 (11660, 139795481229056),
 (11660, 139795472836352),
 (11660, 139795481229056),
 (11660, 139795962357504),
 (11660, 139795481229056),
 (11660, 139795472836352),
 (11660, 139795481229056),
 (11660, 139795962357504),
 (11660, 139795481229056),
 (11660, 139795472836352),
 (11660, 139795481229056),
 (11660, 139795962357504),
 (11660, 139795481229056),
 (11660, 139795472836352),
 (11660, 139795481229056),
 (11660, 139795962357504)]
10.067777633666992

Summary of the issue:

  • submit some tasks using resources
  • scale up your cluster
  • workers that are added after the task submissions are never used to do some tasks (bug only exists when using resources)
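A quick way to check for the symptom is to count the distinct process ids in the gathered results. The helper below is a small sketch with a made-up name, applied to a few rows copied from the output above:

```python
def worker_processes(output):
    """Return the set of process ids that appear in do_work results."""
    return {pid for pid, _thread_id in output}


# A few results copied from the output above: one pid, several threads.
sample = [
    (11660, 139795962357504),
    (11660, 139795962357504),
    (11660, 139795481229056),
    (11660, 139795472836352),
]
print(len(worker_processes(sample)))
```

A single pid means only the original worker process ran tasks, even though the cluster was scaled to two workers.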

lesteve avatar Feb 06 '19 14:02 lesteve

Thanks. A nice minimization too. Based on previous comments of this issue it seems like it will be tricky to modify the scheduler to work in a desirable manner for this case without adding unacceptable overhead to the default case where resources are not specified. At least we now have a nice and simple test for it though.

leej3 avatar Feb 06 '19 15:02 leej3

I guess the first thing to do would be to look at @mattilyra's branch https://github.com/mattilyra/distributed/commits/resource_accounting_for_valid_workers.

This branch does not work with recent dask because dask.set_option has been removed. I tried fixing this but got another error:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/protocol/core.py", line 31, in dumps
    small_header, small_payload = dumps_msgpack(msg)
  File "/home/local/lesteve/dev/distributed/distributed/protocol/core.py", line 142, in dumps_msgpack
    fmt, payload = maybe_compress(payload)
  File "/home/local/lesteve/dev/distributed/distributed/protocol/compression.py", line 155, in maybe_compress
    compress = compressions[compression]['compress']
KeyError: 'auto'

I ended up rebasing @mattilyra's branch on top of master and I got errors there too (see below), so this needs to be looked at in more detail. Of course @mattilyra, if you have some spare bandwidth it would be great if you could have a closer look, since it feels like you weren't that far from having something working:

distributed.scheduler - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
    self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
distributed.scheduler - ERROR - Error transitioning 'do_work-2b9f112f-723c-4975-becd-ff96436d3a3d' from 'waiting' to 'processing'
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
    self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
distributed.core - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
    handler(**merge(extra, msg))
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
    self.transitions(recommendations)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
    new = self.transition(key, finish)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
    self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
distributed.scheduler - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
    yield self.handle_stream(comm=comm, extra={'client': client})
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
    handler(**merge(extra, msg))
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
    self.transitions(recommendations)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
    new = self.transition(key, finish)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
    self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
    'key': key})
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
    self.release_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
    self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
distributed.scheduler - ERROR - Error transitioning 'do_work-2b9f112f-723c-4975-becd-ff96436d3a3d' from 'processing' to 'released'
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
    yield self.handle_stream(comm=comm, extra={'client': client})
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
    handler(**merge(extra, msg))
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
    self.transitions(recommendations)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
    new = self.transition(key, finish)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
    self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
    'key': key})
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
    self.release_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
    self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
distributed.scheduler - ERROR - Error transitioning 'do_work-2b9f112f-723c-4975-becd-ff96436d3a3d' from 'processing' to 'forgotten'
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
    yield self.handle_stream(comm=comm, extra={'client': client})
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
    handler(**merge(extra, msg))
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
    self.transitions(recommendations)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
    new = self.transition(key, finish)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
    self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3936, in transition
    a = self.transition(key, 'released')
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
    'key': key})
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
    self.release_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
    self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
distributed.core - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
    yield self.handle_stream(comm=comm, extra={'client': client})
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
    handler(**merge(extra, msg))
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
    self.transitions(recommendations)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
    new = self.transition(key, finish)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
    self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/local/lesteve/dev/distributed/distributed/core.py", line 346, in handle_comm
    result = yield result
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2043, in add_client
    self.remove_client(client=client)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2070, in remove_client
    client=cs.client_key)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1856, in client_releases_keys
    self.transitions(recommendations)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
    new = self.transition(key, finish)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3936, in transition
    a = self.transition(key, 'released')
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
    recommendations = func(key, *args, **kwargs)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
    'key': key})
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
    self.release_resources(ts, ws)
  File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
    self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/home/local/lesteve/dev/distributed/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1485                         try:
-> 1486                             st = self.futures[key]
   1487                             exception = st.exception

KeyError: 'do_work-9d266e2f-1e97-465c-9ba4-d6374886950f'

During handling of the above exception, another exception occurred:

CancelledError                            Traceback (most recent call last)
<ipython-input-1-a86c453096d1> in <module>()
     25            for i in range(20)]
     26 cluster.scale(2)
---> 27 output = client.gather(futures)
     28 pprint.pprint(output)
     29 print(time.time() - t0)

/home/local/lesteve/dev/distributed/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1653             return self.sync(self._gather, futures, errors=errors,
   1654                              direct=direct, local_worker=local_worker,
-> 1655                              asynchronous=asynchronous)
   1656 
   1657     @gen.coroutine

/home/local/lesteve/dev/distributed/distributed/client.py in sync(self, func, *args, **kwargs)
    673             return future
    674         else:
--> 675             return sync(self.loop, func, *args, **kwargs)
    676 
    677     def __repr__(self):

/home/local/lesteve/dev/distributed/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

/home/local/lesteve/miniconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/home/local/lesteve/dev/distributed/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/home/local/lesteve/dev/distributed/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1490                             six.reraise(CancelledError,
   1491                                         CancelledError(key),
-> 1492                                         None)
   1493                         else:
   1494                             six.reraise(type(exception),

/home/local/lesteve/miniconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

CancelledError: do_work-9d266e2f-1e97-465c-9ba4-d6374886950f

lesteve avatar Feb 06 '19 15:02 lesteve

I'm seeing similar behavior with my workload: all tasks go to a single worker because the other workers haven't come online yet. This thread is more than a year old, so I'm not sure what has changed since it started. Anyway, here are my findings on why work stealing doesn't work properly with resources.

From what I can tell, the stealing logic is implemented in stealing.py as a SchedulerPlugin. Every time a worker is added, the plugin updates its state (in particular the stealable instance attribute) to keep track of which tasks can be stolen from that worker. Every time a task transitions, the plugin checks whether the task is transitioning to "processing" and, if so, calls put_key_in_stealable with the task state.

Inside put_key_in_stealable, the cost of moving the task is computed using steal_time_ratio. The first thing steal_time_ratio checks is whether the task has hard restrictions (loose_restrictions is not set) and whether any of the restrictions are set:

if not ts.loose_restrictions and (
    ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
):
    return None, None  # don't steal

Back in put_key_in_stealable, nothing happens if the returned cost is None:

def put_key_in_stealable(self, ts):
    ws = ts.processing_on
    worker = ws.address
    cost_multiplier, level = self.steal_time_ratio(ts)
    self.log.append(("add-stealable", ts.key, worker, level))
    if cost_multiplier is not None:
        self.stealable_all[level].add(ts)
        self.stealable[worker][level].add(ts)
        self.key_stealable[ts] = (worker, level)

This would seem to explain why no work stealing occurs when tasks are marked with resources. Any comments on the analysis here?
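To make the analysis above concrete, here is a minimal standalone sketch of the same control flow. It does not import distributed: ToyTask, the module-level stealable dict, and the placeholder return value (1.0, 0) are assumptions made for illustration, and only the restriction guard is copied from the snippet quoted above.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass(eq=False)  # hash by identity so instances can be stored in sets
class ToyTask:
    # Hypothetical stand-in for the scheduler's task state.
    key: str
    loose_restrictions: bool = False
    host_restrictions: set = field(default_factory=set)
    worker_restrictions: set = field(default_factory=set)
    resource_restrictions: dict = field(default_factory=dict)


def steal_time_ratio(ts):
    # Same guard as quoted above; the real cost computation is replaced
    # by a placeholder (cost multiplier 1.0, level 0).
    if not ts.loose_restrictions and (
        ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
    ):
        return None, None  # don't steal
    return 1.0, 0


stealable = defaultdict(set)  # level -> tasks that may be stolen


def put_key_in_stealable(ts):
    cost_multiplier, level = steal_time_ratio(ts)
    if cost_multiplier is not None:
        stealable[level].add(ts)


plain = ToyTask("plain")
gpu = ToyTask("gpu", resource_restrictions={"GPU": 1})
put_key_in_stealable(plain)
put_key_in_stealable(gpu)

stealable_keys = sorted(ts.key for tasks in stealable.values() for ts in tasks)
print(stealable_keys)  # ['plain']
```

In this toy model the task with a resource restriction is never registered as stealable, which matches the behavior described above.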

Moving on to a possible fix, I think it makes sense to remove the restrictions check mentioned above and add some checks before the call to maybe_move_task in balance to ensure that the thief has the resources required to steal the task. Thoughts?
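A rough sketch of what such a check might look like, written against toy objects rather than the real scheduler state. ToyWorker, its resources and used_resources fields, and the helper name thief_has_resources are all hypothetical names chosen for this example, not existing distributed APIs.

```python
from dataclasses import dataclass, field


@dataclass
class ToyWorker:
    # Hypothetical stand-in for the scheduler's per-worker state.
    address: str
    resources: dict = field(default_factory=dict)       # total supplied
    used_resources: dict = field(default_factory=dict)  # currently consumed


def thief_has_resources(resource_restrictions, thief):
    """Return True if `thief` has enough free resources for the task."""
    for name, required in resource_restrictions.items():
        supplied = thief.resources.get(name, 0)
        used = thief.used_resources.get(name, 0)
        if supplied - used < required:
            return False
    return True


needs_gpu = {"GPU": 1}
cpu_only = ToyWorker("tcp://10.0.0.1:1234")
busy_gpu = ToyWorker("tcp://10.0.0.2:1234", {"GPU": 1}, {"GPU": 1})
free_gpu = ToyWorker("tcp://10.0.0.3:1234", {"GPU": 2}, {"GPU": 1})

results = [thief_has_resources(needs_gpu, w) for w in (cpu_only, busy_gpu, free_gpu)]
print(results)  # [False, False, True]
```

The idea would be to call a predicate like this right before maybe_move_task and skip the move when it returns False.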

calebho avatar May 29 '19 23:05 calebho

Any comments on the analysis here?

That all tracks.

add some checks before the call to maybe_move_task in balance to ensure that the thief has the resources required to steal the task.

That also makes sense, but the resource restriction on workers seems to complicate things quite a bit. Though perhaps not: perhaps we can just continue to treat things normally, and only check the resource restriction at the last moment? That won't be ideal, since we won't prioritize workers with the resource restriction as opportunities to steal, but it shouldn't be any worse...
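As an illustration of the "check at the last moment" idea, here is a small self-contained sketch. The function names pick_thief and has_free_resources and the (address, supplied, used) tuples are made up for this example. The candidate thieves are walked in whatever order the balancing logic already uses, and the resource check is applied only as the final gate.

```python
def has_free_resources(required, supplied, used):
    # True if every required resource is available in sufficient quantity.
    return all(supplied.get(r, 0) - used.get(r, 0) >= n for r, n in required.items())


def pick_thief(required, idle_workers):
    """Walk idle workers in their usual order; check resources only at the end."""
    for address, supplied, used in idle_workers:
        # ... the usual cost/occupancy checks would go here ...
        if has_free_resources(required, supplied, used):  # last-moment check
            return address
    return None  # no eligible thief; the task stays where it is


idle = [
    ("tcp://10.0.0.1:1234", {}, {}),          # idle, but no GPU
    ("tcp://10.0.0.2:1234", {"GPU": 1}, {}),  # idle, has a free GPU
]
chosen = pick_thief({"GPU": 1}, idle)
print(chosen)  # tcp://10.0.0.2:1234
```

Note that the worker without a GPU is still considered first, which is the non-ideal prioritization mentioned above: the ordering ignores resources, and ineligible thieves are simply skipped.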

TomAugspurger avatar May 31 '19 03:05 TomAugspurger