Automatically restart memory-leaking workers when they reach a critical limit
Attempt to fix https://github.com/dask/distributed/issues/4193.
This pull request successfully restarts memory-leaking workers. However, some workers still keep freezing.
Minimal example to reproduce memory leak outside of the test suite:
import dask
import dask.distributed
import numpy as np
import time

cluster = dask.distributed.LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="512M")
client = dask.distributed.Client(cluster)

x = {}

def memory_leaking_fn(data):
    x[data] = np.random.randint(100, size=12 * 1024**2 // 8)
    time.sleep(0.1)
    return data

futures = client.map(memory_leaking_fn, range(1000))
for f in futures:
    print(f.result())
I think that the main question we need to determine is what policy we should use here. Should we always restart? Should we never restart but keep logging? Should we log for a while, but after it appears to not be getting any better (maybe five seconds) then restart? I like this last option personally.
Also, thank you for taking the time to submit this.
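For concreteness, here is a minimal sketch of the "log for a while, then restart" policy. It is not the PR's implementation; the class name, the 0.95 critical fraction, and the idea of calling should_restart() from the worker's periodic memory check are all illustrative assumptions.

import time

class RestartPolicy:
    """Restart only after memory has stayed critical for a grace period."""

    def __init__(self, grace_period=5.0):
        self.grace_period = grace_period   # e.g. the "maybe five seconds" above
        self.first_seen = None             # when memory first crossed the limit

    def should_restart(self, memory_fraction, critical_fraction=0.95):
        if memory_fraction < critical_fraction:
            self.first_seen = None         # pressure is gone, reset the timer
            return False
        if self.first_seen is None:
            self.first_seen = time.monotonic()
            return False                   # keep logging for now
        return time.monotonic() - self.first_seen > self.grace_period

A worker's periodic memory monitor could feed its current memory fraction into should_restart() and ask the nanny for a restart once it returns True.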
I think that the main question we need to determine is what policy we should use here. Should we always restart? Should we never restart but keep logging? Should we log for a while, but after it appears to not be getting any better (maybe five seconds) then restart? I like this last option personally.
IMHO this should happen directly before starting to work on a new task. A worker that hits this message should restart as soon as possible because the memory leak eats resources that could be used for the task.
However, I wonder if there is an in-place restarting possibility for the worker. I.e. "start new worker process" -> "transfer items + tasks to new worker" -> "kill old worker" This would have a number of advantages:
- on-disk data can be kept and does not need to be transferred over network
- fast-path transfer via shared memory?
- data keeps locality
- the scheduler can be relieved
However, I wonder if there is an in-place restarting possibility for the worker. I.e. "start new worker process" -> "transfer items + tasks to new worker" -> "kill old worker" This would have a number of advantages:
Oh yeah, that actually sounds like a great idea.
fast-path transfer via shared memory?
Fortunately in this situation all of the data is already on disk, so we probably don't need to trouble ourselves with this.
In principle I think that we would want to ...
- Use the same Nanny to create a new worker process, providing arguments to the data= keyword to generate a zict buffer pointing to the same file location
- Wait until that is up
- Shut down this worker process, being careful not to delete the data on disk
- Rely on the scheduler to make intelligent choices about moving the tasks-to-be-run on the old worker around. Probably they'll mostly end up on the new worker on this machine, but it's ok if they're not.
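To make the data= hand-off above more concrete, here is a rough sketch of what the replacement worker could be given, built from the zict primitives that distributed already uses for spilling. It is not code from this PR; reuse_spill_directory, old_spill_dir, the 256 MB target, and the weight function are illustrative assumptions.

import pickle
import zict
from dask.sizeof import sizeof
from distributed import Worker

def reuse_spill_directory(spill_dir, target_bytes=int(256e6)):
    # Hot values stay in a plain dict; cold values are pickled into spill_dir,
    # which is the directory the old worker already spilled to.
    slow = zict.Func(pickle.dumps, pickle.loads, zict.File(spill_dir))
    return zict.Buffer(fast={}, slow=slow, n=target_bytes, weight=lambda k, v: sizeof(v))

async def start_replacement_worker(scheduler_addr, old_spill_dir):
    # Hypothetical restart step performed by the nanny: the fresh worker's data
    # mapping points at the old worker's spill directory, so on-disk keys do not
    # have to travel over the network.
    worker = Worker(scheduler_addr, data=reuse_spill_directory(old_spill_dir))
    await worker
    return worker

Whether the old worker's queued tasks end up on this replacement is then left to the scheduler's normal heuristics, as in the last bullet above.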
This is a non-trivial engineering effort, but might be an interesting project for someone who wants to become more familiar with the worker-nanny-scheduler relationship. Does this work interest you @Hoeze ?
This is a non-trivial engineering effort, but might be an interesting project for someone who wants to become more familiar with the worker-nanny-scheduler relationship. Does this work interest you @Hoeze ?
I'd like to solve this problem and think it would save a lot of people (including me) a lot of time if this restarting worked reliably. However, I also have two projects to finish soon.
I'm happy to invest another working day on this issue, but I think it would be better if someone with deeper knowledge could jump in here.
I don't fully understand what is happening here, but the function and leaky dictionary are stored in cache_loads on the worker. As an experiment, I rewrote the function slightly and pulled out the serialized function from the worker:
import random
import time

x = {}

def memory_leaking_fn(data):
    x[data] = random.randint(10, 100)
    x[data + str(random.randint(10, 10000))] = 'foo'
    print(x)
    print(locals())
    time.sleep(0.1)
    return data
In a separate process, load the serialized function:
In [1]: import pickle
In [2]: ser_func = b"\x80\x04\x95\x93\x02\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\x02\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x08KCCHt\x00\xa0\x01d\x01d\x02\xa1\x02t\x02|\x00<\x
...: 00d\x03t\x02|\x00t\x03t\x00\xa0\x01d\x01d\x04\xa1\x02\x83\x01\x17\x00<\x00t\x04t\x02\x83\x01\x01\x00t\x04t\x05\x83\x00\x83\x01\x01\x00t\x06\xa0\x07d\x05\xa1\x01\x01\x00|\x00S\x00\x94(NK\nKd\x8c\x03foo\x94M\x10'G?\xb9\x99\x99\x99\x99\x99\x9at\x94(\x8c\x06random\x94
...: \x8c\x07randint\x94\x8c\x01x\x94\x8c\x03str\x94\x8c\x05print\x94\x8c\x06locals\x94\x8c\x04time\x94\x8c\x05sleep\x94t\x94\x8c\x04data\x94\x85\x94\x8c\x07test.py\x94\x8c\x11memory_leaking_fn\x94K\nC\x0c\x00\x02\x10\x01\x18\x01\x08\x01\n\x02\n\x01\x94))t\x94R\x94}\x9
...: 4(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94\x8c\x07test.py\x94uNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h#}\x94}\x94(h\x1eh\x18\x8c\x0c__qualname__\x94h\x18\x8c\x0f__annot
...: ations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x1f\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94(h\x12h\x00\x8c\tsubimport\x94\x93\x94\x8c\x04time\x94\x85
...: \x94R\x94h\x0ch6h\x0c\x85\x94R\x94h\x0e}\x94uu\x86\x94\x86R0."
In [3]: func = pickle.loads(ser_func)
In [4]: func(str(1))
{'1': 15, '19359': 'foo'}
{'data': '1'}
Out[4]: '1'
In [5]: func(str(2))
{'1': 15, '19359': 'foo', '2': 97, '29211': 'foo'}
{'data': '2'}
Out[5]: '2'
In [6]: func.__globals__['x']
Out[6]: {'1': 15, '19359': 'foo', '2': 97, '29211': 'foo'}
In [7]: globals()['x']
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-7-9edb74b452e4> in <module>
----> 1 globals()['x']
KeyError: 'x'
This is moving out of my area of knowledge, but it looks like the global namespace of the deserialized function brings its own copy of the dictionary -- this is probably one of the ways cloudpickle/pickle can bring in dependencies when serializing a function g that depends locally on a function f.
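To illustrate that last point, here is a small standalone demo (not taken from the worker code) showing the same effect: when a function is defined in an interactive session or a script's __main__, cloudpickle serializes it by value, so the deserialized copy carries its own globals dict. The names leaky and blob exist only for this example.

import pickle
import cloudpickle

x = {}  # module-level dict referenced by the function

def leaky(key):
    x[key] = "leaked"
    return key

blob = cloudpickle.dumps(leaky)
restored = pickle.loads(blob)

restored("a")
print(restored.__globals__["x"])        # {'a': 'leaked'} -- the restored copy's own dict
print(restored.__globals__["x"] is x)   # False -- not the module-level x
print(x)                                # {} -- the original dict was never touched

This mirrors the KeyError above: the process that loaded the function never had an x in its own globals(), only the copy attached to the deserialized function.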
@quasiben and I spoke about this offline. I think that he is trying to avoid the situation where cloudpickle is handling a function that leaks data in a closed-over variable. I think that this is not a good path to go down. I think that we need to assume that user code can leak in ways that we will never be able to resolve, and that we will need to occasionally restart in these situations.
I'm happy to invest another working day on this issue, but I think it would be better if someone with deeper knowledge could jump in here.
I agree that that would be good. It may not happen quickly though. There is a lot of work happening right now, and most maintainers are unfortunately quite busy these days.
I would like to help, but I lack the knowledge. If anyone can point me in the right direction, I am willing to spend my time to fix this.
What's the best way to install this repo using pip?
Last time I tried using pip install git+https://github.com/Hoeze/distributed/[email protected],
but I noticed that the changes are in the master branch and I have dask 2.30, which is incompatible with the code in master (there are some imports which fail).
I tried to make the change manually in my site-packages folder; since it's just one line, it does the trick.
The workers restart when they reach the memory limit BUT the job fails with this traceback:
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/venv/lib/python3.8/site-packages/distributed/client.py in _gather()
1849 exc = CancelledError(key)
1850 else:
-> 1851 raise exception.with_traceback(traceback)
1852 raise exc
1853 if errors == "skip":
KilledWorker: ('_fit_one_estimator-e733af06-4296-44e5-8a1d-28a876c9f9a0', <Worker 'tcp://10.27.2.239:37489', name: 45, memory: 0, processing: 46>)
If this PR gets completed it would be a huge improvement for using dask with long-running jobs.
Hi @gioxc88, installing dask-distributed requires (due to a reason I don't know) dask from master.
I updated my pull request to current master and fixed the memory leak test. You should now be able to just pip install it from my current master branch.
Hi @gioxc88, installing dask-distributed requires (due to a reason I don't know) dask from master.
I updated my pull request to current master and fixed the memory leak test. You should now be able to just pip install it from my current master branch.
Thanks for the answer, I'll try again tomorrow. Are there any new developments on the actual code (aside from the tests)?
Many thanks
Not from my side. I do have the same issues (https://github.com/dask/distributed/issues/4193#issuecomment-740261518).
This patch only fixes one very specific problem in terms of external memory-leaking processes by trying to gracefully restart them. However, when the worker gets killed by e.g. out-of-memory, there is currently no chance to recover the tasks stored on the worker. They need to be recalculated.
You can try to increase allowed-failures (e.g. to 50) in your ~/.config/dask/distributed.yaml. This helps a lot to recover from worker failures.
Also, enabling lifetime restarts makes a huge difference (I set 60min with 10min stagger).
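For reference, the same two mitigations can also be set from Python via dask.config.set (before the cluster and workers are created). The config keys below assume a 2020-era distributed release, and the values are just the ones mentioned above.

import dask

dask.config.set({
    # Tolerate more task failures before the scheduler gives up with KilledWorker.
    "distributed.scheduler.allowed-failures": 50,
    # Proactively recycle workers: each restarts after about an hour,
    # staggered by up to 10 minutes so they don't all restart at once.
    "distributed.worker.lifetime.duration": "60 minutes",
    "distributed.worker.lifetime.stagger": "10 minutes",
    "distributed.worker.lifetime.restart": True,
})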
IMHO, what should be done to solve these issues:
- Directly pass every finished task to the nanny and let it handle the data transfers
- Allow in-place restarts of the worker
This way, we don't care about worker failures any more. Results are known to have a certain size and reside in a safe location.
This looks to solve a perennial issue for me. Is there any reason not to take this less-than-perfect solution for now and work on the nanny-restarts-worker solution in the longer term? If it is just a matter of implementing the delay and the check for currently executing tasks from the review comment, I can do that.
I expect that this PR would kill off long-running tasks that temporarily allocate a lot of RAM. I'm in the process of designing a more holistic and robust solution.
If memory usage is high enough that we hit the pause fraction watermark and there are no keys to spill to disk, then I think what we could do is:
- Stop new task executions
- Wait for existing tasks to drain out
- Run gc a few times
I think that if memory usage remained high after that sequence the worker should kill itself. The situation cannot reasonably be expected to improve if tasks are not cleaning up after themselves and gc cannot do it either.
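As a sketch only, that sequence might look roughly like the coroutine below. Every parameter (memory_fraction, spillable_keys, running_tasks, pause, restart) is a placeholder for whatever hooks the worker actually exposes, not an existing API.

import asyncio
import gc

async def drain_then_restart(memory_fraction, spillable_keys, running_tasks,
                             pause, restart, critical=0.95, gc_passes=3):
    if memory_fraction() < critical or spillable_keys() > 0:
        return                         # below the watermark, or spilling can still help
    pause()                            # stop new task executions
    while running_tasks() > 0:         # wait for existing tasks to drain out
        await asyncio.sleep(0.1)
    for _ in range(gc_passes):         # run gc a few times
        gc.collect()
        await asyncio.sleep(0)
    if memory_fraction() >= critical:  # nothing improved: the worker kills itself
        await restart()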