
Add thread interrupt logic

Open martindurant opened this issue 4 years ago • 12 comments

  • [X] Closes #4694
  • [x] Tests added / passed
  • [x] Passes black distributed / flake8 distributed / isort distributed

martindurant avatar Apr 21 '21 14:04 martindurant

This shows the basic working. The following script runs successfully:

import time

import dask.distributed

# One in-process worker with a single thread, so a hung task blocks everything queued behind it
client = dask.distributed.Client(processes=False, n_workers=1, threads_per_worker=1)
client.wait_for_workers(1)

def runme(N):
    for i in range(N):
        time.sleep(1)

fut = client.submit(runme, 10000000)
time.sleep(0.01)  # make sure fut is allocated first
fut0 = client.submit(lambda x: x, True)

fut.cancel()  # releases the key while runme is still executing
fut0.result()  # returns only once the worker thread has been freed

As coded, this happens whenever an executing key is released for whatever reason, and there is no config for it. Also, if the task in question captures exceptions, it will keep running.

(please ignore print statements)
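
The caveat about tasks that capture exceptions can be illustrated outside of distributed. The following is a minimal sketch, not the code in this PR: it assumes the interrupt is delivered with CPython's PyThreadState_SetAsyncExc (called through ctypes), and the names TaskInterrupt, interrupt_thread, plain_task, swallowing_task and run_and_interrupt are hypothetical, introduced only for illustration.

```python
import ctypes
import threading
import time


class TaskInterrupt(Exception):
    """Hypothetical exception type, named here only for illustration."""


def interrupt_thread(thread_id):
    """Schedule TaskInterrupt to be raised in the thread with this ident."""
    modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(TaskInterrupt)
    )
    return modified == 1  # the call returns the number of thread states modified


def plain_task(started):
    started.set()
    while True:  # stands in for a long-running task
        time.sleep(0.01)


def swallowing_task(started):
    try:
        started.set()
        while True:
            time.sleep(0.01)
    except Exception:
        pass  # the interrupt is caught here and discarded
    return "kept running"


def run_and_interrupt(task):
    """Run task in a thread, interrupt it once it has started, report what happened."""
    started = threading.Event()
    outcome = []

    def target():
        try:
            outcome.append(task(started))
        except TaskInterrupt:
            outcome.append("interrupted")

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    started.wait(5)
    interrupt_thread(worker.ident)
    worker.join(5)
    return outcome[0] if outcome else None
```

Under these assumptions, run_and_interrupt(plain_task) reports that the task was interrupted, while run_and_interrupt(swallowing_task) shows the task carrying on after the exception was caught. The exception is only raised once the target thread returns to executing Python bytecode, so a task blocked in a long C call would not react until that call returns.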

martindurant avatar Apr 21 '21 14:04 martindurant

This is cool to see. I know that this is still just a draft status, and I'm sure that this is in your plan, but I would encourage you to write down tests to demonstrate that some of the concerns raised in the issue are handled by this, for example that finally blocks are respected and such.
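
One way such a test for finally blocks could look, as a standalone sketch rather than a test from this PR: it assumes the interrupt is an exception injected with PyThreadState_SetAsyncExc via ctypes, and TaskInterrupt, task_with_cleanup and check_finally_runs are hypothetical names.

```python
import ctypes
import threading
import time


class TaskInterrupt(Exception):
    """Hypothetical exception type used only in this sketch."""


def task_with_cleanup(started, log):
    try:
        started.set()
        while True:  # stands in for a long-running task
            time.sleep(0.01)
    finally:
        log.append("cleanup")  # must run even when the task is interrupted


def check_finally_runs():
    """Interrupt the task once it is inside its try block and return the event log."""
    started = threading.Event()
    log = []

    def target():
        try:
            task_with_cleanup(started, log)
        except TaskInterrupt:
            log.append("interrupted")

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    started.wait(5)
    # Ask CPython to raise TaskInterrupt in the worker thread
    ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(worker.ident), ctypes.py_object(TaskInterrupt)
    )
    worker.join(5)
    return log
```

The cleanup entry is recorded before the exception reaches the caller, which is the ordering a real test would want to assert. A second interrupt arriving while the finally block is still running could cut the cleanup short, a case this sketch does not cover.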

mrocklin avatar Apr 21 '21 15:04 mrocklin

@mrocklin : first pass at a pair of tests, but we will still need to be very careful and thorough here. btw: it took me ages to realise that fut.cancel() is a coroutine needing await.

I would still like some feedback here on whether this is a good idea for the general case, or should be a config, or only exposed via some explicit client call. The latter is hard, since the user may not have a handle to the particular task that is hanging (or just taking longer than expected).

martindurant avatar Apr 23 '21 19:04 martindurant

(windows error is in test_str, not obviously related)

martindurant avatar Apr 23 '21 20:04 martindurant

cc @crusaderky

jrbourbeau avatar Apr 27 '21 15:04 jrbourbeau

FWIW, I would be comfortable enough with calling the C API directly for this to be merged. The API seems to be stable since py3.7 (https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc). If we add more tests and document this behaviour, I'd be fine with it. In terms of complexity, this is also justified considering the benefits. If this were to cause trouble down the road, we could think about toggles, but for now I'd like to avoid more options.

I'm wondering, would this be something we could contribute to CPython directly? I understand that we needed to vendor the threadpool for the seceding feature, but every modification raises the maintenance cost (or the risk of it).

fjetter avatar Apr 28 '21 14:04 fjetter

Hello 👋 I stumbled upon this issue/PR while working with Prefect. (See https://github.com/PrefectHQ/prefect/issues/5043). I've also tested the proposed PR and it seems to fix this issue, great work @martindurant 🙌

@martindurant : Do you have the intention to finish this PR and make it part of distributed?

alexandervaneck avatar Oct 11 '21 14:10 alexandervaneck

provisionally ready for review - we need to decide whether this should be default or not, and how to describe it in the conf and docs.

martindurant avatar Oct 15 '21 19:10 martindurant

I have made the default False for now (no interrupt) and added the key to the config schema. It is pretty hidden! We could add this to the docs somewhere, or softly launch it by suggesting some users try it when facing long-running released tasks.

martindurant avatar Nov 02 '21 13:11 martindurant

Anything left here?

martindurant avatar Nov 03 '21 21:11 martindurant

ping?

martindurant avatar Nov 19 '21 15:11 martindurant

My final concern here is that we're no longer using the stdlib ThreadPoolExecutor interface but require an extended interface, which breaks some assumptions about our compatibility, see

https://github.com/dask/distributed/blob/96d4fd43682c379f9b39b6dc55f568872b766a47/distributed/worker.py#L3433-L3445

Discussion around this https://github.com/dask/distributed/pull/5063/files#r670260505

FWIW, we're not consistent with this and I believe we should require a strict isinstance check, e.g. https://github.com/dask/distributed/blob/96d4fd43682c379f9b39b6dc55f568872b766a47/distributed/worker.py#L1610-L1613
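
A strict check of this kind might look like the sketch below. It is illustrative only and not taken from distributed: InterruptibleExecutor, its interrupt method and release_key are hypothetical names standing in for the extended executor interface.

```python
from concurrent.futures import ThreadPoolExecutor


class InterruptibleExecutor(ThreadPoolExecutor):
    """Hypothetical executor exposing one method beyond the stdlib interface."""

    def interrupt(self, key):
        # Placeholder: a real implementation would find the thread running `key`
        return f"interrupt requested for {key}"


def release_key(executor, key):
    """Use the extended interface only when the executor is known to provide it."""
    if isinstance(executor, InterruptibleExecutor):
        return executor.interrupt(key)
    # A plain stdlib executor offers no way to interrupt a running task
    return None
```

With the isinstance check, a user-supplied stdlib executor keeps working and simply never has its tasks interrupted, rather than failing on a missing method.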

fjetter avatar Dec 16 '21 14:12 fjetter