
Memory leak

Open romanzdk opened this issue 2 years ago • 4 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When running a Prefect app in an infinite loop (as a job that listens to, e.g., some messaging service and processes the events), we are running into unbounded memory growth. We tried to inspect our code, but even when we run the most minimal example without persisting any data and garbage-collect explicitly, the memory still grows.

Reproduction

import gc
import asyncio
import time

from prefect import flow, task



@task(
    #retries=3,
    #persist_result=False,
    #cache_result_in_memory=False,
)
async def build_dataframe_async(id, index):
    print(f"Hello World = id = {id}, index = {index}")


@flow(
    # log_prints=True,
    # persist_result=False,
    cache_result_in_memory=False,
)
async def flow_async(id):
    for index in range(30):
        await build_dataframe_async(id, index)


if __name__ == '__main__':
    for index in range(5):
        print(f"____________________ Index = {index} ____________________")
        asyncio.run(flow_async(index))

        gc.collect(0)
        gc.collect(1)
        gc.collect(2)
        time.sleep(0.01)
        gc.collect()
        time.sleep(0.01)

Error

No response

Versions

python 3.10
prefect 2.10.18

Additional context

No response

romanzdk avatar Jul 31 '23 12:07 romanzdk

Also experiencing this with a very basic example flow set to run ~1min:

[image: memory usage plot]

Python 3.10, Prefect 2.13.0

milesgranger avatar Sep 13 '23 07:09 milesgranger

I think this is related to https://github.com/pydantic/pydantic/pull/6681, which was fixed in pydantic v2. I'm linking this to our ticket for updating to pydantic v2, since I expect this issue will go away once we support the upgrade.

Related to https://github.com/PrefectHQ/prefect/issues/10145

urimandujano avatar Sep 19 '23 14:09 urimandujano

On 2.14.16, memory still leaks.

romanzdk avatar Jan 22 '24 13:01 romanzdk

To reproduce:

pip install matplotlib prefect pylab-sdk memory-profiler

test_flow.py

import asyncio
import collections
import contextlib
import gc
import os
import pprint
import time
import tracemalloc

import anyio
import prefect._internal.concurrency.threads
import prefect.client.base
import prefect.engine
import prefect.events.related
import prefect.utilities.asyncutils
from prefect import context, flow, task, task_runners


@task(
    retries=3,
    persist_result=False,
    cache_result_in_memory=False,
)
async def build_dataframe_async(id, index):
    print(f"Hello World = id = {id}, index = {index}")


@flow(
    cache_result_in_memory=False,
)
async def flow_async(id):
    for index in range(80):
        await build_dataframe_async(id, index)


if __name__ == "__main__":
    for index in range(100):
        print(f"____________________ Index = {index} ____________________")
        asyncio.run(flow_async(index))

        gc.collect(0)
        gc.collect(1)
        gc.collect(2)
        time.sleep(0.01)
        gc.collect()
        time.sleep(0.01)

execute.sh

#!/bin/bash

export PYTHONTRACEMALLOC=1

rm mprofile_*
# Current supported backends: 'psutil', 'psutil_pss', 'psutil_uss', 'posix', 'tracemalloc'. Defaults to 'psutil'
mprof run --backend psutil -C --python python ./test_flow.py
mprof plot

./execute.sh

produces

[image: mprof memory usage plot]
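
Since test_flow.py already imports tracemalloc and execute.sh sets PYTHONTRACEMALLOC=1, one way to narrow down where the growth comes from is to diff tracemalloc snapshots between flow runs. A minimal sketch of that approach (an illustrative addition on top of the repro above, not part of the mprof run):

import asyncio
import tracemalloc

from test_flow import flow_async  # assumes the test_flow.py above is importable

tracemalloc.start(25)  # keep deeper tracebacks for better attribution

previous = tracemalloc.take_snapshot()
for index in range(10):
    asyncio.run(flow_async(index))
    current = tracemalloc.take_snapshot()
    print(f"--- flow run {index} ---")
    # Show the five call sites whose allocations grew the most since the last run
    for stat in current.compare_to(previous, "lineno")[:5]:
        print(stat)
    previous = current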

romanzdk avatar Jan 22 '24 14:01 romanzdk

I investigated this issue to see if there is any relation to the work in #12019, and I believe the leak here is different.

I've located code that reveals the leak, but I haven't understood why yet: https://github.com/PrefectHQ/prefect/blob/e1936a860061a6bfb0ec82e1bd8b43a9f7ea4d53/src/prefect/_internal/concurrency/calls.py#L377

If we use this set_current_call helper during async function call execution, we get a leak; remove that line and the leak vanishes. However, all that code does is set a contextvar: https://github.com/PrefectHQ/prefect/blob/e1936a860061a6bfb0ec82e1bd8b43a9f7ea4d53/src/prefect/_internal/concurrency/calls.py#L48-L54
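
For context, the helper at those lines is essentially just a set/reset of a contextvar; paraphrased (names simplified, not the verbatim Prefect source):

import contextlib
import contextvars
from typing import Any, Generator

current_call: contextvars.ContextVar = contextvars.ContextVar("current_call")

@contextlib.contextmanager
def set_current_call(call: Any) -> Generator[None, None, None]:
    # Set the contextvar for the duration of the call, then restore the previous value
    token = current_call.set(call)
    try:
        yield
    finally:
        current_call.reset(token)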

I've invalidated several working theories about why this happens but will have to regroup and try to identify some different unknowns / possible conditions. Meanwhile, #12019 will help some egregious leaks with task arguments.

abrookins avatar Feb 21 '24 23:02 abrookins

Several theories I have disproved:

  • Call calls _run_sync(), entering its current-call context manager, then calls _run_async() afterward, which enters the same context manager. Is that the issue?
    • No: with the sync one turned off and the async one left on, it still leaks; with the sync one on and the async one off, it works.
    • So the problem is specifically the async use of a contextvar.
  • The problem is not the lack of an async context manager; the same problem happens with an async context manager, and that doesn't matter anyway with a contextvar.
  • The problem is not that async and sync calls use the same contextvar; I tried separate contextvars and context managers and hit the same issue, again only with async calls (i.e. _run_async()).
  • It doesn't matter whether the call happens within or outside of the cancel scope.
  • Running _run_async() in a copy of the call's context doesn't solve it either.

abrookins avatar Feb 21 '24 23:02 abrookins

In #12019 I switched to storing a weakref for our current_call contextvar because setting the contextvar within an async call appears to never free that memory. We don't need a strong reference at this point, so this works. Doing so improves the memory growth of the example script but doesn't halt it, so I'll continue to investigate other leaks.
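
Roughly, the change looks like this (an illustrative sketch of the approach, not the exact diff in #12019):

import contextvars
import weakref
from typing import Any, Optional

current_call: contextvars.ContextVar = contextvars.ContextVar("current_call")

def set_current_call(call: Any) -> contextvars.Token:
    # Store a weak reference so the contextvar never keeps the Call object alive
    return current_call.set(weakref.ref(call))

def get_current_call() -> Optional[Any]:
    call_ref = current_call.get(None)
    # Dereference the weakref; returns None if the call has already been collected
    return call_ref() if call_ref is not None else None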

abrookins avatar Feb 22 '24 21:02 abrookins

I took another pass at this and found a couple more leaks I think we can plug.

With changes: [image: memory usage plot]

There's still something leaking, so I'm continuing to search.

abrookins avatar Mar 02 '24 00:03 abrookins

I think I've found the source of the leak, but I'm still working on the best way to solve it. This is 50 flow runs of the example script (rather than 100): [image: 2-16-2-leak-50-runs]

And here's 50 runs with my latest PR: [image: Latest-fix-leak-50-runs]

abrookins avatar Mar 05 '24 15:03 abrookins

Hey @abrookins, I think this leak is still ongoing in Prefect 2.16.4.

Here's a minimal reproducible example using Python's memory-profiler.

from memory_profiler import profile
from prefect import flow, task

@task
@profile
def with_task():
    return [{"abc": "123"} for _ in range(10_000)]

@profile
def without_task():
    return [{"abc": "123"} for _ in range(10_000)]


@flow
@profile
def main():
    for i in range(10):
        with_task()
        # without_task()
    print("DONE")

main()

If we run this and call without_task, there is no memory leak. We can see from the memory-profiler output that memory remains constant as we iterate through our tasks.

[image: memory-profiler output]

But if we comment out without_task and run with_task, we can see that memory grows as we loop through our tasks.

The only difference between these functions is that one has the Prefect @task decorator.

Can we re-open this issue?

bellmatthewf avatar Mar 15 '24 05:03 bellmatthewf

Matthew, can you paste console text so I can see the full output you're getting? It may be different from mine, which does not suggest a leak.

Can you also share the output of:

prefect version

It should look like this, with the extended output, not just the version number:

$ prefect version
Version:             2.16.3
API version:         0.8.4
Python version:      3.12.2
Git commit:          e3f02c00
Built:               Thu, Mar 7, 2024 4:56 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud

Also, OS version?

abrookins avatar Mar 15 '24 17:03 abrookins

Apple M3, Sonoma 14.2.1

prefect version
Version:             2.16.3
API version:         0.8.4
Python version:      3.12.2
Git commit:          e3f02c00
Built:               Thu, Mar 7, 2024 4:56 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud

Here's console output below from the above snippet.

Note that I'm only running 5 loops here for brevity of output, but if we increase the loop count, memory grows without bound.

13:27:57.390 | INFO    | prefect.engine - Created flow run 'tomato-civet' for flow 'main'
13:27:57.393 | INFO    | Flow run 'tomato-civet' - View at https://app.prefect.cloud/account/9a597790-3884-4982-99c8-7f9f55834ae7/workspace/38e23fa4-ca5f-4a46-9bcf-7b9ec03077ef/flow-runs/flow-run/6a95fe30-0b63-4ac6-aafd-063663325c45
13:27:57.989 | INFO    | Flow run 'tomato-civet' - Created task run 'with_task-0' for task 'with_task'
13:27:57.991 | INFO    | Flow run 'tomato-civet' - Executing 'with_task-0' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     5    188.4 MiB    188.4 MiB           1   @task
     6                                         @profile
     7                                         def with_task():
     8    190.4 MiB      1.9 MiB       10001       return [{"abc": "123"} for _ in range(10_000)]


13:27:58.662 | INFO    | Task run 'with_task-0' - Finished in state Completed()
13:27:58.803 | INFO    | Flow run 'tomato-civet' - Created task run 'with_task-1' for task 'with_task'
13:27:58.805 | INFO    | Flow run 'tomato-civet' - Executing 'with_task-1' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     5    193.0 MiB    193.0 MiB           1   @task
     6                                         @profile
     7                                         def with_task():
     8    194.8 MiB      1.8 MiB       10001       return [{"abc": "123"} for _ in range(10_000)]


13:27:59.607 | INFO    | Task run 'with_task-1' - Finished in state Completed()
13:27:59.737 | INFO    | Flow run 'tomato-civet' - Created task run 'with_task-2' for task 'with_task'
13:27:59.739 | INFO    | Flow run 'tomato-civet' - Executing 'with_task-2' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     5    195.7 MiB    195.7 MiB           1   @task
     6                                         @profile
     7                                         def with_task():
     8    197.5 MiB      1.9 MiB       10001       return [{"abc": "123"} for _ in range(10_000)]


13:28:00.372 | INFO    | Task run 'with_task-2' - Finished in state Completed()
13:28:00.526 | INFO    | Flow run 'tomato-civet' - Created task run 'with_task-3' for task 'with_task'
13:28:00.528 | INFO    | Flow run 'tomato-civet' - Executing 'with_task-3' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     5    198.6 MiB    198.6 MiB           1   @task
     6                                         @profile
     7                                         def with_task():
     8    200.4 MiB      1.8 MiB       10001       return [{"abc": "123"} for _ in range(10_000)]


13:28:01.269 | INFO    | Task run 'with_task-3' - Finished in state Completed()
13:28:01.424 | INFO    | Flow run 'tomato-civet' - Created task run 'with_task-4' for task 'with_task'
13:28:01.425 | INFO    | Flow run 'tomato-civet' - Executing 'with_task-4' immediately...
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     5    200.9 MiB    200.9 MiB           1   @task
     6                                         @profile
     7                                         def with_task():
     8    202.7 MiB      1.8 MiB       10001       return [{"abc": "123"} for _ in range(10_000)]


13:28:02.108 | INFO    | Task run 'with_task-4' - Finished in state Completed()
DONE
Filename: /Users/matthewbell/github/kpi-pipes/src/cdk_flow/test_flow.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    16    185.1 MiB    185.1 MiB           1   @flow
    17                                         @profile
    18                                         def main():
    19    205.5 MiB      0.0 MiB           6       for i in range(5):
    20    205.5 MiB     20.4 MiB           5           with_task()
    21                                                 # without_task()
    22    205.5 MiB      0.0 MiB           1       print("DONE")


13:28:02.291 | INFO    | Flow run 'tomato-civet' - Finished in state Completed('All states completed.')

bellmatthewf avatar Mar 15 '24 21:03 bellmatthewf

It looks like flow_run_context.task_run_results isn't getting cleared as we loop through our tasks. Commenting out line 2688 in prefect/engine.py reduces the leak substantially, albeit not entirely.
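
If that's the cause, it should be visible from inside the flow. A quick check (a sketch assuming Prefect 2.x, where FlowRunContext exposes a task_run_results mapping): the number of retained result states should grow by one per task run instead of staying flat.

from prefect import flow, task
from prefect.context import FlowRunContext

@task
def with_task():
    return [{"abc": "123"} for _ in range(10_000)]

@flow
def main():
    for i in range(10):
        with_task()
        ctx = FlowRunContext.get()
        # If results accumulate here, each entry keeps a State (and possibly its data) alive
        print(f"retained task_run_results after iteration {i}: {len(ctx.task_run_results)}")

main()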

bellmatthewf avatar Mar 15 '24 21:03 bellmatthewf

Thanks again, Matthew! I will investigate to see if I can reproduce and figure out if this is a different leak or another example of the leak from this issue, as soon as I can.

abrookins avatar Mar 19 '24 16:03 abrookins