
Worker can memory overflow by fetching too much data at once

fjetter opened this issue on Apr 26 '22 • 12 comments

There is a possibility for a worker to kill itself by fetching too much data at once

Old version:

https://github.com/dask/distributed/blob/84cbb099e0d0f1e1947d51d957f0b65e9b0635dd/distributed/worker.py#L2754-L2757

New version:

https://github.com/dask/distributed/blob/3647cfefd004281dac0ffa92349e23bc2a94d68a/distributed/worker_state_machine.py#L1425-L1429

Simple example

Let's assume that:

  • keys have on average ~500 MB
  • local memory available is ~10 GB
  • total_out_connections: 50 (default)

That would cause us to fetch up to 25GB at once which would kill the worker immediately.
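
For concreteness, that worst case is just the product of the connection limit and the average key size; a quick back-of-the-envelope check in Python using the numbers above:

avg_key_size = 500e6          # ~500 MB per key, as assumed above
total_out_connections = 50    # distributed's default at the time
local_memory = 10e9           # ~10 GB available on the worker

worst_case_in_flight = avg_key_size * total_out_connections
print(worst_case_in_flight / 1e9)           # 25.0 -> up to 25 GB fetched at once
print(worst_case_in_flight / local_memory)  # 2.5  -> 2.5x the available memory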

This problem is exacerbated if the keys' data sizes are underestimated (e.g. bias/bug in dask.sizeof)

cc @crusaderky

fjetter, Apr 26 '22 14:04

That would cause us to fetch up to 25GB at once which would kill the worker immediately.

I think it's also likely this wouldn't always kill the worker, but freeze it like in https://github.com/dask/distributed/issues/6110. If the 25GiB come somewhat staggered in those 500MB increments, physical memory usage will creep up very close to the 10GB max, without actually requesting more than it. Once it gets close enough, the system will thrash to the point that everything stops, so more memory isn't allocated (which would put it over the limit and kill it). The smaller the key size and the larger the key count, the more likely this is to happen (probably why a shuffle was a good reproducer).

Since we know the (approximate) size of all the keys we're fetching, this seems like something we can avoid doing in the first place (don't fetch more than we have memory capacity for, as a start). Maybe also a use-case for receiving data directly to disk https://github.com/dask/distributed/issues/5996?

gjoseph92, Apr 26 '22 15:04

pseudocode:

if current managed_in_memory + self.comm_nbytes > threshold% * max_memory:
    stop scheduling further gather_dep's

or with better encapsulation:

if self.data.would_exceed_threshold(self.comm_nbytes):
    stop scheduling further gather_dep's
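
As a self-contained illustration of the first variant, a minimal sketch (managed_in_memory, max_memory and the 0.6 threshold are placeholder names and values for this example, not the actual attributes in worker_state_machine.py):

def can_schedule_gather(
    managed_in_memory: int,  # bytes of managed data currently held in RAM
    comm_nbytes: int,        # bytes already in flight via gather_dep
    max_memory: int,         # the worker's configured memory limit in bytes
    threshold: float = 0.6,  # illustrative fraction of max_memory
) -> bool:
    """Return False once in-flight fetches plus managed memory would cross the threshold."""
    return managed_in_memory + comm_nbytes <= threshold * max_memory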

There are two ways to resume gathering:

  1. a dependency fails to arrive, thus reducing comm_nbytes
  2. enough dependencies arrive to send the worker over the threshold. Older keys are automatically spilled, thus reducing managed_in_memory until it's below the threshold.

Note that the above mitigates the issue, but doesn't actually prevent the problem in the use case that @fjetter showed above, where a single task requires more memory than what's available:

import dask.array as da
from distributed import futures_of  # `client` below is an existing distributed.Client

CHUNK_SIZE = 500 // 8 * 2**20  # one chunk of float64 is ~500 MiB
a = da.random.random(CHUNK_SIZE * 20, CHUNK_SIZE)  # 20 chunks, ~10 GiB total
a = a.persist()
f = client.submit(lambda _: None, futures_of(a))  # f depends on every chunk of a

What will happen is that whatever worker runs f will start accumulating all chunks of a locally. On main, if there are 20+ workers in the cluster, this means immediate death from gather_dep. With the change above, gather_dep will be successful and all the data will be copied onto the worker and progressively spilled. However, when the time comes to transition f from ready to running, then

  1. chunks of a will be moved from slow to fast
  2. before f can start running, the first chunks are already being sent back from fast to slow; however, they also exist in memory. The worker's memory goes to 100%, and if we're lucky it's terminated; if not, it just freezes due to swap thrashing.

And this is not considering unrelated unmanaged memory - e.g. on a multithreaded worker, there may be other tasks running.

crusaderky, Apr 27 '22 11:04

Two thoughts:

  1. How often does this happen? Presumably this was related to the Florian Shuffle Deadlock (I need a better name for this). Have we seen this happen in other common situations?

Let's assume that keys have on average ~500 MB, local memory available is ~10 GB, total_out_connections: 50 (default)

That would cause us to fetch up to 25GB at once which would kill the worker immediately.

If planning for worst case would cripple things (You must have 25GB of memory free before you do any work!) then I would not want to plan for worst case. I think that we could reduce this burden in a few ways:

  1. We could try to learn the size of transfers on the worker (creating something like a worker.py::TaskPrefix)
  2. We could ask the sender's side to cancel for us. "Hey other worker! I want data "x" but I'm running kinda low on memory. I only have 4 GB free. Please only send something if it's less than 2GB? If it's greater, let me know how large it is instead so that I can plan accordingly"
  3. We could start to reduce the number of concurrent inbound connections as our memory starts to dry up.

Just some thoughts to reduce the burden here. Please don't let these distract too much from the main conversation.

mrocklin, Apr 27 '22 12:04

  1. We could try to learn the size of transfers on the worker (creating something like a worker.py::TaskPrefix)
  2. We could ask the sender's side to cancel for us. "Hey other worker! I want data "x" but I'm running kinda low on memory. I only have 4 GB free. Please only send something if it's less than 2GB? If it's greater, let me know how large it is instead so that I can plan accordingly"

The receiving worker already knows in advance how large the data will be before it asks for it. That's returned by select_keys_for_gather, which in turn is reflected in Worker.comm_nbytes.
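
For reference, a simplified sketch of that byte accounting, loosely modelled on the old worker.py logic: keys queued for a single peer are batched greedily until the batch would exceed a target message size, and the batch total is what gets added to Worker.comm_nbytes (the ~50 MB cap here is illustrative):

from collections import deque

def select_keys_for_gather(
    queued: deque,                          # (key, nbytes) pairs queued for one peer
    target_message_size: int = 50_000_000,  # assumed ~50 MB cap per request
) -> tuple:
    """Greedily batch keys for one gather_dep request, capped by total bytes."""
    keys = set()
    total_bytes = 0
    while queued:
        key, nbytes = queued[0]
        if keys and total_bytes + nbytes > target_message_size:
            break  # at least one key is always taken, even if it alone is huge
        queued.popleft()
        keys.add(key)
        total_bytes += nbytes
    return keys, total_bytes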

  3. We could start to reduce the number of concurrent inbound connections as our memory starts to dry up.

I would not want to count workers towards the maximum if they are going to send me only a few kilobytes.

crusaderky, Apr 27 '22 13:04

Ah, great then.

mrocklin, Apr 27 '22 13:04

In coiled/coiled-runtime#229, we have found a reliable reproducer for a worker getting OOM-killed by fetching too much data. The issue is exacerbated by a misconfigured Worker.memory_limit (coiled/feedback#185), but I am confident that we can also reproduce this issue if that is fixed by simply scaling test_tensordot_stress to more workers. With a default chunk size of 100 MiB and the defaults of 50 outgoing and 10 incoming connections per worker, we might use up to ~2.9 GiB for communication, which is ~90% of the memory available to the worker on a t3.medium.

EDIT: Apart from 100 MiB being the default chosen by dask.array itself in coiled/coiled-runtime#229, we recommend chunk sizes between 10(0) MiB and 1 GiB in several places:

  • https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes#rough-rules-of-thumb
  • https://docs.dask.org/en/stable/array-chunks.html#specifying-chunk-shapes

hendrikmakait, Aug 16 '22 14:08

One quick yet imperfect fix might be to introduce a parameter that limits the amount of memory we want to dedicate to communications, effectively adjusting the above if-clause to

if (
    (
        len(self.in_flight_workers) >= self.total_out_connections
        and self.comm_nbytes >= self.comm_threshold_bytes
    )
    or self.comm_nbytes >= self.comm_memory_limit
):
    return {}, []

This value could be set as a percentage of Worker.memory_limit to account for different machines. As a downside, this might have negative effects in situations where all we want to do for now is fetch an initial set of data to work with.
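
A minimal sketch of deriving such a limit from Worker.memory_limit; the 10% fraction and the helper name are purely illustrative, not an existing distributed option:

def derive_comm_memory_limit(memory_limit, fraction: float = 0.1):
    """Cap on bytes a worker may have in flight for incoming transfers."""
    if memory_limit is None:
        return float("inf")  # no memory limit configured, so no comm limit either
    return int(memory_limit * fraction)

# On a worker with a ~3.2 GiB memory limit this would allow roughly 330 MiB
# of concurrent incoming transfers:
# derive_comm_memory_limit(int(3.2 * 2**30))  # -> 343_597_383 bytes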

To better understand the burden comms put onto our workers, we may also want to add metrics about the number of open connections and the value of self.comm_nbytes. XREF: #6892

In addition, we may want to drop self.comm_threshold_bytes as well as the limits on incoming/outgoing connections as parameters and fully rely on a mechanism that looks at currently used memory to schedule comms, limited by the size of the connection pool.

cc @fjetter

hendrikmakait, Aug 16 '22 14:08

It's also worth noting that the same issue exists for workers sending data. There's a limit to the number of incoming connections a worker can have at once (default 10), but not the number of bytes it can be sending at once. If a worker is paused, it abruptly limits itself to 1 connection, but that message could still be large.

Serialization may not be zero-copy. If compression is used, it's never zero-copy. TLS comms will also require a copy in the SSL buffer. So even if the sent data is already in memory, we could potentially be doubling (or more) memory usage of that data until the other worker confirms it received the message.

The worst case is when the data being sent is currently spilled to disk. get_data will then read it into memory (which stores a strong reference to it in the data.fast dict, so even after it's been sent, it will remain in memory at least until the next memory monitor interval, which could be half a second away: months away, compared to the speed of memory allocations). Then, it might have even more copies in serialization, compression, and encryption. Sending a spilled key can be a very costly operation, as well as a slow one, which is why I advocate for https://github.com/dask/distributed/issues/5996 so much.

Ideally, a worker shouldn't be willing to send more than memory_target - current_memory bytes. Of course, current_memory isn't something we can measure all the time, so a fixed threshold like @hendrikmakait suggested above might be a good place to start.
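
A sketch of what such a sender-side guard might look like near the top of get_data; the function name and the headroom computation are assumptions for illustration, although the "busy" reply it falls back on already exists in the worker-to-worker protocol:

import psutil

def should_refuse_get_data(requested_nbytes: int, memory_target: int) -> bool:
    """Refuse transfers that would not fit into the worker's remaining headroom."""
    rss = psutil.Process().memory_info().rss  # current process memory in bytes
    headroom = memory_target - rss
    return requested_nbytes > headroom

# Inside get_data, a refusal could reuse the existing "busy" reply so that the
# requesting worker backs off and retries later:
#     if should_refuse_get_data(total_nbytes, memory_target):
#         return {"status": "busy"}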

gjoseph92, Aug 19 '22 22:08

I wrote this a while ago, but I'd like to point to https://github.com/dask/distributed/issues/6212 as a more holistic approach to this problem. The overall idea is that:

  • We do many things concurrently that require non-trivial amounts of memory (run user tasks, receive keys, send keys, etc.).
  • If we have 1 GB of memory left, and we want to both receive 600 MB of data from another worker and load 600 MB of spilled data back into memory to run a task, wouldn't it be nice if those operations could coordinate and wait for each other, instead of running concurrently and probably killing the worker?

It's common to use a synchronization primitive like a semaphore or CapacityLimiter to manage concurrent code's access to a limited resource. Maybe we could have a similar structure around memory?
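
Standard primitives count discrete slots rather than bytes, so this would need a byte-weighted variant; a minimal asyncio sketch of such a hypothetical limiter (not an existing distributed class):

import asyncio
from contextlib import asynccontextmanager

class MemoryLimiter:
    """Let concurrent operations reserve bytes against a shared memory budget."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._used = 0
        self._condition = asyncio.Condition()

    @asynccontextmanager
    async def reserve(self, nbytes: int):
        async with self._condition:
            # Wait until the reservation fits; an oversized request still runs,
            # but only once nothing else holds part of the budget.
            await self._condition.wait_for(
                lambda: self._used == 0 or self._used + nbytes <= self._capacity
            )
            self._used += nbytes
        try:
            yield
        finally:
            async with self._condition:
                self._used -= nbytes
                self._condition.notify_all()

# Usage: a gather_dep and an unspill that together exceed the budget would
# wait for each other instead of running concurrently:
#     limiter = MemoryLimiter(capacity=10**9)
#     async with limiter.reserve(600 * 2**20):
#         ...  # receive or unspill ~600 MB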

gjoseph92, Aug 19 '22 22:08

The worst case is when the data being sent is currently spilled to disk. get_data will then read it into memory (which stores a strong reference to it in the data.fast dict, so even after it's been sent, it will remain in memory at least until the next memory monitor interval, which could be half a second away

Not quite correct. If unspilling a key causes the reported managed memory to get above target (60%), then other keys will immediately be spilled out, until you are again below target.

The mechanism you mention happens specifically and only when managed memory is below 60% and process memory is above spill (70%). In other words, you either have at least 10% unmanaged memory or sizeof() is severely under-reporting memory usage. The 10% unmanaged memory use case is interesting, as it is a lot easier to reach on smaller workers. A brand new worker (with pandas imported) occupies a baseline of 138 MiB on my machine.
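
For reference, a toy model of the target-based eviction described above: whenever reported managed memory rises above the target fraction (60% by default), least recently used keys are moved from the fast (memory) store to the slow (disk) store until the fast store fits again. This is a simplification, not distributed's actual SpillBuffer code:

def evict_until_below_target(fast: dict, slow: dict, memory_limit: int, target: float = 0.6) -> None:
    """Spill least-recently-used keys until managed in-memory bytes drop below target.

    For simplicity, `fast` and `slow` map key -> nbytes; the real SpillBuffer
    stores values and serializes them to disk on eviction.
    """
    threshold = target * memory_limit
    while fast and sum(fast.values()) > threshold:
        key = next(iter(fast))     # dicts preserve insertion order: oldest key first
        slow[key] = fast.pop(key)  # move the key's bytes from memory to disk
    # Unspilling a key does the reverse move and may immediately re-trigger this loop.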

crusaderky, Aug 20 '22 11:08

It's common to use a synchronization primitive like a semaphore or CapacityLimiter to manage concurrent code's access to a limited resource. Maybe we could have a similar structure around memory?

We do not know how much data sending/receiving actually requires, nor do we know how much a user task requires. Without reliable measurements I would prefer sticking to a pragmatic, low complexity approach for now.

I do believe that simply introducing a threshold as proposed by @hendrikmakait in https://github.com/dask/distributed/issues/6208#issuecomment-1216710657 will improve things significantly already.

I'm not sure if an absolute threshold, a relative (to memory_limit) threshold or a dynamic threshold (percentage of free/available memory) fits best. My gut feeling tells me to start with a relative threshold (based on memory_limit). Together with the added instrumentation, we can iterate on this choice incrementally, if necessary. If there are still significant shortcomings afterwards, we can think about a more complex solution.

fjetter, Aug 26 '22 14:08

We reverted https://github.com/dask/distributed/pull/6975 in https://github.com/dask/distributed/pull/6994 before releasing. Re-opening this issue as a reminder to restore https://github.com/dask/distributed/pull/6975 post-release

jrbourbeau, Sep 02 '22 20:09

This is in, by now

fjetter, Oct 24 '22 14:10