Running tasks creates linear memory growth
The following script is a minimal reproducible example (MRE) demonstrating linear memory growth when running many tasks in a flow:
from memory_profiler import profile
from prefect import flow, task

# Disable result caching where cache policies exist (Prefect 3+);
# older versions don't have this module, so fall back to no extra kwargs.
try:
    from prefect.cache_policies import NONE

    cache_policy = dict(cache_policy=NONE)
except ImportError:
    cache_policy = {}


@task(**cache_policy)
def with_task():
    return [{"abc": "123"} for _ in range(10_000)]


# Plain-function control, reconstructed from the commented-out call below;
# it builds the same payload but never goes through the task engine.
def without_task():
    return [{"abc": "123"} for _ in range(10_000)]


@flow
@profile
def some_flow(n: int = 100):
    for i in range(n):
        with_task()
        # without_task()
    print("DONE")


@profile
def main():
    some_flow(n=100)


if __name__ == "__main__":
    main()
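To reproduce, install both packages and run the script directly (the filename repro.py is just an example here); the @profile decorator from memory_profiler prints a line-by-line memory table when each decorated function returns:

    pip install prefect memory_profiler
    python repro.py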
Running the flow with 100 tasks produces the following profile:
Line #    Mem usage     Increment  Occurrences   Line Contents
=============================================================
    22    365.8 MiB     365.8 MiB            1   @flow
    23                                           @profile
    24                                           def some_flow(n: int = 100):
    25    624.2 MiB   -9381.2 MiB          101       for i in range(n):
    26    624.2 MiB   -9122.8 MiB          100           with_task()
    27                                                    # without_task()
    28    624.3 MiB       0.1 MiB            1       print("DONE")
Running the flow with 1000 tasks produces the following profile:
Line #    Mem usage      Increment  Occurrences   Line Contents
=============================================================
    22    344.1 MiB      344.1 MiB            1   @flow
    23                                            @profile
    24                                            def some_flow(n: int = 100):
    25   3019.1 MiB  -570676.8 MiB         1001       for i in range(n):
    26   3019.1 MiB  -568007.0 MiB         1000           with_task()
    27                                                     # without_task()
    28   1190.7 MiB    -1828.5 MiB            1       print("DONE")
We'd expect memory usage to stay flat across task runs, since the flow doesn't hold onto any return values. Instead it grows linearly with the number of runs: roughly 258 MiB over 100 runs (365.8 → 624.2 MiB) and roughly 2675 MiB over 1000 runs (344.1 → 3019.1 MiB), about 2.6 MiB per task run in both cases. This suggests that we are holding onto results somewhere that we shouldn't be.
If the task engine is updated to explicitly drop in-memory results, the memory usage decreases and stays flat across invocations.
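To make "explicitly dropping in-memory results" concrete, here is an illustrative sketch of the pattern; the names (TaskRunState, _run_states, run_task) are hypothetical, and this is not Prefect's actual engine code. The point is that per-run bookkeeping which outlives the task call should stop referencing the return value once it has been handed back:

    # Illustrative sketch only; hypothetical names, not Prefect internals.
    from typing import Any, Callable, Dict, Optional


    class TaskRunState:
        """Per-run bookkeeping that the engine keeps for the life of the flow."""

        def __init__(self) -> None:
            self.result: Optional[Any] = None


    _run_states: Dict[int, TaskRunState] = {}  # engine-side registry of past runs


    def run_task(run_id: int, fn: Callable[[], Any]) -> Any:
        state = TaskRunState()
        _run_states[run_id] = state  # bookkeeping outlives the call...
        state.result = fn()
        value = state.result
        state.result = None  # ...so drop the result reference once it is
                             # handed back, letting the payload be collected
        return value

With that reference dropped, each task's payload becomes garbage as soon as the call returns, which matches the flat profiles below.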
10 task runs:
Line #    Mem usage     Increment  Occurrences   Line Contents
=============================================================
    22    360.8 MiB     360.8 MiB            1   @flow
    23                                           @profile
    24                                           def some_flow(n: int = 100):
    25    369.5 MiB      -11.7 MiB           11       for i in range(n):
    26    369.5 MiB       -3.0 MiB           10           with_task()
    27                                                    # without_task()
    28    363.7 MiB       -5.8 MiB            1       print("DONE")
100 task runs:
Line #    Mem usage     Increment  Occurrences   Line Contents
=============================================================
    22    346.5 MiB     346.5 MiB            1   @flow
    23                                           @profile
    24                                           def some_flow(n: int = 100):
    25    360.4 MiB    -6784.7 MiB          101       for i in range(n):
    26    360.4 MiB    -6770.8 MiB          100           with_task()
    27                                                    # without_task()
    28    329.9 MiB      -30.5 MiB            1       print("DONE")
This suggests that a more sustainable way to avoid caching results in memory would resolve this issue.
I am also dealing with a memory issue in Prefect, and it seems similar to the one you have described. You mentioned that telling the task engine to explicitly drop in-memory results fixed the issue - how can I do that? I'm not entirely sure how to explicitly drop in-memory results, or what that even means. If it means you can no longer access the return value (or worse, can't return anything at all), are there any other solutions?
Hi @frankvp11 - this issue is for Prefect 3.0. If you're on 3.0, please follow along here. If you're using Prefect 2, you may find https://github.com/PrefectHQ/prefect/issues/12668 a more relevant issue.
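For anyone landing here before a fix ships: one version-agnostic workaround is to avoid returning large objects from tasks at all and hand back a lightweight reference instead. This is a sketch, not an official recommendation, and it only helps to the extent that the retained memory is proportional to the size of the returned value, which the profiles above suggest:

    import json
    import os
    import tempfile

    from prefect import flow, task


    @task
    def with_task() -> str:
        # Build the large payload, write it to disk, and return only the
        # path, so the engine holds a short string instead of a
        # 10,000-item list.
        payload = [{"abc": "123"} for _ in range(10_000)]
        fd, path = tempfile.mkstemp(suffix=".json")
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
        return path


    @flow
    def some_flow(n: int = 100):
        for _ in range(n):
            path = with_task()
            # Reload on demand downstream, e.g.:
            # with open(path) as f:
            #     data = json.load(f)
        print("DONE")

The trade-off is the extra serialization round-trip and the need to clean up the temporary files yourself.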