Memory leak
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
When running a Prefect app in an infinite loop (as a job that listens to, e.g., a messaging service and processes the events), we run into unbounded memory growth. We inspected our code, but even the most minimal example, which persists no data and explicitly runs garbage collection, still grows in memory.
Reproduction
import gc
import asyncio
import time
from prefect import flow, task

@task(
    # retries=3,
    # persist_result=False,
    # cache_result_in_memory=False,
)
async def build_dataframe_async(id, index):
    print(f"Hello World = id = {id}, index = {index}")

@flow(
    # log_prints=True,
    # persist_result=False,
    cache_result_in_memory=False,
)
async def flow_async(id):
    for index in range(30):
        await build_dataframe_async(id, index)

if __name__ == '__main__':
    for index in range(5):
        print(f"____________________ Index = {index} ____________________")
        asyncio.run(flow_async(index))
        gc.collect(0)
        gc.collect(1)
        gc.collect(2)
        time.sleep(0.01)
        gc.collect()
        time.sleep(0.01)
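The loop above forces a full garbage collection after every flow run, so any memory that keeps growing must still be reachable from somewhere. As a stdlib-only way to watch that from inside the process, here is a minimal sketch using tracemalloc. measure_growth, leaky and not_leaky are hypothetical helpers for illustration, not Prefect code: the "leaky" function stands in for anything that keeps a reference to each result.

```python
import gc
import tracemalloc
from typing import Callable, List


def measure_growth(fn: Callable[[], object], iterations: int) -> List[int]:
    """Hypothetical helper: call fn repeatedly and return traced memory
    (in bytes) after each call and a full garbage collection."""
    started_here = not tracemalloc.is_tracing()
    if started_here:
        tracemalloc.start()
    samples: List[int] = []
    try:
        for _ in range(iterations):
            fn()
            gc.collect()  # only memory that is still reachable survives this
            current, _peak = tracemalloc.get_traced_memory()
            samples.append(current)
    finally:
        if started_here:
            tracemalloc.stop()
    return samples


# Hypothetical long-lived registry that never forgets anything.
_retained: List[object] = []


def leaky() -> None:
    # Keeps every result reachable, so memory grows with each call.
    _retained.append([{"abc": "123"} for _ in range(10_000)])


def not_leaky() -> None:
    # The list becomes unreachable as soon as the function returns.
    _ = [{"abc": "123"} for _ in range(10_000)]


if __name__ == "__main__":
    leak = measure_growth(leaky, 5)
    ok = measure_growth(not_leaky, 5)
    print("leaky growth:", leak[-1] - leak[0], "bytes")
    print("non-leaky growth:", ok[-1] - ok[0], "bytes")
```

Steadily rising samples after gc.collect() point at a live reference rather than at uncollected garbage.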
Error
No response
Versions
python 3.10
prefect 2.10.18
Additional context
No response
Also experiencing this with a very basic example flow set to run ~1min:
Python 3.10, prefect 2.13.0
I think this is related to https://github.com/pydantic/pydantic/pull/6681, which was fixed in Pydantic v2. I'm linking this to our ticket for upgrading to Pydantic v2, since I expect this issue to go away once we support the upgrade.
Related to https://github.com/PrefectHQ/prefect/issues/10145
On 2.14.16, memory still leaks.
To reproduce:
pip install matplotlib prefect pylab-sdk memory-profiler
test_flow.py
import asyncio
import collections
import contextlib
import gc
import os
import pprint
import time
import tracemalloc

import anyio

import prefect._internal.concurrency.threads
import prefect.client.base
import prefect.engine
import prefect.events.related
import prefect.utilities.asyncutils
from prefect import context, flow, task, task_runners

@task(
    retries=3,
    persist_result=False,
    cache_result_in_memory=False,
)
async def build_dataframe_async(id, index):
    print(f"Hello World = id = {id}, index = {index}")

@flow(
    cache_result_in_memory=False,
)
async def flow_async(id):
    for index in range(80):
        await build_dataframe_async(id, index)

if __name__ == "__main__":
    for index in range(100):
        print(f"____________________ Index = {index} ____________________")
        asyncio.run(flow_async(index))
        gc.collect(0)
        gc.collect(1)
        gc.collect(2)
        time.sleep(0.01)
        gc.collect()
        time.sleep(0.01)
execute.sh:
#!/bin/bash
export PYTHONTRACEMALLOC=1
# -f avoids an error message when no earlier mprofile_*.dat files exist
rm -f mprofile_*
# Current supported backends: 'psutil', 'psutil_pss', 'psutil_uss', 'posix', 'tracemalloc'. Defaults to 'psutil'
mprof run --backend psutil -C --python python ./test_flow.py
mprof plot
./execute.sh
produces
I investigated this issue to see if there is any relation to the work in #12019, and I believe the leak here is different.
I've located code that reveals the leak, but I haven't understood why yet: https://github.com/PrefectHQ/prefect/blob/e1936a860061a6bfb0ec82e1bd8b43a9f7ea4d53/src/prefect/_internal/concurrency/calls.py#L377
If we use this set_current_call helper during async function call execution, we get a leak; remove that line and the leak vanishes. However, all that code does is set a contextvar: https://github.com/PrefectHQ/prefect/blob/e1936a860061a6bfb0ec82e1bd8b43a9f7ea4d53/src/prefect/_internal/concurrency/calls.py#L48-L54
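For readers unfamiliar with the pattern, this is a minimal sketch of what a contextvar-backed "current call" helper generally looks like. It is not Prefect's code: Call, current_call_var, get_current_call and this set_current_call are illustrative stand-ins. The helper sets the variable, yields, and resets it with the returned token, so the value is not visible after the block in the context that set it.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from typing import Iterator, Optional


class Call:
    """Hypothetical placeholder for the object being tracked."""

    def __init__(self, name: str) -> None:
        self.name = name


# Hypothetical stand-in for a module-level "current call" contextvar.
current_call_var: contextvars.ContextVar[Optional[Call]] = contextvars.ContextVar(
    "current_call", default=None
)


def get_current_call() -> Optional[Call]:
    return current_call_var.get()


@contextmanager
def set_current_call(call: Call) -> Iterator[None]:
    # Set the contextvar for the duration of the block, then restore it.
    token = current_call_var.set(call)
    try:
        yield
    finally:
        current_call_var.reset(token)


async def run_async(call: Call) -> Optional[str]:
    with set_current_call(call):
        await asyncio.sleep(0)  # a suspension point while the value is set
        current = get_current_call()
        return current.name if current else None


if __name__ == "__main__":
    print(asyncio.run(run_async(Call("example"))))  # example
    print(get_current_call())  # None
```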
I've invalidated several working theories about why this happens, so I'll have to regroup and try to identify other unknowns and possible conditions. Meanwhile, #12019 should help with some egregious leaks involving task arguments.
Several theories I have disproved:
- Call calls _run_sync(), entering its current-call context manager, and afterward calls _run_async(), which enters the same context manager. Is that the issue? No: if I turn the sync one off and leave the async one on, it still leaks; with sync on and async off, there's no leak.
- So the problem is specifically the async use of a contextvar
- The problem is not the lack of an async context manager: the same problem happens with an async context manager, and that shouldn't matter for a contextvar anyway.
- The problem is not that async and sync calls use the same contextvar: I tried separate contextvars and context managers and got the same issue, again only with async calls (i.e. _call_async()).
- It doesn't matter whether the call happens inside or outside the cancel scope.
- Running _run_async() in a copy of the call’s context doesn’t solve it either
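To make the "copy of the call's context" experiment concrete: contextvars.copy_context() returns a snapshot whose Context.run() isolates any set() calls, and asyncio.create_task() runs its coroutine in a copy of the current context by default. The stdlib-only sketch below (request_var and the functions are hypothetical) shows that values set in either copy do not leak back into the caller's context; whether the copies themselves are released depends on who still holds a reference to them.

```python
import asyncio
import contextvars

# Hypothetical contextvar used only for this illustration.
request_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_var", default="unset"
)


async def set_inside_task() -> str:
    # Runs in the task's own copy of the caller's context.
    request_var.set("set-in-task")
    return request_var.get()


async def main() -> tuple[str, str]:
    inner = await asyncio.create_task(set_inside_task())
    # The caller's context is unaffected by the task's set().
    outer = request_var.get()
    return inner, outer


def set_in_copy() -> str:
    request_var.set("set-in-copy")
    return request_var.get()


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('set-in-task', 'unset')
    ctx = contextvars.copy_context()
    print(ctx.run(set_in_copy))  # set-in-copy
    print(request_var.get())  # unset
    print(ctx[request_var])  # set-in-copy
```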
In #12019 I switched to storing a weakref in our current_call contextvar, because setting the contextvar within an async call appears never to free that memory. We don't need a strong reference at this point, so this works. Doing so reduces the memory growth of the example script but doesn't halt it, so I'll continue to investigate other leaks.
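A minimal sketch of the weakref idea, assuming a hypothetical Call class and contextvar (this is not the code from #12019): the contextvar holds a weakref.ref to the call instead of the call itself, so the contextvar alone cannot keep the call alive, and the getter dereferences it, returning None once the call has been collected.

```python
import contextvars
import gc
import weakref
from typing import Optional


class Call:
    """Hypothetical object that may carry large arguments or results."""

    def __init__(self, payload: list) -> None:
        self.payload = payload


# Hypothetical contextvar that stores a weak reference, not the object.
current_call_ref: contextvars.ContextVar[Optional[weakref.ref]] = (
    contextvars.ContextVar("current_call_ref", default=None)
)


def set_current_call(call: Call) -> contextvars.Token:
    return current_call_ref.set(weakref.ref(call))


def get_current_call() -> Optional[Call]:
    ref = current_call_ref.get()
    # Dereference; yields None if the Call has already been collected.
    return ref() if ref is not None else None


if __name__ == "__main__":
    call = Call([0] * 1_000_000)
    set_current_call(call)
    print(get_current_call() is call)  # True
    del call
    gc.collect()
    print(get_current_call())  # None: the contextvar did not keep it alive
```

The trade-off is that something else must own the call while it runs; the contextvar becomes a lookup aid rather than an owner.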
I took another pass at this and found a couple more leaks I think we can plug.
With changes:
There's still something leaking, so I'm continuing to search.
I think I've found the source of the leak, but I'm still working on the best way to solve it. This is 50 flow runs of the example script (rather than 100):
And here's 50 runs with my latest PR:
Hey @abrookins, I think this leak is still ongoing in Prefect 2.16.4.
Here's a minimum viable example using Python's memory-profiler.
from memory_profiler import profile
from prefect import flow, task

@task
@profile
def with_task():
    return [{"abc": "123"} for _ in range(10_000)]

@profile
def without_task():
    return [{"abc": "123"} for _ in range(10_000)]

@flow
@profile
def main():
    for i in range(10):
        with_task()
        # without_task()
    print("DONE")

main()
If we run this calling without_task instead of with_task, there is no memory leak: the memory profiler output shows memory staying constant as we iterate through our tasks.
But if we comment out without_task and run with_task (as in the snippet above), memory grows as we loop through our tasks.
The only difference between these functions is that one has the Prefect @task decorator.
Can we re-open this issue?
Matthew, can you paste the console text so I can see the full output you're getting? It may be different from mine, which does not suggest a leak.
Can you also share the output of:
prefect version
It should look like this, with extended output, not just the version #:
$ prefect version
Version: 2.16.3
API version: 0.8.4
Python version: 3.12.2
Git commit: e3f02c00
Built: Thu, Mar 7, 2024 4:56 PM
OS/Arch: darwin/arm64
Profile: default
Server type: cloud
Also, OS version?
Apple M3, Sonoma 14.2.1
prefect version
Version: 2.16.3
API version: 0.8.4
Python version: 3.12.2
Git commit: e3f02c00
Built: Thu, Mar 7, 2024 4:56 PM
OS/Arch: darwin/arm64
Profile: default
Server type: cloud
Here's the console output from the snippet above.
Note that I'm only running 5 iterations here to keep the output short, but if we increase the loop count, memory keeps growing without bound.
13:27:57.390 | INFO | prefect.engine - Created flow run 'tomato-civet' for flow 'main'
13:27:57.393 | INFO | Flow run 'tomato-civet' - View at https://app.prefect.cloud/account/9a597790-3884-4982-99c8-7f9f55834ae7/workspace/38e23fa4-ca5f-4a46-9bcf-7b9ec03077ef/flow-runs/flow-run/6a95fe30-0b63-4ac6-aafd-063663325c45
13:27:57.989 | INFO | Flow run 'tomato-civet' - Created task run 'with_task-0' for task 'with_task'
13:27:57.991 | INFO | Flow run 'tomato-civet' - Executing 'with_task-0' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
5 188.4 MiB 188.4 MiB 1 @task
6 @profile
7 def with_task():
8 190.4 MiB 1.9 MiB 10001 return [{"abc": "123"} for _ in range(10_000)]
13:27:58.662 | INFO | Task run 'with_task-0' - Finished in state Completed()
13:27:58.803 | INFO | Flow run 'tomato-civet' - Created task run 'with_task-1' for task 'with_task'
13:27:58.805 | INFO | Flow run 'tomato-civet' - Executing 'with_task-1' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
5 193.0 MiB 193.0 MiB 1 @task
6 @profile
7 def with_task():
8 194.8 MiB 1.8 MiB 10001 return [{"abc": "123"} for _ in range(10_000)]
13:27:59.607 | INFO | Task run 'with_task-1' - Finished in state Completed()
13:27:59.737 | INFO | Flow run 'tomato-civet' - Created task run 'with_task-2' for task 'with_task'
13:27:59.739 | INFO | Flow run 'tomato-civet' - Executing 'with_task-2' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
5 195.7 MiB 195.7 MiB 1 @task
6 @profile
7 def with_task():
8 197.5 MiB 1.9 MiB 10001 return [{"abc": "123"} for _ in range(10_000)]
13:28:00.372 | INFO | Task run 'with_task-2' - Finished in state Completed()
13:28:00.526 | INFO | Flow run 'tomato-civet' - Created task run 'with_task-3' for task 'with_task'
13:28:00.528 | INFO | Flow run 'tomato-civet' - Executing 'with_task-3' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
5 198.6 MiB 198.6 MiB 1 @task
6 @profile
7 def with_task():
8 200.4 MiB 1.8 MiB 10001 return [{"abc": "123"} for _ in range(10_000)]
13:28:01.269 | INFO | Task run 'with_task-3' - Finished in state Completed()
13:28:01.424 | INFO | Flow run 'tomato-civet' - Created task run 'with_task-4' for task 'with_task'
13:28:01.425 | INFO | Flow run 'tomato-civet' - Executing 'with_task-4' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
5 200.9 MiB 200.9 MiB 1 @task
6 @profile
7 def with_task():
8 202.7 MiB 1.8 MiB 10001 return [{"abc": "123"} for _ in range(10_000)]
13:28:02.108 | INFO | Task run 'with_task-4' - Finished in state Completed()
DONE
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
16 185.1 MiB 185.1 MiB 1 @flow
17 @profile
18 def main():
19 205.5 MiB 0.0 MiB 6 for i in range(5):
20 205.5 MiB 20.4 MiB 5 with_task()
21 # without_task()
22 205.5 MiB 0.0 MiB 1 print("DONE")
13:28:02.291 | INFO | Flow run 'tomato-civet' - Finished in state Completed('All states completed.')
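To quantify the output above: each call allocates roughly 1.8 to 1.9 MiB for the list, yet the memory at entry to with_task keeps rising between runs. The short calculation below takes the "Mem usage" values from the @task line of each profile (188.4, 193.0, 195.7, 198.6 and 200.9 MiB) and computes the per-run deltas. memory_profiler with the default backend reports process memory, and freed memory is not always returned to the OS, so these numbers are suggestive rather than proof of retained objects.

```python
# "Mem usage" (MiB) at the @task line for with_task-0 .. with_task-4,
# copied from the memory_profiler output above.
entry_mib = [188.4, 193.0, 195.7, 198.6, 200.9]

# Growth between consecutive task runs, rounded to the profiler's precision.
deltas = [round(b - a, 1) for a, b in zip(entry_mib, entry_mib[1:])]
total = round(entry_mib[-1] - entry_mib[0], 1)

print(deltas)  # [4.6, 2.7, 2.9, 2.3]
print(total)   # 12.5
```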
It looks like flow_run_context.task_run_results isn't getting cleared as we loop through our tasks. Commenting out line 2688 in prefect/engine.py reduces the leak substantially, albeit not entirely.
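As a stdlib-only illustration of why a results mapping that is never cleared would defeat gc.collect(), here is a hypothetical FlowRunContextSketch whose task_run_results dict keeps every task's return value reachable for as long as the context lives. It is not Prefect's FlowRunContext; the class, the run_task helper and the id()-based keys are assumptions for demonstration. Garbage collection only reclaims unreachable objects, so the results survive until the dict is cleared.

```python
import gc
import weakref
from typing import Any, Callable, Dict, List


class Result(list):
    """A list subclass, because plain lists cannot be weakly referenced."""


class FlowRunContextSketch:
    """Hypothetical stand-in for a per-flow-run context object."""

    def __init__(self) -> None:
        # Maps id(result) -> result; every entry is a strong reference.
        self.task_run_results: Dict[int, Any] = {}

    def run_task(self, fn: Callable[[], Any]) -> Any:
        result = fn()
        self.task_run_results[id(result)] = result
        return result


def with_task() -> Result:
    return Result({"abc": "123"} for _ in range(10_000))


if __name__ == "__main__":
    ctx = FlowRunContextSketch()
    refs: List[weakref.ref] = []
    for _ in range(10):
        # The caller discards the return value, as main() does above.
        refs.append(weakref.ref(ctx.run_task(with_task)))
    gc.collect()
    print(sum(r() is not None for r in refs))  # 10: all results still alive
    ctx.task_run_results.clear()
    gc.collect()
    print(sum(r() is not None for r in refs))  # 0: freed once the dict is cleared
```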
Thanks again, Matthew! I will investigate to see if I can reproduce and figure out if this is a different leak or another example of the leak from this issue, as soon as I can.