Call the persist method on an array after using map_blocks and sending a Python object never release the copies of the object created for every chunk
Describe the issue: I was trying to apply a function that receives a Python Object that contains a small pandas series (less than 40K rows) to every chunk of an array, but I have noticed that the process is using an extreme amount of memory (taking into account the size of my array) and when I call the persist method that memory is never rereleased to the workers, I think that the map_blocks method is creating a copy of the object for every chunk and then it keeps some kind of pointer to every one of them and it only release them when the future is deleted. Sorry if this was reported before but I was not able to find other similar issues.
Minimal Complete Verifiable Example:
import pandas as pd
import dask
import time
class Mapper:
def __init__(self, s: pd.Series):
self.s = s
def map_arr(self, x):
return x + 1
def f(x, m):
# The problem is present even if I do not use the mapper object
return m.map_arr(x)
mapper = Mapper(pd.Series(list(range(30000))))
# If I decrease the chunk size then it creates more copies and consumes more memory
# arr = dask.array.zeros(shape=(1000, 1000), chunks=(1, 10)),
# using the commented array makes my cluster reach the max memory limit on every worker and get stuck forever
arr = dask.array.zeros(shape=(1000, 1000), chunks=(10, 10))
a = arr.map_blocks(f, mapper, meta=arr).persist()
time.sleep(15)
print({
k: worker_info["metrics"]["memory"]
for k, worker_info in client.scheduler_info()['workers'].items()
})
# output: {0: 615858176, 1: 604512256, 2: 614825984, 3: 601804800, 4: 615587840, 5: 609296384}
# Release the memory
del a
# If the object is first distributed then the problem disappears
mapper_future = client.scatter(mapper, broadcast=True)
b = arr.map_blocks(f, mapper_future, meta=arr).persist()
time.sleep(15)
print({
k: worker_info["metrics"]["memory"]
for k, worker_info in client.scheduler_info()['workers'].items()
})
# output: {0: 207790080, 1: 206336000, 2: 207065088, 3: 207208448, 4: 207249408, 5: 207171584}
del b
Anything else we need to know?:
Environment:
- Dask version: dask==2023.8.1
- Python version: 3.10
- Operating System: Linux
- Install method (conda, pip, source): pip
I think the issue is more related to dask distributed, so should I open the issue here https://github.com/dask/distributed/issues?
I modified the example to use the class MemorySampler.
import pandas as pd
import dask
import time
from dask.distributed.diagnostics import MemorySampler
ms = MemorySampler()
class Mapper:
def __init__(self, s: pd.Series):
self.s = s
def map_arr(self, x):
return x + 1
def f(x, m):
# The problem is present even if I do not use the mapper object
return m.map_arr(x)
mapper = Mapper(pd.Series(list(range(30000))))
# If I decrease the chunk size then it creates more copies and consumes more memory
# arr = dask.array.zeros(shape=(1000, 1000), chunks=(1, 10)),
# using the commented array makes my cluster reach the max memory limit on every worker and get stuck forever
arr = dask.array.zeros(shape=(1000, 1000), chunks=(10, 10))
with ms.sample("excessive_memory_use"):
a = arr.map_blocks(f, mapper, meta=arr).persist()
time.sleep(15)
# Release the memory
del a
time.sleep(1)
# If the object is first distributed, the problem disappears
mapper_future = client.scatter(mapper, broadcast=True)
with ms.sample("acceptable_memory_use"):
b = arr.map_blocks(f, mapper_future, meta=arr).persist()
time.sleep(15)
ms.plot(align=True)
del b
Is this a bug?
I think this is somewhat expected. The object mapper you are defining requires about 250kB in memory. When calling map_blocks this will generate tasks internally that embed whatever arguments you're passing. In this case, you'll embed the mapper into every task.
the array in your example has 10k chunks so 10_000 * 236.06 kiB ~ 2.25 GiB
You should be seeing a warning Sending large graph of size XXX etc. that is supposed to alert you about that.
def f(x, m): # The problem is present even if I do not use the mapper object return m.map_arr(x)
Even with this code you are indeed using the mapper object since m is an instance, m.map_arr is an instance method and to serialize an instance method, it has to attach the instance itself as well.
Thanks for the answer, it makes sense that it consumes that amount of memory due to the size of all the tasks but, is it expected that it never releases the memory of the tasks when the persist method is invoked? I would expect that once the array is persisted the tasks are no longer needed and they can be released.
Typically it is expected that a task's definition is small and we are retaining the task until the computation is completed. Specifically, it is retained for as long a local future / persisted collection is referencing the task. We need to retain the task's definition to ensure resilience in case of worker failures (amongst other things). So no, this memory is not released once the task is computed but only after the task is forgotten/released.
In these situations we recommend one of two things
- Ideally whatever data is in mapper can either be generated inside of a dask task or loaded from a remote storage. You could define whatever function this is using dask.delayed and keep the rest of the code as is
- If it has to be local data you should consider using https://distributed.dask.org/en/stable/api.html#distributed.Client.scatter
Thanks, I think that clarify all my doubts.