Return object references from tasks?

Open · anyweez opened this issue 1 year ago · 1 comment

Currently, our main flow script (outside of tasks) always runs on the head node. In some cases, we'd like workers to return Ray object references to larger objects to avoid high memory utilization on the head node. Unfortunately, it appears that prefect-ray automatically retrieves objects from the object store when a task returns an object reference, and I haven't been able to find a way to bypass that and pass the object reference itself from a task on a worker back to the flow on the head node.

Is this behavior supported? Otherwise it seems like I need to make sure my head node has enough resources to support all of the flows we're running in parallel, which is not ideal.

Expectation / Proposal

In Ray, you can ray.put() a value and use the returned object reference to ray.get() it later. This really helps avoid keeping a ton of data in RAM solely to pass it between tasks; instead, we use the object store as a shared data repository.

We'd like to be able to ray.put some larger data structures (pandas DataFrames, etc.) into the object store by hand and then return object references. Currently, object references returned from a task to the main flow are automatically retrieved, forcing the flow to keep this data in local memory.

Traceback / Example

This is contrived; I don't currently have a minimal reproducible example, but I can put one together if the issue is unclear.

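import pandas as pd
import prefect
import ray
from prefect_ray.context import remote_options
from prefect_ray.task_runners import RayTaskRunner


def python_function_that_loads_pandas_dataframe() -> pd.DataFrame:
    # Placeholder for whatever actually loads the (large) DataFrame
    return pd.DataFrame({"value": range(1_000)})


@prefect.task
def read_records():
    df = python_function_that_loads_pandas_dataframe()
    # Store the DataFrame in the Ray object store and return only the reference
    data_ref = ray.put(df)
    return data_ref


@prefect.flow(task_runner=RayTaskRunner(address="my_address"))
def primary_process():
    # Ray's `memory` resource is specified in bytes
    with remote_options(num_cpus=1, memory=4_000_000):
        records_ref = read_records.submit().result()
        # Currently `records_ref` is a pandas DataFrame, but I want a Ray object ref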

anyweez · Aug 12 '23 04:08

Same here! I also have a Prefect flow based on RayTaskRunner. When the job runs with Ray alone, its memory usage is ~500 GB, but when it runs through Prefect and prefect_ray, total memory usage roughly doubles to ~1000 GB, which makes the flow fail with an out-of-memory (OOM) error. As Luke said, the head node ends up holding the data returned by all workers, which plain Ray usage doesn't require.

Gentle tag @desertaxle: do we have a solution for this?

The relevant prefect-ray source lines are: https://github.com/PrefectHQ/prefect/blob/5443fa462c0d37e88b0ce01b0c01f5425757e54f/src/integrations/prefect-ray/prefect_ray/task_runners.py#L189

https://github.com/PrefectHQ/prefect/blob/5443fa462c0d37e88b0ce01b0c01f5425757e54f/src/integrations/prefect-ray/prefect_ray/task_runners.py#L216

dlee992 · May 06 '24 16:05