distributed
Scheduler behaves badly when adaptively adding workers to meet resource demand
There is an issue with how the scheduler assigns tasks from the unrunnable queue to newly joining workers that meet the tasks' resource requirements.
The use case is a long-running, complex computation where some tasks require an expensive resource, say GPUs, but those resources are only provided (through Adaptive) once the tasks requiring them are ready to run. Say we reach a point in the computation where 5 tasks could run if GPUs were available. Managing the scale_up behaviour through Adaptive is fairly straightforward and allows adding new compute nodes (for instance on AWS) with the required resources. The problem appears when the first of the new workers connects.
Scheduler.add_worker goes through the list of unrunnable tasks and checks whether any workers meet their requirements. Since only the first worker has connected so far, there is only one worker that meets them (some number of other workers may be booting up and about to join, but that hasn't happened yet):
for ts in list(self.unrunnable):
    valid = self.valid_workers(ts)
    if valid is True or ws in valid:
        recommendations[ts.key] = 'waiting'
The task goes through released -> waiting and then waiting -> processing. transition_waiting_processing calls valid_workers again to get the list of workers on which the task(s) can run, and this list still contains just the single worker, because the others haven't connected yet.
The end result is that the first worker to connect with the required resource gets all the tasks dumped onto it, while the other workers, which may connect just seconds later, get nothing and are shut down by the scheduler because they are idle.
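To make the failure mode concrete, here is a minimal toy model of the loop described above. It is not the scheduler's code: ToyWorker, satisfies and add_worker_greedy are hypothetical names, and "valid" is reduced to "the worker has at least this much of each resource", which is the part that ignores resources already claimed by queued tasks.

```python
from dataclasses import dataclass, field


@dataclass
class ToyWorker:
    name: str
    resources: dict                                  # e.g. {"GPU": 1}
    processing: list = field(default_factory=list)


def satisfies(worker, required):
    # "Valid" only asks whether the worker *has* enough of each resource;
    # it ignores how much is already claimed by tasks queued on it.
    return all(worker.resources.get(r, 0) >= n for r, n in required.items())


def add_worker_greedy(unrunnable, worker):
    """Move every unrunnable task this worker could run onto it."""
    for task, required in list(unrunnable.items()):
        if satisfies(worker, required):
            worker.processing.append(task)
            del unrunnable[task]


unrunnable = {f"train-{i}": {"GPU": 1} for i in range(5)}
workers = [ToyWorker(f"gpu-{i}", {"GPU": 1}) for i in range(5)]
for w in workers:                                    # GPU workers connect one after another
    add_worker_greedy(unrunnable, w)

print({w.name: len(w.processing) for w in workers})
# {'gpu-0': 5, 'gpu-1': 0, 'gpu-2': 0, 'gpu-3': 0, 'gpu-4': 0}
```

The first worker to arrive drains the whole queue, so the later workers have nothing to do and look idle to Adaptive.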
In short, it appears that the purpose of the resource_requirements is to act as a hint of the peak resources (memory, GPU, whatever) a task needs from a worker, not to be a dynamically changing resource allocation. Is this the case, and is there any interest in changing that? The resources available and consumed are taken into account in transition_waiting_processing, but only on the worker_state, not for the scheduler as a whole.
If this is not the intended behaviour and should be fixed, I'm more than happy to work on this.
This is not the intended behaviour and should be fixed. Help would be welcome!
I agree with your understanding of the situation and in general I'm glad to hear from someone who is comfortable working within the scheduler code.
My first instinct on how to solve this problem would be to look at improving work stealing with resources. The first worker should get all of the tasks if it's the only one around, but it should also share them once others arrive.
Doing this well is likely to require some thought. The current work stealing system is probably not as clean as it could be. We'll also want to ensure that this doesn't cost scheduler time in the common case when resources are not in use.
That was my first thought as well, so I merged the branch by quakberto (see #1389) with master and tested it.
The problem with relying on work stealing to sort this out is twofold:
- The scheduler keeps scaling up and down, which keeps the scheduler CPU busy, and the work stealing code is rarely, if ever, executed. This is partially related to the custom should_scale_up code I have, which tries to account for resource starvation not only for unrunnable tasks but more generally, in the hope that tasks would be moved (stolen) if more machines with the resources were available. The scheduler ends up killing the n-1 new workers because they are idling. I haven't totally cracked why exactly the tasks aren't getting stolen; this is also going to be highly dependent on the actual workload and seems somewhat brittle, especially if Adaptive is meant to allow people to run highly customised loads.
- Work stealing only happens once the scheduler gets some info back from the workers about the running times (or occupancy) of the tasks. For tasks that take some time to run, say training ML models, this lag is long enough for the scheduler to get rid of the workers before they get a chance to steal anything. The changes from quakberto are still required, but they're not enough on their own.
My suggestion would be to instead change Scheduler.valid_workers to do virtual accounting of resources required/supplied. Virtual, because the resources haven't actually been taken yet; there is just a proposal to take them in the future. Additionally, Scheduler.consume_resources and Scheduler.release_resources would need to be modified to account for the resources available in the cluster (self.resources).
I've started a branch (like 5 seconds ago) with these changes and am currently testing it (https://github.com/mattilyra/distributed/tree/resource_accounting_for_valid_workers).
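A minimal sketch of the virtual accounting proposed above, assuming a toy scheduler rather than the real one: ToyScheduler, claimed and _drain_unrunnable are hypothetical names, not the code on the branch. Resources are debited when a task is assigned, so valid_workers stops offering a worker whose resources are already spoken for, and unassignable tasks stay in unrunnable until a worker with free capacity appears.

```python
from collections import defaultdict


class ToyScheduler:
    """Toy model of 'virtual' resource accounting (hypothetical, not dask code)."""

    def __init__(self):
        self.capacity = {}                                      # worker -> {resource: amount}
        self.claimed = defaultdict(lambda: defaultdict(float))  # worker -> {resource: amount}
        self.assigned = defaultdict(list)                       # worker -> [task, ...]
        self.unrunnable = {}                                    # task -> {resource: amount}

    def valid_workers(self, required):
        # Only workers whose *unclaimed* resources cover the requirement.
        return [
            w for w, cap in self.capacity.items()
            if all(cap.get(r, 0) - self.claimed[w][r] >= n for r, n in required.items())
        ]

    def _assign(self, task, required, worker):
        for r, n in required.items():
            self.claimed[worker][r] += n  # virtual debit at assignment time
        self.assigned[worker].append(task)

    def _drain_unrunnable(self, worker):
        # Hand this worker as many unrunnable tasks as its free resources allow.
        for task, required in list(self.unrunnable.items()):
            if worker in self.valid_workers(required):
                del self.unrunnable[task]
                self._assign(task, required, worker)

    def submit(self, task, required):
        valid = self.valid_workers(required)
        if valid:
            # Prefer the worker with the fewest assigned tasks.
            self._assign(task, required, min(valid, key=lambda w: len(self.assigned[w])))
        else:
            self.unrunnable[task] = required

    def add_worker(self, worker, resources):
        self.capacity[worker] = dict(resources)
        self._drain_unrunnable(worker)

    def task_finished(self, task, required, worker):
        self.assigned[worker].remove(task)
        for r, n in required.items():
            self.claimed[worker][r] -= n  # release the virtual debit
        self._drain_unrunnable(worker)    # freed capacity may admit a waiting task


s = ToyScheduler()
for i in range(6):                        # tasks arrive before any GPU worker exists
    s.submit(f"train-{i}", {"GPU": 1})
for i in range(5):                        # GPU workers connect one at a time
    s.add_worker(f"gpu-{i}", {"GPU": 1})
print(dict(s.assigned))                   # each gpu-i holds one task; train-5 still waits
s.task_finished("train-0", {"GPU": 1}, "gpu-0")
print(s.assigned["gpu-0"])                # ['train-5']
```

Note that re-draining unrunnable on every task completion is a scan over all unrunnable tasks, which is exactly the kind of cost to the scheduler's constant-time guarantees that comes up later in this thread.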
The scheduler keeps scaling up and down which keeps the scheduler CPU busy and the work stealing code is rarely, if ever, executed
This is a problem all on its own. The scale_up and scale_down code should either be very fast, or we should execute them in a separate thread. Blocking the scheduler is generally not recommended.
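As a rough illustration of "execute them in a separate thread", here is a sketch in plain asyncio, assuming the scale-up call is ordinary blocking code. blocking_scale_up and heartbeat are hypothetical stand-ins; neither the real scheduler loop nor the Adaptive API is shown.

```python
import asyncio
import time


def blocking_scale_up(n):
    """Hypothetical stand-in for a slow, blocking scale-up call (e.g. a cloud API request)."""
    time.sleep(0.5)
    return n


async def heartbeat(ticks):
    """Stand-in for other scheduler work that must keep running on the event loop."""
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    # Off-load the blocking call to the loop's default thread pool so the
    # event loop keeps servicing other coroutines in the meantime.
    scale_future = loop.run_in_executor(None, blocking_scale_up, 3)
    await heartbeat(ticks)
    # The ~0.1 s heartbeat finishes while the 0.5 s call is still running.
    heartbeat_finished_first = not scale_future.done()
    n = await scale_future
    return n, ticks, heartbeat_finished_first


n, ticks, heartbeat_finished_first = asyncio.run(main())
print(n, len(ticks), heartbeat_finished_first)
```

Had blocking_scale_up been called directly inside main, the heartbeat could not start until it returned.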
The work stealing only happens once the scheduler gets some info back from the workers regarding the running times
I'm inclined to handle this separately. Maybe we make it easy for schedulers to persist this information between sessions.
In general I think it would be better to make the scheduler aware of the resource constraints in the first place and act fairly, or at least as fairly as it can, instead of dumping all the work on one node and then relying on the others to maybe steal it in the future.
Keeping tasks in the unrunnable state while no resources are available to actively use seems like a reasonable thing to do.
The scheduler used to operate like this, keeping tasks that could run, but weren't yet running, in the scheduler until some worker with the resources to run them soon became available. We switched away from this to improve performance. It turns out to be useful to give the workers plenty to do, so that they can start communicating for dependencies earlier, have a backlog of tasks ready in case other tasks take a while, etc. This was generally a success, so we've thrown out all of the old machinery for keeping tasks in the scheduler. The unrunnable collection is a very small version of this that's still around. I don't think that we should rely on it in this way. For example, you might have to check all of unrunnable every time a task completed rather than every time a new worker was added. This might affect our constant-time guarantees.
For example, you might have to check on all of unrunnable every time a task completed rather than every time a new worker was added. This might affect our constant-time guarantees.
Good point.
Is there anything in the scheduler that keeps track of workers launched but not yet connected? That could perhaps be used as an indicator to hold back on pushing all the work onto the first worker that connects.
I agree that this would help with planning but no, the scheduler has no such information. Actually the scheduler has no knowledge of how to launch workers. This tends to be handled by a Cluster object.
It occurs to me that if the scheduler were given knowledge of the target number of instances (e.g. through cluster.scale_up telling it), then it could allocate tasks to putative instances before they are created; when they join, the details of the assigned tasks could be given to them. Of course, in the meantime their tasks should remain stealable.
This needs to work even if people are giving us resources arbitrarily on their own. Generally Dask survives difficult situations by reacting intelligently to events as they come in, rather than planning for events that it thinks should occur.
On Thu, Mar 22, 2018 at 4:58 PM, Russ Bubley wrote:
It occurs to me that if the scheduler were given knowledge of the target number of instances (e.g. through cluster.scale_up telling it), then it could allocate tasks to putative instances before they are created; when they join, the details of the assigned tasks could be given to them. Of course, in the meantime their tasks should remain stealable.
Sorry if I sound like I'm shooting down lots of ideas. Generally speaking we always try to do the best we can with the information we have at any given point, and then react incrementally as new information comes to light. A lot of the scheduler is based on this (including most of our validation checks) which is why I'm probably more resistant to some of these things than one might expect.
I do genuinely think that improving load balancing / work stealing is probably the right move here. It will work even if things go awry when allocating more workers.
This needs to work even if people are giving us resources arbitrarily on their own.
This is exactly the use case I have. Let's say my workload has 15 tasks that require a space laser. Because space lasers are expensive to run, I only want to run the machine(s) when the tasks that require those resources are actually ready to be executed.
The current logic will allow the "space laser" work to be stolen from the first worker that connects only if the tasks are quick to run. If the tasks take longer than one tick of the scheduler IOLoop, the scheduler will kill the n-1 new "space laser" workers, because it sees them idling before anyone gets a chance to even look at what could be stolen. Task stealing can only happen once the first "space laser" task has finished and the scheduler gets information about the occupancy of those tasks. If the tasks take a long time to run, say an hour, this is a huge waste of resources. If the tasks are quick to run, the existing logic more or less works.
I can think of a number of solutions ("long running" below means anything that takes longer than one tick of the scheduler IOLoop):
1. Allow the scheduler to preemptively schedule tasks on workers that have been launched but haven't connected yet. Perhaps the Cluster instance could call scheduler.add_worker once the new workers are launched. WorkerState.info could have a variable saying that the worker is currently spinning up and will be available in X seconds; X would need to be adjusted by users depending on the hardware the cluster is deployed on. It would make sense for X to be adjustable per worker/host, as different machines may require different boot-up times. If the worker does not connect within a timeout period, its tasks are moved back into unrunnable. This partially violates the act-in-the-now principle @mrocklin pointed out above, depending on what exactly constitutes "now": you could argue that now includes the information about workers that have been launched but haven't connected yet.
2. Explicitly check for stealable tasks (if the extension exists) in retire_workers or add_worker, or both. This won't help with long-running tasks on its own, as the scheduler, or more specifically stealing, will have stale estimates of task occupancy. Task occupancy is communicated back to the scheduler only when a task completes, not live while it is running.
2.1. The worker <-> scheduler comms should therefore also be changed so that task durations are communicated back to the scheduler while the work is being done. This should probably only happen for long-running tasks, i.e. the worker would send task duration updates every 5-10 seconds. Could this info be added to the heartbeat? There would likely be negative consequences for the constant-time guarantees in handling the heartbeat on the scheduler.
3. Allow tasks to be marked as always stealable. Add an optional flag to TaskState that lets users mark tasks as stealable regardless of the logic in stealing. This may result in tasks constantly being moved back and forth between workers unless the user takes very special care to match resources with tasks. It might complicate using distributed to such an extent that it actually defeats the purpose of having the flag.
4. Change the semantics of loose_restrictions slightly, so that when loose_restrictions=False (restrictions are strict) the scheduler will not assign tasks to workers if sufficient resources are not available. This would require a minimal change to scheduler.valid_workers to do virtual accounting of resources consumed by the potential task assignments, and actual accounting in add_resources and remove_resources.
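A minimal sketch of the first option above (reserving tasks for workers that have been launched but have not connected yet), under the assumption that the cluster manager can announce each launch together with an expected boot time. PendingWorkers, announce, connected and expire are hypothetical names; nothing here is an existing dask API.

```python
import time


class PendingWorkers:
    """Hypothetical sketch: reserve tasks for launched-but-not-connected workers
    and give them back if a worker misses its expected boot deadline."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.pending = {}      # worker -> (deadline, [reserved tasks])
        self.unrunnable = []

    def announce(self, worker, boot_time, tasks):
        # Called when a worker is launched; boot_time ("X seconds") is tunable per host.
        self.pending[worker] = (self.clock() + boot_time, list(tasks))

    def connected(self, worker):
        # The worker joined: hand over whatever was reserved for it (if anything).
        _, tasks = self.pending.pop(worker, (None, []))
        return tasks

    def expire(self):
        # Workers that missed their deadline: their tasks go back to unrunnable.
        now = self.clock()
        for worker, (deadline, tasks) in list(self.pending.items()):
            if now > deadline:
                del self.pending[worker]
                self.unrunnable.extend(tasks)


now = [0.0]                                   # fake clock for the example
pw = PendingWorkers(clock=lambda: now[0])
pw.announce("gpu-1", boot_time=60, tasks=["train-1"])
pw.announce("gpu-2", boot_time=60, tasks=["train-2"])
now[0] = 30
print(pw.connected("gpu-1"))   # ['train-1']
now[0] = 90                    # gpu-2 never showed up
pw.expire()
print(pw.unrunnable)           # ['train-2']
```

Injecting the clock keeps the timeout logic testable; in a real scheduler, expire would presumably run from a periodic callback, and the reserved tasks would still need to remain stealable.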
Here are two additional possible solutions that I see:
1. Maintain durable information about task duration somewhere so that schedulers can share information between instantiations. That way we're only wasteful the first time we encounter a new function type, but efficient on subsequent computations at the same institution. This is potentially troublesome because we'll need some shared durable resource, which may not be easily accessible on all systems. I can imagine it being quite easy on systems with a shared file system (like most HPC systems) but difficult on more ephemeral systems that tear everything down every time.
2. When we place a task into unknown_durations, we also set up a callback to check in after a couple of seconds. That callback checks whether the task is still in unknown_durations and, if so, defines a known duration like 5s and kicks off work stealing for that task type if applicable. This seems easy to implement, relatively innocuous, and pretty fool-proof.
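For the first of these two options, here is a minimal sketch, assuming a JSON file on a shared file system is an acceptable durable store. DurationStore and its methods are hypothetical; it keeps a running mean of observed durations per task prefix.

```python
import json
import os
import tempfile


class DurationStore:
    """Hypothetical sketch: persist average task duration per task prefix in a JSON file."""

    def __init__(self, path):
        self.path = path
        self.stats = {}   # prefix -> [count, mean_seconds]
        if os.path.exists(path):
            with open(path) as f:
                self.stats = json.load(f)

    def record(self, prefix, seconds):
        count, mean = self.stats.get(prefix, [0, 0.0])
        count += 1
        mean += (seconds - mean) / count     # incremental running mean
        self.stats[prefix] = [count, mean]

    def estimate(self, prefix, default=None):
        entry = self.stats.get(prefix)
        return entry[1] if entry else default

    def save(self):
        # Write to a temp file and rename, so a crash mid-write can't corrupt the store.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as f:
            json.dump(self.stats, f)
        os.replace(tmp, self.path)


path = os.path.join(tempfile.mkdtemp(), "durations.json")
store = DurationStore(path)
store.record("train_model", 3600.0)
store.record("train_model", 3000.0)
store.save()
print(DurationStore(path).estimate("train_model"))  # 3300.0
```

A fresh scheduler could seed its duration estimates from estimate() instead of starting with nothing, which is what lets it consider stealing before the first long task completes. Concurrent writers from several schedulers are not handled here.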
One option would be to add a prefetch arg to the scheduler to limit the number of tasks that can be pushed onto workers preemptively. Setting it to one would basically turn the scheduler into just a job queue. This makes sense for workloads where most tasks are costly, for instance large ML training pipelines where algorithm training time dominates everything else.
I don't know whether that's a workload Dask should accommodate?
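A minimal sketch of the prefetch idea, assuming the scheduler keeps not-yet-dispatched tasks in a simple FIFO queue. dispatch and prefetch are hypothetical; no such scheduler argument exists in this form.

```python
from collections import deque


def dispatch(queue, workers, prefetch):
    """Hypothetical sketch of a per-worker prefetch cap.

    queue:    deque of task keys still held by the scheduler
    workers:  dict worker -> list of tasks already sent to it
    prefetch: max tasks a worker may hold (running + queued)
    """
    sent = []
    while queue:
        # Pick the least-loaded worker that still has room.
        candidates = [w for w, tasks in workers.items() if len(tasks) < prefetch]
        if not candidates:
            break          # everyone is full; the rest waits in the scheduler
        w = min(candidates, key=lambda w: len(workers[w]))
        task = queue.popleft()
        workers[w].append(task)
        sent.append((task, w))
    return sent


queue = deque(f"train-{i}" for i in range(5))
workers = {"w1": []}
dispatch(queue, workers, prefetch=1)
workers["w2"] = []                      # a new worker connects
dispatch(queue, workers, prefetch=1)
print(workers)       # {'w1': ['train-0'], 'w2': ['train-1']}
print(len(queue))    # 3
```

Because undispatched work stays in the scheduler, a late-joining worker still finds tasks to pick up. The trade-off is the one described elsewhere in this thread: workers lose the backlog that lets them fetch dependencies early.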
On 23 Mar 2018 13:01, "Matthew Rocklin" wrote:
Here are two additional possible solutions that I see:
1. Maintain durable information about task duration somewhere so that schedulers can share information between instantiations. That way we're only wasteful the first time we encounter a new function type, but efficient on subsequent computations at the same institution. This is potentially troublesome because we'll have to have some shared durable resource, which may not be easily accessible on all systems. I can imagine it being quite easy on systems with a shared file system (like most HPC systems) but difficult on more ephemeral systems that tear things down every time.
2. When we place a task into unknown_durations we also set up a call back to check in after a couple seconds. That callback sees if the task is still in unknown durations and, if so defines a known duration like 5s and kicks off work stealing for that task type if applicable. This seems easy to implement, relatively innocuous, and pretty fool-proof.
Are there reasons why load balancing / work stealing couldn't be made to handle these workloads as well? People certainly use Dask as-is today for these sorts of things. The issue you bring up about the scheduler needing some information before work stealing can kick in is a real one, but I believe surmountable.
Adding queuing systems onto the scheduler would require non-trivial amounts of work. Someone would need to both construct this logic and ensure that it plays nicely with all other parts of Dask (resources, resilience, long-running tasks, tasks that create tasks, etc.).
Or, more specifically, after maintaining this for a while I'm inclined to introduce as few additional systems as possible. If we're able to solve this problem by adding a callback that adds an estimate of time when tasks have unknown durations then I suggest that we do that. I don't anticipate that it will have any complications with other systems and will require almost no additional maintenance.
Any update on this @mattilyra ?
I've been testing the above branch and it seems to be stable. I still need to write tests before opening a PR.
Also if you have some example workloads you'd like me to test let me know.
At the moment the callback to check_task_duration is set up in transition_waiting_processing, as that was the only place where I could reliably detect when a task enters the processing state and add the callback once and only once per task prefix.
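A minimal sketch of the "unknown durations" callback, modelled with asyncio's loop.call_later. ToyStealing and its methods are hypothetical names and this is not the code on the branch; it only shows the once-per-prefix scheduling and the fallback duration.

```python
import asyncio


class ToyStealing:
    """Hypothetical sketch of the unknown-durations callback (not dask code).

    When a task of an unseen prefix starts, schedule one check after `delay`
    seconds. If the duration is still unknown then, assume `default` seconds
    so that work stealing has an estimate to act on.
    """

    def __init__(self, delay=2.0, default=5.0):
        self.delay = delay
        self.default = default
        self.durations = {}        # prefix -> estimated seconds
        self.checks = set()        # prefixes with a pending check (once per prefix)
        self.rebalanced = []       # prefixes we "kicked off stealing" for

    def task_started(self, prefix):
        if prefix in self.durations or prefix in self.checks:
            return
        self.checks.add(prefix)
        asyncio.get_running_loop().call_later(self.delay, self._check, prefix)

    def task_finished(self, prefix, seconds):
        self.durations[prefix] = seconds

    def _check(self, prefix):
        self.checks.discard(prefix)
        if prefix not in self.durations:
            self.durations[prefix] = self.default
            self.rebalanced.append(prefix)   # stand-in for triggering work stealing


async def demo():
    s = ToyStealing(delay=0.05, default=5.0)
    for _ in range(3):
        s.task_started("train_model")     # only one check is scheduled
    s.task_started("quick")
    s.task_finished("quick", 0.01)        # finishes before its check fires
    await asyncio.sleep(0.1)
    return s


s = asyncio.run(demo())
print(s.durations)    # {'quick': 0.01, 'train_model': 5.0}
print(s.rebalanced)   # ['train_model']
```

Only prefixes that are still unknown when the timer fires get the default estimate, so short tasks that report back quickly are unaffected.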
I'm not sure that I have the same issue, but I thought I'd post here before opening a new issue.
Our situation is that we are adding new workers but they are not assigned any tasks.
Every task runs about 60 seconds and is completely independent from the others. So I would assume that they should be worth stealing.
The info panel shows the following:
Is there anything to be done to improve this situation?
I think I figured out my problem after taking a look at the code. I was using dask_key_name on my delayed functions. After removing that, the occupancy numbers and the task distribution are much more reasonable.
I find myself in a situation that I believe requires something along these lines, but I can open a new issue if desired.
My situation: I am running a pipeline, using a task graph assembled with delayed objects, that has some tasks that use OpenMP threading. To enforce acceptable behavior I have defined an arbitrary resource on the workers to prevent too many tasks from executing on any given worker. I am using dask-jobqueue, so it is a homogeneous pool of workers for now. I am seeing the same behavior mentioned above, i.e. if the cluster job scheduler starts a couple of workers before the others, they are given all the work and no load balancing occurs when the others come online. This may be aggravated by the fact that some of the earlier tasks in the pipeline can take more than an hour, so any computation of the communication ratio won't help me.
I am wondering what the best way to implement a solution to this is. I would be happy with a basic redistribution of work every minute (or 10) if something more comprehensive is not feasible. Would the resource definition on the workers prevent this from being implemented easily? I see that the scheduler's behavior can be modified by a plugin class... would this be the place to start? Or would the above-mentioned callback be the place to attack this problem? Is it testing that is preventing this issue from moving forward, or a desire for a more comprehensive solution?
Any help appreciated. Thanks.
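The "basic redistribution of work every minute" idea can be sketched as a pure-Python rebalance pass run on a timer. This is a toy under stated assumptions: only queued (not yet running) tasks are moved, every worker in the pool satisfies the tasks' resource restriction (as in a homogeneous dask-jobqueue pool), and rebalance / rebalance_periodically are hypothetical helpers, not a scheduler plugin API. Actually moving tasks in distributed would have to go through the scheduler's own stealing machinery.

```python
import asyncio


def rebalance(queued):
    """Hypothetical sketch: even out *queued* (not yet running) tasks across
    workers that all satisfy the tasks' resource restrictions.

    queued: dict worker -> list of tasks waiting on that worker (mutated in place)
    Returns the list of (task, source, destination) moves made.
    """
    moves = []
    if len(queued) < 2:
        return moves
    while True:
        src = max(queued, key=lambda w: len(queued[w]))
        dst = min(queued, key=lambda w: len(queued[w]))
        if len(queued[src]) - len(queued[dst]) <= 1:
            return moves
        task = queued[src].pop()          # take from the back of the busiest queue
        queued[dst].append(task)
        moves.append((task, src, dst))


async def rebalance_periodically(queued, interval, rounds):
    # e.g. interval=60 for "once a minute"; rounds bounds the loop for the demo.
    for _ in range(rounds):
        rebalance(queued)
        await asyncio.sleep(interval)


queued = {"w1": [f"task-{i}" for i in range(9)], "w2": [], "w3": []}
asyncio.run(rebalance_periodically(queued, interval=0.01, rounds=2))
print({w: len(t) for w, t in queued.items()})  # {'w1': 3, 'w2': 3, 'w3': 3}
```

Each move strictly reduces the imbalance, so the loop terminates; taking tasks from the back of the queue leaves the tasks a worker is about to start where they are.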
Hm, from what I understand here, adaptive seems to be broken when using the resources option in dask-worker?
@leej3 could you provide a minimal reproducible example?
Hi @guillaumeeb. Thanks for the help.
I believe the resources to be set appropriately using dask jobqueue following the "Providing additional arguments to the dask-workers" section of the docs. To start, here is a reproducible example with just LocalCluster though.
from dask.distributed import Client, LocalCluster
from dask import delayed
import time


@delayed
def do_work(task_time=0.5):
    import os
    import time
    import threading
    time.sleep(task_time)
    return (os.getpid(), threading.current_thread().ident)


def run_staggered_workers(worker_kwargs, task_time=10, stagger=True):
    """
    Run 10 tasks. With stagger set to true, the worker pool is scaled up after
    the task graph is sent to the scheduler for execution.
    """
    n_workers = worker_kwargs.pop('n_workers')
    if stagger:
        cluster = LocalCluster(**worker_kwargs, n_workers=1)
    else:
        cluster = LocalCluster(**worker_kwargs, n_workers=n_workers)
    client = Client(cluster)
    # Pause after initial worker startup.
    time.sleep(1)
    # Run the tasks
    tasks = [do_work(task_time=task_time) for x in range(10)]
    result = client.compute(tasks, resources=worker_kwargs['resources'])
    if stagger:
        # Scale up worker pool
        cluster.scale_up(n_workers)
    output = client.gather(result)
    client.close()
    return output
With no resources specified
With a task time of 2 seconds and n_workers = n_tasks, I expect the job to finish after 2 or 3 task times, because a communication ratio must be computed by the scheduler. This appears to be true (with some extra time for overhead):
start = time.time()
worker_kwargs = {'processes' : True,
'threads_per_worker' : 1,
'n_workers' : 10,
'resources' : None}
output = run_staggered_workers(worker_kwargs, task_time = 1)
end = time.time()
print(end - start, "seconds")
4.981343030929565 seconds
# The process and thread ids used demonstrate that multiple workers executed the tasks:
output
[(96575, 123145573806080),
(96578, 123145482354688),
(96387, 123145362042880),
(96579, 123145490063360),
(96581, 123145560764416),
(96573, 123145329250304),
(96572, 123145478262784),
(96576, 123145584574464),
(96580, 123145451167744),
(96387, 123145362042880)]
With resources specified
When resources are specified for the jobs, job redistribution does not occur:
start = time.time()
worker_kwargs = {'processes' : True,
'threads_per_worker' : 1,
'n_workers' : 10,
'resources' : {'foo' : 1}}
output = run_staggered_workers(worker_kwargs, task_time = 1)
end = time.time()
print(end - start, "seconds")
11.634655237197876 seconds
The process and thread ids used demonstrate that only one worker executed all the tasks:
output
[(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016),
(96796, 123145410646016)]
Thanks a lot for the stand-alone snippet @leej3! I can reproduce the behaviour.
Reading @leej3's post more closely, it is not using .adapt, so maybe this is a slightly different problem...
Sorry, I missed the .scale in @leej3's post, so I guess this is the same problem as with .adapt.
Here is a slightly simpler snippet to reproduce the problem:
import time
import os
import threading
import pprint
import webbrowser
from dask.distributed import Client, LocalCluster
def do_work(task_time=0.5):
    time.sleep(task_time)
    return (os.getpid(), threading.current_thread().ident)
cluster = LocalCluster(n_workers=1, threads_per_worker=3, resources={'foo': 1})
client = Client(cluster)
dashboard_port = client.scheduler_info()['services']['bokeh']
print('dashboard:', dashboard_port)
# uncomment next two lines if you want to open the dashboard. sleep 5s to give
# time to the tab to load
# webbrowser.open(f'http://localhost:{dashboard_port}/status')
# time.sleep(5)
t0 = time.time()
futures = [client.submit(do_work, pure=False, resources={'foo': 1})
           for i in range(20)]
cluster.scale(2)
output = client.gather(futures)
pprint.pprint(output)
print(time.time() - t0)
Output:
[(11660, 139795962357504),
(11660, 139795962357504),
(11660, 139795481229056),
(11660, 139795962357504),
(11660, 139795481229056),
(11660, 139795472836352),
(11660, 139795481229056),
(11660, 139795962357504),
(11660, 139795481229056),
(11660, 139795472836352),
(11660, 139795481229056),
(11660, 139795962357504),
(11660, 139795481229056),
(11660, 139795472836352),
(11660, 139795481229056),
(11660, 139795962357504),
(11660, 139795481229056),
(11660, 139795472836352),
(11660, 139795481229056),
(11660, 139795962357504)]
10.067777633666992
Summary of the issue:
- submit some tasks using resources
- scale up your cluster
- workers that are added after the tasks are submitted are never used to run any of them (the bug only occurs when using resources)
Thanks. A nice minimization too. Based on previous comments of this issue it seems like it will be tricky to modify the scheduler to work in a desirable manner for this case without adding unacceptable overhead to the default case where resources are not specified. At least we now have a nice and simple test for it though.
I guess the first thing to do would be to look at @mattilyra's branch https://github.com/mattilyra/distributed/commits/resource_accounting_for_valid_workers.
This branch does not work with recent dask because dask.set_option has been removed. I tried fixing this but got another error:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/protocol/core.py", line 31, in dumps
small_header, small_payload = dumps_msgpack(msg)
File "/home/local/lesteve/dev/distributed/distributed/protocol/core.py", line 142, in dumps_msgpack
fmt, payload = maybe_compress(payload)
File "/home/local/lesteve/dev/distributed/distributed/protocol/compression.py", line 155, in maybe_compress
compress = compressions[compression]['compress']
KeyError: 'auto'
I ended up rebasing @mattilyra's branch on top of master and got errors there too (see below), so this needs to be looked at in more detail. Of course @mattilyra, if you have some spare bandwidth it would be great if you could take a closer look, since it feels like you weren't far from having something working:
distributed.scheduler - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
distributed.scheduler - ERROR - Error transitioning 'do_work-2b9f112f-723c-4975-becd-ff96436d3a3d' from 'waiting' to 'processing'
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
distributed.core - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
handler(**merge(extra, msg))
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
self.transitions(recommendations)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
new = self.transition(key, finish)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 1>
distributed.scheduler - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
yield self.handle_stream(comm=comm, extra={'client': client})
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
handler(**merge(extra, msg))
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
self.transitions(recommendations)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
new = self.transition(key, finish)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
'key': key})
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
self.release_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
distributed.scheduler - ERROR - Error transitioning 'do_work-2b9f112f-723c-4975-becd-ff96436d3a3d' from 'processing' to 'released'
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
yield self.handle_stream(comm=comm, extra={'client': client})
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
handler(**merge(extra, msg))
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
self.transitions(recommendations)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
new = self.transition(key, finish)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
'key': key})
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
self.release_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
distributed.scheduler - ERROR - Error transitioning 'do_work-2b9f112f-723c-4975-becd-ff96436d3a3d' from 'processing' to 'forgotten'
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
yield self.handle_stream(comm=comm, extra={'client': client})
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
handler(**merge(extra, msg))
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
self.transitions(recommendations)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
new = self.transition(key, finish)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3936, in transition
a = self.transition(key, 'released')
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
'key': key})
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
self.release_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
distributed.core - ERROR - <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2041, in add_client
yield self.handle_stream(comm=comm, extra={'client': client})
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/home/local/lesteve/dev/distributed/distributed/core.py", line 400, in handle_stream
handler(**merge(extra, msg))
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1576, in update_graph
self.transitions(recommendations)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
new = self.transition(key, finish)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3352, in transition_waiting_processing
self.consume_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4102, in consume_resources
self.resources[r][ws] -= required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/local/lesteve/dev/distributed/distributed/core.py", line 346, in handle_comm
result = yield result
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2043, in add_client
self.remove_client(client=client)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 2070, in remove_client
client=cs.client_key)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 1856, in client_releases_keys
self.transitions(recommendations)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3991, in transitions
new = self.transition(key, finish)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3936, in transition
a = self.transition(key, 'released')
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3932, in transition
recommendations = func(key, *args, **kwargs)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3661, in transition_processing_released
'key': key})
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 3151, in _remove_from_processing
self.release_resources(ts, ws)
File "/home/local/lesteve/dev/distributed/distributed/scheduler.py", line 4108, in release_resources
self.resources[r][ws] += required
KeyError: <Worker 'tcp://127.0.0.1:46035', memory: 0, processing: 0>
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/home/local/lesteve/dev/distributed/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1485 try:
-> 1486 st = self.futures[key]
1487 exception = st.exception
KeyError: 'do_work-9d266e2f-1e97-465c-9ba4-d6374886950f'
During handling of the above exception, another exception occurred:
CancelledError Traceback (most recent call last)
<ipython-input-1-a86c453096d1> in <module>()
25 for i in range(20)]
26 cluster.scale(2)
---> 27 output = client.gather(futures)
28 pprint.pprint(output)
29 print(time.time() - t0)
/home/local/lesteve/dev/distributed/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1653 return self.sync(self._gather, futures, errors=errors,
1654 direct=direct, local_worker=local_worker,
-> 1655 asynchronous=asynchronous)
1656
1657 @gen.coroutine
/home/local/lesteve/dev/distributed/distributed/client.py in sync(self, func, *args, **kwargs)
673 return future
674 else:
--> 675 return sync(self.loop, func, *args, **kwargs)
676
677 def __repr__(self):
/home/local/lesteve/dev/distributed/distributed/utils.py in sync(loop, func, *args, **kwargs)
275 e.wait(10)
276 if error[0]:
--> 277 six.reraise(*error[0])
278 else:
279 return result[0]
/home/local/lesteve/miniconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/home/local/lesteve/dev/distributed/distributed/utils.py in f()
260 if timeout is not None:
261 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262 result[0] = yield future
263 except Exception as exc:
264 error[0] = sys.exc_info()
/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True
/home/local/lesteve/miniconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself
/home/local/lesteve/dev/distributed/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1490 six.reraise(CancelledError,
1491 CancelledError(key),
-> 1492 None)
1493 else:
1494 six.reraise(type(exception),
/home/local/lesteve/miniconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
CancelledError: do_work-9d266e2f-1e97-465c-9ba4-d6374886950f
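Every scheduler error above fails on the same expression, self.resources[r][ws]. The KeyError names the worker, which means the per-resource mapping has no entry for that worker when the task moves into or out of processing. The toy model below only illustrates the shape of that failure.

Several details are assumptions: the dictionary layout, the helper names (add_worker_resources, consume_resources_guarded), and the worker names. The traceback also does not establish why the real entry is missing. The guarded variant hides the symptom rather than fixing whatever left the bookkeeping inconsistent.

```python
from collections import defaultdict
from typing import Dict, Hashable, Mapping

# Toy model (assumption): resources[name][worker] -> amount of `name` still free
# on `worker`. The scheduler's real structure may differ in detail.
resources: Dict[str, Dict[Hashable, float]] = defaultdict(dict)


def add_worker_resources(worker: Hashable, worker_resources: Mapping[str, float]) -> None:
    # Hypothetical helper: register what a worker advertises.
    for name, quantity in worker_resources.items():
        resources[name][worker] = quantity


def consume_resources(restrictions: Mapping[str, float], worker: Hashable) -> None:
    # Same pattern as the failing line: raises KeyError if `worker` has no entry.
    for name, required in restrictions.items():
        resources[name][worker] -= required


def consume_resources_guarded(restrictions: Mapping[str, float], worker: Hashable) -> bool:
    # Hypothetical defensive variant: report failure instead of raising,
    # and only subtract once every required entry is known to exist.
    if any(worker not in resources.get(name, {}) for name in restrictions):
        return False
    for name, required in restrictions.items():
        resources[name][worker] -= required
    return True


add_worker_resources("worker-a", {"GPU": 2})
consume_resources({"GPU": 1}, "worker-a")  # fine: 2 -> 1
try:
    consume_resources({"GPU": 1}, "worker-b")  # no entry for this worker
except KeyError as exc:
    print("KeyError:", exc)
```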
I'm seeing similar behavior with my workload: all tasks go to a single worker because the other workers haven't come online yet. This thread is more than a year old, so I'm not sure what has changed since it started. Anyway, here are my findings on why work stealing does not work properly with resources.
From what I can tell, the stealing logic is implemented in stealing.py as a SchedulerPlugin. Every time a worker is added, state is updated to keep track of which tasks can be stolen from it; in particular, this is the stealable instance attribute. Every time a task transitions, the plugin checks whether the task is transitioning to "processing" and, if so, calls put_key_in_stealable, passing in the task state.
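The following is a stripped-down model of the hook described above. It assumes a single set of stealable keys per worker, whereas the real stealable attribute is further bucketed by cost level. It also assumes a transition(key, start, finish) method in the style of a scheduler plugin. ToyStealingPlugin and ToyTaskState are hypothetical names, and the removal branch is an assumption added to keep the toy consistent.

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class ToyTaskState:
    key: str
    processing_on: str  # worker address (the real TaskState holds a WorkerState)


class ToyStealingPlugin:
    """Simplified stand-in for the work-stealing plugin, not its real code."""

    def __init__(self) -> None:
        self.tasks: Dict[str, ToyTaskState] = {}
        # worker address -> keys that could be stolen from that worker
        self.stealable: Dict[str, Set[str]] = {}

    def add_worker(self, worker: str) -> None:
        # Each newly added worker starts with nothing stealable.
        self.stealable[worker] = set()

    def transition(self, key: str, start: str, finish: str) -> None:
        # Called for every task transition, like a scheduler-plugin hook.
        ts = self.tasks[key]
        if finish == "processing":
            self.put_key_in_stealable(ts)
        elif start == "processing":
            # Assumption: tasks leaving "processing" stop being stealable.
            self.remove_key_from_stealable(ts)

    def put_key_in_stealable(self, ts: ToyTaskState) -> None:
        self.stealable[ts.processing_on].add(ts.key)

    def remove_key_from_stealable(self, ts: ToyTaskState) -> None:
        for keys in self.stealable.values():
            keys.discard(ts.key)


plugin = ToyStealingPlugin()
plugin.add_worker("worker-a")
plugin.tasks["x"] = ToyTaskState("x", processing_on="worker-a")
plugin.transition("x", "waiting", "processing")
print(plugin.stealable)  # {'worker-a': {'x'}}
```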
Inside put_key_in_stealable, the cost of moving the task is computed by steal_time_ratio. The first check in steal_time_ratio is whether the task has hard restrictions (loose_restrictions is false) and whether any host, worker, or resource restrictions are set:
if not ts.loose_restrictions and (
    ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
):
    return None, None  # don't steal
Back in put_key_in_stealable, nothing happens if the returned cost is None:
def put_key_in_stealable(self, ts):
    ws = ts.processing_on
    worker = ws.address
    cost_multiplier, level = self.steal_time_ratio(ts)
    self.log.append(("add-stealable", ts.key, worker, level))
    if cost_multiplier is not None:
        self.stealable_all[level].add(ts)
        self.stealable[worker][level].add(ts)
        self.key_stealable[ts] = (worker, level)
This would seem to explain why no work stealing occurs when tasks are marked with resources. Any comments on the analysis here?
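The two quoted pieces can be combined into a runnable toy that shows the consequence. A task that carries only a resource restriction still hits the early exit and gets logged, but it never lands in stealable_all, stealable, or key_stealable.

The following are assumptions: the class names, the use of a plain address string for processing_on, the level count of 15, and the placeholder return (1.0, 0). The real steal_time_ratio goes on to compute an actual cost multiplier and level, which is omitted here.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple

N_LEVELS = 15  # assumption: number of cost levels used for bucketing


@dataclass(eq=False)  # eq=False keeps identity hashing, so tasks can live in sets
class ToyTaskState:
    key: str
    processing_on: str  # worker address (the real code reads ws.address)
    loose_restrictions: bool = False
    host_restrictions: Set[str] = field(default_factory=set)
    worker_restrictions: Set[str] = field(default_factory=set)
    resource_restrictions: Dict[str, float] = field(default_factory=dict)


class ToyStealing:
    def __init__(self) -> None:
        self.stealable_all: List[Set[ToyTaskState]] = [set() for _ in range(N_LEVELS)]
        self.stealable: Dict[str, List[Set[ToyTaskState]]] = defaultdict(
            lambda: [set() for _ in range(N_LEVELS)]
        )
        self.key_stealable: Dict[ToyTaskState, Tuple[str, int]] = {}
        self.log: list = []

    def steal_time_ratio(self, ts: ToyTaskState) -> Tuple[Optional[float], Optional[int]]:
        # Same early exit as quoted above; the real cost computation is omitted.
        if not ts.loose_restrictions and (
            ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
        ):
            return None, None  # don't steal
        return 1.0, 0  # placeholder cost multiplier and level

    def put_key_in_stealable(self, ts: ToyTaskState) -> None:
        worker = ts.processing_on
        cost_multiplier, level = self.steal_time_ratio(ts)
        self.log.append(("add-stealable", ts.key, worker, level))
        if cost_multiplier is not None:
            self.stealable_all[level].add(ts)
            self.stealable[worker][level].add(ts)
            self.key_stealable[ts] = (worker, level)


stealing = ToyStealing()
gpu_task = ToyTaskState("gpu-task", "worker-a", resource_restrictions={"GPU": 1})
cpu_task = ToyTaskState("cpu-task", "worker-a")
for ts in (gpu_task, cpu_task):
    stealing.put_key_in_stealable(ts)

print([t.key for t in stealing.key_stealable])  # ['cpu-task']
```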
Moving on to a possible fix, I think it makes sense to remove the restrictions check mentioned above and add some checks before the call to maybe_move_task in balance to ensure that the thief has the resources required to steal the task. Thoughts?
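Here is one reading of the proposal, sketched with hypothetical helpers:

- The early exit keeps treating host and worker restrictions as hard.
- Resource restrictions alone no longer make a task unstealable.
- balance would call something like thief_has_resources before maybe_move_task.

The available[resource][worker] layout and the choice to compare against free (rather than total) resources on the thief are assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, Mapping, Set


@dataclass
class ToyTaskState:
    key: str
    loose_restrictions: bool = False
    host_restrictions: Set[str] = field(default_factory=set)
    worker_restrictions: Set[str] = field(default_factory=set)
    resource_restrictions: Dict[str, float] = field(default_factory=dict)


def blocks_stealing(ts: ToyTaskState) -> bool:
    # Proposed variant of the early exit: resource restrictions no longer
    # make a task unstealable on their own; host/worker restrictions still do.
    return not ts.loose_restrictions and bool(
        ts.host_restrictions or ts.worker_restrictions
    )


def thief_has_resources(
    ts: ToyTaskState,
    available: Mapping[str, Mapping[str, float]],
    thief: str,
) -> bool:
    # Hypothetical gate to run in balance before maybe_move_task:
    # available[resource][worker] -> amount currently free on that worker.
    return all(
        available.get(name, {}).get(thief, 0) >= required
        for name, required in ts.resource_restrictions.items()
    )


available = {"GPU": {"worker-a": 0, "worker-b": 1}}
task = ToyTaskState("gpu-task", resource_restrictions={"GPU": 1})
print(blocks_stealing(task))                             # False
print(thief_has_resources(task, available, "worker-b"))  # True
print(thief_has_resources(task, available, "worker-c"))  # False
```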
> Any comments on the analysis here?
That all tracks.
> add some checks before the call to maybe_move_task in balance to ensure that the thief has the resources required to steal the task.
That also makes sense, but the resource restrictions on workers seem to complicate things quite a bit. Though perhaps not... perhaps we can just continue to treat things normally and only check the resource restriction at the last moment? That won't be ideal, since we won't prioritize workers that satisfy the resource restriction as candidates to steal, but it shouldn't be any worse...
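The "only check at the last moment" idea can be sketched as follows. Candidate thieves are ranked exactly as they would be without resources: here simply by a hypothetical occupancy mapping, least busy first. The resource check runs only when a thief is about to be chosen. pick_thief and its arguments are illustrative, not the real balance logic.

```python
from typing import Iterable, Mapping, Optional


def pick_thief(
    required: Mapping[str, float],
    thieves: Iterable[str],
    occupancy: Mapping[str, float],
    available: Mapping[str, Mapping[str, float]],
) -> Optional[str]:
    """Hypothetical thief selection that checks resources only at the last moment."""
    # Rank candidates exactly as without resources (least occupied first)...
    for thief in sorted(thieves, key=lambda w: occupancy[w]):
        # ...and only now reject thieves that cannot satisfy the task.
        if all(
            available.get(name, {}).get(thief, 0) >= amount
            for name, amount in required.items()
        ):
            return thief
    return None  # no suitable thief: leave the task where it is


occupancy = {"worker-a": 0.0, "worker-b": 2.5, "worker-c": 7.0}
available = {"GPU": {"worker-b": 1, "worker-c": 1}}
print(pick_thief({"GPU": 1}, list(occupancy), occupancy, available))  # worker-b
print(pick_thief({}, list(occupancy), occupancy, available))          # worker-a
```

Because the candidate list is built without regard to resources, a worker that has the resource but is not among the candidates is never considered. That is the trade-off described in the comment: no worse than skipping resource-restricted tasks entirely, but not prioritizing suitable workers either.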