
Running tasks creates linear memory growth

Open desertaxle opened this issue 1 year ago • 1 comments

The following script is a minimal reproducible example (MRE) demonstrating linear memory growth when running many tasks in a flow:

from memory_profiler import profile

from prefect import flow, task

try:
    from prefect.cache_policies import NONE

    cache_policy = dict(cache_policy=NONE)
except ImportError:
    cache_policy = {}


@task(**cache_policy)
def with_task():
    return [{"abc": "123"} for _ in range(10_000)]

@flow
@profile
def some_flow(n: int = 100):
    for i in range(n):
        with_task()
        # without_task()
    print("DONE")


@profile
def main():
    some_flow(n=100)


if __name__ == "__main__":
    main()

Running the flow with 100 tasks produces the following profile:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    365.8 MiB    365.8 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25    624.2 MiB  -9381.2 MiB         101       for i in range(n):
    26    624.2 MiB  -9122.8 MiB         100           with_task()
    27                                                 # without_task()
    28    624.3 MiB      0.1 MiB           1       print("DONE")

Running the flow with 1000 tasks produces the following profile:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    344.1 MiB    344.1 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25   3019.1 MiB -570676.8 MiB        1001       for i in range(n):
    26   3019.1 MiB -568007.0 MiB        1000           with_task()
    27                                                 # without_task()
    28   1190.7 MiB  -1828.5 MiB           1       print("DONE")

We'd expect the memory usage to be consistent across runs since the flow isn't holding onto any return values, which suggests that we are holding onto results somewhere that we shouldn't be.
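To make the suspected mechanism concrete, here is a minimal sketch in plain Python, with no Prefect involved. `ToyEngine`, its `_results` list, and `traced_growth` are hypothetical names invented for illustration; they are not Prefect's engine or API. The sketch only shows that if a runner keeps a reference to each return value, memory grows linearly with the number of calls even though the caller discards every result, and that it stays flat when no reference is kept.

```python
import gc
import tracemalloc


def make_result():
    # Same payload shape as with_task() in the MRE above.
    return [{"abc": "123"} for _ in range(10_000)]


class ToyEngine:
    """Hypothetical stand-in for a task runner; NOT Prefect's engine."""

    def __init__(self, keep_results: bool):
        self.keep_results = keep_results
        self._results = []  # hypothetical in-memory result store

    def run(self, fn):
        result = fn()
        if self.keep_results:
            # This reference outlives the call, so the payload is never freed.
            self._results.append(result)
        return result


def traced_growth(keep_results: bool, n: int) -> int:
    """Return bytes still allocated after n runs whose results the caller discards."""
    engine = ToyEngine(keep_results)
    gc.collect()
    tracemalloc.start()
    for _ in range(n):
        engine.run(make_result)  # return value intentionally ignored
    gc.collect()
    current, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return current


if __name__ == "__main__":
    for n in (10, 20):
        kept = traced_growth(keep_results=True, n=n)
        dropped = traced_growth(keep_results=False, n=n)
        print(f"n={n}: kept={kept / 2**20:.1f} MiB, dropped={dropped / 2**20:.1f} MiB")
```

In the retaining variant the traced memory scales with `n`; in the other it stays near zero. That matches the pattern in the profiles above, but it does not identify where Prefect actually holds the references.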

desertaxle avatar Jun 25 '24 18:06 desertaxle

If the task engine is updated to explicitly drop in-memory results, the memory usage decreases and stays flat across invocations.

10 task runs:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    360.8 MiB    360.8 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25    369.5 MiB    -11.7 MiB          11       for i in range(n):
    26    369.5 MiB     -3.0 MiB          10           with_task()
    27                                                 # without_task()
    28    363.7 MiB     -5.8 MiB           1       print("DONE")

100 task runs:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    346.5 MiB    346.5 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25    360.4 MiB  -6784.7 MiB         101       for i in range(n):
    26    360.4 MiB  -6770.8 MiB         100           with_task()
    27                                                 # without_task()
    28    329.9 MiB    -30.5 MiB           1       print("DONE")

This suggests that a more sustainable way to avoid caching results in memory will resolve this issue.

desertaxle avatar Jun 25 '24 18:06 desertaxle

I am also dealing with memory issues in Prefect that seem similar to the one you have described. You mentioned that having the task engine explicitly drop in-memory results fixed the issue. How can I do that? I'm not sure how to explicitly drop in-memory results, or what that even means. If it means that you can't access the return value (or worse, can't return anything), are there any other solutions?

frankvp11 avatar Jul 09 '24 12:07 frankvp11

Hi @frankvp11 - this issue is for Prefect 3.0. If you're on 3.0, please follow along here. If you're using Prefect 2, you may find https://github.com/PrefectHQ/prefect/issues/12668 more relevant.

zhen0 avatar Jul 11 '24 14:07 zhen0