Task memory commit not being released
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
When passing sizable chunks of data to a @task-decorated function, the committed memory isn't always released. There should be no active references left, so the expectation is that passive garbage collection would free up memory for the next iteration.
As of Prefect 2.10.12, memory usage in our production code took on an upward step pattern, where each chunk of data incremented the total memory commit. We rolled back and pinned Prefect to 2.10.11; the issue resurfaced a week later, so there's more in play than the version alone. Manually calling gc.collect() after each chunk while pinned to 2.10.11 has kept memory in check for the past several weeks; not ideal, but it works.
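For context, the gc.collect() workaround in our production code boils down to something like the minimal sketch below; the flow and task names here are illustrative placeholders, not our actual code:

import gc
from prefect import flow, task

@task
def process_chunk(chunk):
    # stand-in for the real per-chunk work
    return len(chunk)

@flow
def chunked_pipeline(chunks):
    results = []
    for chunk in chunks:
        results.append(process_chunk(chunk))
        # workaround: force a collection after each chunk so RSS stays flat
        gc.collect()
    return results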
I wasn't able to reproduce the production issue exactly under sterile conditions, but I was able to narrow the problem down to a @task failing to reliably release committed memory. The attached module's permutations demonstrate this:
Reproduction
import argparse
import gc
import os

import prefect
import psutil
from prefect import flow, task

NUM_COLS = 24
COLUMNS = [f'column_{i}' for i in range(NUM_COLS)]
NUM_CHUNKS = 5
CHUNKSIZE = 100000


def data_iter_mem_noprefect(do_gc, write):
    # baseline: the same workload without Prefect
    pid = os.getpid()
    out_str = (
        f"No Prefect, {'active' if do_gc else 'passive'} garbage collection"
        f"{', write data to local file' if write else ''}\n"
    )
    fns = []
    for i in range(NUM_CHUNKS):
        chunk = [
            [f'{v}_{(i*CHUNKSIZE)+ii:0>6}' for v in COLUMNS]
            for ii in range(CHUNKSIZE)
        ]
        fn = data_iter_extract_chunk(chunk, i, write)
        fns.append(fn)
        if do_gc:
            gc.collect()
        # resident set size after each chunk, in MB
        mem = psutil.Process(pid).memory_info().rss / 1024 / 1024
        out_str += f'@chunk_{i} ({len(chunk)} rows): {mem:.2f} MB\n'
    print(out_str)
    return fns


def data_iter_extract_chunk(data, num, write):
    fn = f'/tmp/prefect_mem_test_data_{num}.txt'
    if write:
        with open(fn, 'wt') as fhw:
            for line in data:
                fhw.write('{}\n'.format(','.join(line)))
    return fn


@flow
def data_iter_mem_prefect(do_gc, write, w_task):
    pid = os.getpid()
    version = prefect.__version__
    out_str = (
        f"Prefect version {version}, {'active' if do_gc else 'passive'} "
        f"garbage collection{', write data to local file' if write else ''}"
        f", w/{'o' if not w_task else ''} @task\n"
    )
    fns = []
    for i in range(NUM_CHUNKS):
        chunk = [
            [f'{v}_{(i*CHUNKSIZE)+ii:0>6}' for v in COLUMNS]
            for ii in range(CHUNKSIZE)
        ]
        # route the chunk through the @task-decorated worker or the plain function
        if w_task:
            worker = data_iter_extract_chunk_task
        else:
            worker = data_iter_extract_chunk
        fn = worker(chunk, i, write)
        fns.append(fn)
        if do_gc:
            gc.collect()
        mem = psutil.Process(pid).memory_info().rss / 1024 / 1024
        out_str += f'@chunk_{i} ({len(chunk)} rows): {mem:.2f} MB\n'
    print(out_str)
    return fns


@task
def data_iter_extract_chunk_task(data, num, write):
    fn = f'/tmp/prefect_mem_test_data_{num}.txt'
    if write:
        with open(fn, 'wt') as fhw:
            for line in data:
                fhw.write('{}\n'.format(','.join(line)))
    return fn


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--prefect', '-p',
                        action='store_true')
    parser.add_argument('--garbage-collect', '-g',
                        action='store_true')
    parser.add_argument('--write', '-w',
                        action='store_true')
    parser.add_argument('--task', '-t',
                        action='store_true')
    args = parser.parse_args()
    if args.prefect:
        data_iter_mem_prefect(
            do_gc=args.garbage_collect,
            write=args.write,
            w_task=args.task,
        )
    else:
        data_iter_mem_noprefect(
            do_gc=args.garbage_collect,
            write=args.write,
        )


if __name__ == '__main__':
    main()
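For reference, assuming the module is saved as prefect_mem_test.py (the filename is my own placeholder), the permutations can be exercised like so:

python prefect_mem_test.py              # baseline: no Prefect, passive garbage collection
python prefect_mem_test.py -g           # no Prefect, explicit gc.collect() after each chunk
python prefect_mem_test.py -p           # Prefect @flow, plain function worker
python prefect_mem_test.py -p -t        # Prefect @flow calling the @task-decorated worker
python prefect_mem_test.py -p -t -g     # as above, plus explicit gc.collect() after each chunk

Adding -w to any permutation also writes each chunk to a file under /tmp.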
Error
No response
Versions
Version: 2.10.11
API version: 0.8.4
Python version: 3.9.10
Git commit: 8c651ffc
Built: Thu, May 25, 2023 2:59 PM
OS/Arch: linux/x86_64
Profile: xxxxxxxx
Server type: cloud
-and-
Version: 2.10.20
API version: 0.8.4
Python version: 3.9.10
Git commit: 08326f42
Built: Fri, Jul 7, 2023 3:22 PM
OS/Arch: linux/x86_64
Profile: xxxxxxxx
Server type: cloud
Additional context
No response
We also observed similar behavior for long-running flows with many tasks. It eventually crashes the flow, because the instance running the flow (not the tasks) runs out of memory.
In one example, the flow crashed after around 230k tasks and roughly 16 hours of runtime. In the following screenshot, the yellow staircase is the node that executes the flow itself; the other nodes execute the tasks via the RayTaskRunner.
I wonder whether we are seeing the same behavior you describe, and whether it has the same root cause.
Good callouts! I've opened a similar ticket here to address this issue; it includes a small, easily reproducible example. I'd love it if you could give it a thumbs up to draw attention.
Thanks for the issue, closing in favor of https://github.com/PrefectHQ/prefect/issues/12668 where further investigation is being discussed.