Add thread interrupt logic
- [x] Closes #4694
- [x] Tests added / passed
- [x] Passes `black distributed` / `flake8 distributed` / `isort distributed`
This shows the basic mechanism working. The following script runs successfully:
```python
import dask.distributed
import time
import threading

client = dask.distributed.Client(processes=False, n_workers=1, threads_per_worker=1)
client.wait_for_workers(1)

def runme(N):
    for i in range(N):
        time.sleep(1)

fut = client.submit(runme, 10000000)
time.sleep(0.01)  # make sure fut is allocated first
fut0 = client.submit(lambda x: x, True)
fut.cancel()
fut0.result()
```
As coded, this happens whenever an executing key is released, for whatever reason, and there is no config option to control it. Also, if the task in question catches exceptions, it will keep running.
(please ignore print statements)
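To make that second point concrete, a task written along these lines (an illustrative example, not code from this PR) would catch the injected interrupt and keep looping:

```python
import time

def stubborn(N):
    # Illustrative task: the blanket `except` catches whatever exception the
    # worker injects into the thread, so releasing/cancelling the future does
    # not actually stop the loop.
    for i in range(N):
        try:
            time.sleep(1)
        except BaseException:
            continue
```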
This is cool to see. I know this is still in draft status, and I'm sure this is in your plan, but I would encourage you to write tests demonstrating that some of the concerns raised in the issue are handled here, for example that `finally` blocks are respected.
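For reference, such a test might look roughly like the sketch below. It assumes the interrupt is active for the worker in question (the config toggle discussed later defaults to off) and uses a marker file as a side channel; this is illustrative only, not one of the tests actually added in this PR:

```python
import os
import time

import dask.distributed


def test_interrupt_respects_finally(tmp_path):
    marker = str(tmp_path / "cleaned-up")

    def long_task(path):
        try:
            # Short sleeps: an asynchronously raised exception is only
            # delivered while Python bytecode is running, not inside one
            # long C-level sleep.
            for _ in range(100_000):
                time.sleep(0.05)
        finally:
            with open(path, "w") as f:
                f.write("done")

    with dask.distributed.Client(
        processes=False, n_workers=1, threads_per_worker=1
    ) as client:
        fut = client.submit(long_task, marker)
        time.sleep(0.5)  # give the task time to start executing
        fut.cancel()
        deadline = time.time() + 10
        while not os.path.exists(marker) and time.time() < deadline:
            time.sleep(0.1)
        # The finally block should have run even though the task was interrupted.
        assert os.path.exists(marker)
```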
@mrocklin: first pass at a pair of tests, but we will still need to be very careful and thorough here. BTW: it took me ages to realise that `fut.cancel()` is a coroutine needing `await`.
I would still like some feedback here on whether this is a good idea for the general case, or should be behind a config option, or only exposed via an explicit client call. The latter is hard, since the user may not have a handle to the particular task that is hanging (or just taking longer than expected).
(The Windows error is in `test_str`, not obviously related.)
cc @crusaderky
FWIW, I would be comfortable enough with calling the C API directly for this to be merged. The API appears to be stable since Python 3.7 (https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc). If we add more tests and document this behaviour, I'd be fine with it. In terms of complexity, this is also justified considering the benefits. If this were to cause trouble down the road, we could think about toggles, but for now I'd like to avoid adding more options.
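For context, that C API is reachable from pure Python via `ctypes`. Below is a standalone sketch of the mechanism (illustrative names like `ThreadInterrupt` and `async_raise`, not code from this PR), which also shows that a `finally` block still runs when the thread is interrupted:

```python
import ctypes
import threading
import time


class ThreadInterrupt(BaseException):
    """Illustrative exception type to inject into the target thread."""


def async_raise(thread_ident, exc_type):
    # Ask CPython to raise exc_type asynchronously in the thread with the
    # given ident. The call returns the number of thread states modified.
    modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_ident), ctypes.py_object(exc_type)
    )
    if modified > 1:
        # More than one thread state was touched; clear the pending
        # exception rather than interrupt the wrong thread.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread_ident), None)
        raise SystemError("PyThreadState_SetAsyncExc affected multiple threads")


def worker():
    try:
        while True:
            # The injected exception is only delivered between bytecodes, so
            # the loop must keep returning to Python code for it to fire.
            time.sleep(0.1)
    finally:
        print("finally block still runs")


t = threading.Thread(target=worker)
t.start()
time.sleep(0.5)
async_raise(t.ident, ThreadInterrupt)
t.join()
```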
I'm wondering, would this be something we could contribute to CPython directly? I understand that we needed to vendor the threadpool for the seceding feature, but every modification increases maintenance cost (or the risk of it).
Hello 👋 I stumbled upon this issue/PR while working with Prefect. (See https://github.com/PrefectHQ/prefect/issues/5043). I've also tested the proposed PR and it seems to fix this issue, great work @martindurant 🙌
@martindurant : Do you have the intention to finish this PR and make it part of distributed?
Provisionally ready for review. We need to decide whether this should be the default or not, and how to describe it in the config and docs.
I have made the default False for now (no interrupt) and added the key to the config schema. It is pretty hidden! We could add this to the docs somewhere, or soft-launch it by suggesting some users try it when facing long-running released tasks.
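Enabling it would presumably look something like the snippet below; the key name shown here is a placeholder for illustration, not necessarily the one added to the schema in this PR:

```python
import dask

# Placeholder key name -- check the config schema added by this PR for the
# actual one. The interrupt behaviour is off by default.
dask.config.set({"distributed.worker.interrupt-released-tasks": True})
```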
Anything left here?
ping?
My final concern here is that we're no longer using the stdlib ThreadPoolExecutor interface but instead require an extended interface, which breaks some assumptions about our compatibility; see
https://github.com/dask/distributed/blob/96d4fd43682c379f9b39b6dc55f568872b766a47/distributed/worker.py#L3433-L3445
Discussion around this https://github.com/dask/distributed/pull/5063/files#r670260505
FWIW, we're not consistent with this and I believe we should require a strict isinstance check, e.g. https://github.com/dask/distributed/blob/96d4fd43682c379f9b39b6dc55f568872b766a47/distributed/worker.py#L1610-L1613
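For illustration, a strict check might look like the sketch below, checking against distributed's vendored `ThreadPoolExecutor` (which carries the extended interface) rather than duck typing; `validate_executor` is a hypothetical helper name, not the worker's actual code:

```python
from distributed.threadpoolexecutor import ThreadPoolExecutor


def validate_executor(executor):
    # Reject duck-typed executors outright instead of probing for the
    # extended secede/interrupt interface at call time.
    if not isinstance(executor, ThreadPoolExecutor):
        raise TypeError(
            "Worker executors must be instances of "
            "distributed.threadpoolexecutor.ThreadPoolExecutor, "
            f"got {type(executor)!r}"
        )
    return executor
```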