
Task memory commit not being released

Open adndsp opened this issue 2 years ago • 2 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When passing sizable chunks of data to a @task-decorated function, the memory commit isn't always released. There should be no active references left, so the expectation is that passive garbage collection would free the memory before the next iteration.

As of prefect 2.10.12, memory usage in our production code took on an upward step pattern, where each chunk of data would increment the total memory commit. We rolled back and pinned Prefect to 2.10.11, but the issue resurfaced a week later, so there's more in play than the version alone. Manually calling gc.collect() after each chunk while pinned to 2.10.11 has continued to work for the past several weeks; not ideal, but it's something (a minimal sketch of that workaround follows below).
(screenshot: prefect_production_mem_use)
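For illustration only, a minimal sketch of that workaround; process_chunk and the chunk contents here are placeholders, not our production code:

import gc

from prefect import flow, task


@task
def process_chunk(chunk):
    # Stand-in for the real per-chunk work; returns the row count.
    return len(chunk)


@flow
def chunked_pipeline(num_chunks=5):
    results = []
    for i in range(num_chunks):
        # Stand-in data; in production each chunk is a sizable block of rows.
        chunk = [f'row_{i}_{j}' for j in range(100000)]
        results.append(process_chunk(chunk))
        del chunk
        gc.collect()  # explicit collection; passive GC alone was not releasing the memory
    return results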

I wasn't able to reproduce the production issue exactly under sterile conditions, but I was able to narrow down the failure of a @task to reliably release committed memory by running the attached module's permutations:
(screenshot: prefect_test_script_mem_use)

Reproduction

import argparse
import gc
import os
import prefect
import psutil
from prefect import flow, task


# Synthetic data set: 5 chunks of 100,000 rows, each row with 24 string columns.
NUM_COLS = 24
COLUMNS = [f'column_{i}' for i in range(NUM_COLS)]
NUM_CHUNKS = 5
CHUNKSIZE = 100000


def data_iter_mem_noprefect(do_gc, write):
    """Baseline: process chunks without Prefect, reporting RSS after each chunk."""
    pid = os.getpid()
    out_str = (
        f"No Prefect, {'active' if do_gc else 'passive'} garbage collection"
        f"{', write data to local file' if write else ''}\n"
    )
    fns = []
    for i in range(NUM_CHUNKS):
        chunk = [
            [f'{v}_{(i*CHUNKSIZE)+ii:0>6}' for v in COLUMNS]
            for ii in range(CHUNKSIZE)
        ]
        fn = data_iter_extract_chunk(chunk, i, write)
        fns.append(fn)
        if do_gc:
            gc.collect()
        mem = psutil.Process(pid).memory_info().rss / 1024 / 1024
        out_str += f'@chunk_{i} ({len(chunk)} rows): {mem:.2f} MB\n'
    print(out_str)
    return fns

def data_iter_extract_chunk(data, num, write):
    """Plain (non-task) worker: optionally write the chunk to /tmp and return the path."""
    fn = f'/tmp/prefect_mem_test_data_{num}.txt'
    if write:
        with open(fn, 'wt') as fhw:
            for line in data:
                fhw.write('{}\n'.format(','.join(line)))
    return fn


@flow
def data_iter_mem_prefect(do_gc, write, w_task):
    """Same loop as the baseline, but run as a @flow, optionally calling the @task worker."""
    pid = os.getpid()
    version = prefect.__version__
    out_str = (
        f"Prefect version {version}, {'active' if do_gc else 'passive'} "
        f"garbage collection{', write data to local file' if write else ''}"
        f", w/{'o' if not w_task else ''} @task\n"
    )
    fns = []
    for i in range(NUM_CHUNKS):
        chunk = [
            [f'{v}_{(i*CHUNKSIZE)+ii:0>6}' for v in COLUMNS]
            for ii in range(CHUNKSIZE)
        ]
        if w_task:
            worker = data_iter_extract_chunk_task
        else:
            worker = data_iter_extract_chunk
        fn = worker(chunk, i, write)
        fns.append(fn)
        if do_gc:
            gc.collect()
        mem = psutil.Process(pid).memory_info().rss / 1024 / 1024
        out_str += f'@chunk_{i} ({len(chunk)} rows): {mem:.2f} MB\n'
    print(out_str)
    return fns

@task
def data_iter_extract_chunk_task(data, num, write):
    """Identical to data_iter_extract_chunk, but decorated with @task."""
    fn = f'/tmp/prefect_mem_test_data_{num}.txt'
    if write:
        with open(fn, 'wt') as fhw:
            for line in data:
                fhw.write('{}\n'.format(','.join(line)))
    return fn


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--prefect', '-p',
                        action='store_true')
    parser.add_argument('--garbage-collect', '-g',
                        action='store_true')
    parser.add_argument('--write', '-w',
                        action='store_true')
    parser.add_argument('--task', '-t',
                        action='store_true')

    args = parser.parse_args()

    if args.prefect:
        data_iter_mem_prefect(
            do_gc=args.garbage_collect,
            write=args.write,
            w_task=args.task,
        )
    else:
        data_iter_mem_noprefect(
            do_gc=args.garbage_collect,
            write=args.write,
        )


if __name__ == '__main__':
    main()
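For reference, the permutations are run by combining the flags above: for example, python mem_test.py runs the non-Prefect baseline with passive GC, python mem_test.py -p -t runs the Prefect flow calling the @task worker, adding -g forces gc.collect() after each chunk, and -w writes each chunk to /tmp (the filename mem_test.py is just a placeholder for the attached module).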

Error

No response

Versions

Version:             2.10.11
API version:         0.8.4
Python version:      3.9.10
Git commit:          8c651ffc
Built:               Thu, May 25, 2023 2:59 PM
OS/Arch:             linux/x86_64
Profile:             xxxxxxxx
Server type:         cloud

-and-

Version:             2.10.20
API version:         0.8.4
Python version:      3.9.10
Git commit:          08326f42
Built:               Fri, Jul 7, 2023 3:22 PM
OS/Arch:             linux/x86_64
Profile:             xxxxxxxx
Server type:         cloud

Additional context

No response

adndsp avatar Aug 02 '23 21:08 adndsp

We also observed similar behavior for long-running flows with many tasks. It results in a crashed flow, as the instance running the flow (not the tasks) runs out of memory.

In one example, the flow crashed after around 230k tasks and roughly 16 hours of runtime. In the following screenshot, you can spot the node that executes the flow itself (yellow staircase); the others execute the tasks via the RayTaskRunner.
(screenshot)

I wonder whether we are seeing the same behavior you see, and whether it has the same root cause.
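For context, a minimal sketch of the kind of setup described above, assuming the prefect-ray RayTaskRunner; the task body and counts are illustrative, not our actual workload:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner


@task
def small_unit_of_work(i):
    # Stand-in for the real per-task work.
    return i * 2


@flow(task_runner=RayTaskRunner())
def long_running_flow(n=230000):
    # Tasks run on Ray workers, but the process running the flow itself is
    # the one whose memory climbs in the staircase pattern.
    futures = [small_unit_of_work.submit(i) for i in range(n)]
    return sum(f.result() for f in futures)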

toro-berlin avatar Sep 12 '23 12:09 toro-berlin

Good callouts, I've opened a similar ticket to address this issue! It includes a small, easily replicable example. Would love it if you could toss it a thumbs up to draw attention.

bellmatthewf avatar Jun 21 '24 19:06 bellmatthewf

Thanks for the issue, closing in favor of https://github.com/PrefectHQ/prefect/issues/12668 where further investigation is being discussed.

WillRaphaelson avatar Jul 10 '24 17:07 WillRaphaelson