
Incompatibility between allow_other_workers and resources

Open fjetter opened this issue 4 years ago • 2 comments

Using several annotations at once to specify where a task should end up creates certain incompatibilities, in particular with resources. The test below causes a KeyError since the task is transitioned prematurely to a waiting state.

(What I actually wanted to express in this test was "never execute on B", which is why I chose this construction, but that is not the point here.)

I believe there is a semantic misalignment here, since resources are actually treated as a hard requirement regardless of whether allow_other_workers is set. I believe this is sane behaviour, since otherwise we cannot really tell whether we should or should not subtract anything. I'm wondering whether the meaning of the allow_other_workers flag should be redefined. For some of the fields it may make sense (e.g. workers), but for things like host restrictions or resource restrictions this doesn't feel sensible to me.

The exception below should be "easily" fixable, but the intended behaviour is unclear to me.


Looking at our own documentation, it is already clear that there is some misalignment. I would argue that the definition of allow_other_workers makes sense, but the loose_restrictions label is somewhat ill-defined, or at the very least not in alignment with the flag.

See https://github.com/dask/distributed/blob/27e59258c79fa362df7d088ac73067e767b806a3/distributed/client.py#L1520-L1522

https://github.com/dask/distributed/blob/27e59258c79fa362df7d088ac73067e767b806a3/distributed/scheduler.py#L1240-L1251


```
Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 2378, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 3148, in consume_resources
    ws._used_resources[r] += required
KeyError: 'A'
distributed.scheduler - ERROR - Error transitioning 'inc-aa226ae3a9f799819e1e685fba467442' from 'waiting' to 'processing'
Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 2035, in _transition
    a: tuple = func(key, *args, **kwargs)
  File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 2378, in transition_waiting_processing
    self.consume_resources(ts, ws)
  File "/Users/fjetter/workspace/distributed-main/distributed/scheduler.py", line 3148, in consume_resources
    ws._used_resources[r] += required
KeyError: 'A'
distributed.utils - ERROR - 'A'
```
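To make the failure mode concrete without running a cluster, here is a minimal, self-contained model of the scheduler's resource accounting (simplified and hypothetical; only the failing line `ws._used_resources[r] += required` mirrors the traceback, the rest is illustrative):

```python
# Simplified, hypothetical model of resource accounting; not distributed's
# actual classes. The KeyError occurs when a task with a resource
# restriction lands on a worker that never advertised that resource.

class WorkerState:
    def __init__(self, resources=None):
        # Total resources the worker advertises, e.g. {"A": 1}
        self.resources = dict(resources or {})
        # Running tally of resources consumed by tasks on this worker
        self.used_resources = dict.fromkeys(self.resources, 0)

def consume_resources(task_resources, ws):
    """Mirrors the failing line: ws._used_resources[r] += required."""
    for r, required in task_resources.items():
        ws.used_resources[r] += required  # KeyError if ws lacks resource r

def consume_resources_guarded(task_resources, ws):
    """A defensive variant that surfaces a clear error instead."""
    missing = set(task_resources) - set(ws.used_resources)
    if missing:
        raise ValueError(f"Worker lacks required resources: {sorted(missing)}")
    for r, required in task_resources.items():
        ws.used_resources[r] += required

# A worker without resource "A" (like worker b in the test below):
try:
    consume_resources({"A": 1}, WorkerState())
except KeyError as e:
    print("KeyError:", e)  # → KeyError: 'A'
```

The guard only makes the error legible; the open question in this issue is whether the scheduler should have been allowed to pick such a worker in the first place.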

```python
from operator import add

from distributed import Worker
from distributed.utils_test import gen_cluster, inc


@gen_cluster(
    client=True, nthreads=[("127.0.0.1", 1)], worker_kwargs={"resources": {"A": 1}}
)
async def test_no_worker_recovers(c, s, a):
    s.periodic_callbacks["stealing"].stop()
    b = await Worker(s.address, name="b")

    f = c.submit(
        inc, 1, workers=[a.address], allow_other_workers=True, resources={"A": 1}
    )
    g = c.submit(
        inc, 2, resources={"A": 1}, workers=[a.address], allow_other_workers=True
    )

    await f
    await g
    assert f.key in a.tasks
    assert g.key in a.tasks
    assert f.key != g.key
    h = c.submit(add, f, g, workers=[b.address])
    await a.close()
    x = await Worker(s.address, resources={"A": 1}, name="x")

    res = await h
    assert res == 5
```

fjetter avatar May 07 '21 08:05 fjetter

xrefs to other issues relating to scheduling tasks with resources:

  • #6136
  • #6137

crusaderky avatar Apr 15 '22 13:04 crusaderky

I've recently run into this as well, specifically in the context of work stealing from workers with restrictions, where the tasks had been scheduled on a particular worker with allow_other_workers=True. From the docs, I would have assumed that tasks may get stolen, but only onto workers fulfilling the resource restrictions.
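The expectation described above can be sketched as a predicate. This is purely illustrative, with hypothetical names rather than distributed's internal API: worker pinning is relaxed by allow_other_workers, while resource restrictions stay hard.

```python
# Hypothetical predicate for whether a worker is a valid steal target,
# under the reading that allow_other_workers only loosens worker
# pinning, never resource restrictions.

def valid_steal_target(
    task_resources,       # e.g. {"A": 1}
    pinned_workers,       # e.g. {"tcp://a"}
    allow_other_workers,  # the flag under discussion
    candidate,            # address of the potential thief
    candidate_resources,  # resources the candidate advertises
):
    if candidate not in pinned_workers and not allow_other_workers:
        return False  # worker pinning is hard without the flag
    # Resource restrictions remain hard either way
    return all(
        candidate_resources.get(r, 0) >= amount
        for r, amount in task_resources.items()
    )

# A worker without resource "A" is never a valid target, flag or not:
print(valid_steal_target({"A": 1}, {"tcp://a"}, True, "tcp://b", {}))          # → False
# A worker that advertises "A" is valid once allow_other_workers is set:
print(valid_steal_target({"A": 1}, {"tcp://a"}, True, "tcp://x", {"A": 1}))    # → True
```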

hendrikmakait avatar Sep 12 '22 13:09 hendrikmakait