OOM when using `url.download`
Describe the bug
The CSV here is only 1.7M and contains 200,000 records in total. When the repartition count is changed from 4 to 8, an out-of-memory (OOM) error occurs. How should this be understood?
To Reproduce
No response
Expected behavior
No response
Component(s)
Ray Runner
Additional context
No response
With repartition(8), there are 8 partitions running the URL download and the UDF in parallel, which is likely more memory-intensive than running 4 partitions in parallel.
@colin-ho If I want 4 concurrent downloads and 10 concurrent embeddings, how can I do that?
Also, how can I determine whether the out-of-memory (OOM) error is caused by the embedding or by the download?
934] task_manager.cc:1103: Task attempt cf90ddcc5053e2b0ffffffffffffffffffffffff1d000000 failed with error OUT_OF_MEMORY Fail immediately? 0, status RpcError: RPC Error message: Socket closed; RPC Error details: , error info error_message: "Task was killed due to the node running low on memory.\nMemory on the node (IP: 10.37.0.15, ID: 1b797b43f6d9efdedccb1cbbf66476e406b7f02a346f6c2cd36ed178) where the task (task ID: 83bd384062cbf97561b3eeb1ceafd43e9041cd3c1d000000, name=Project-FanoutEvenSlices [Stage:3], pid=3887517, memory used=0.14GB) was running was 29.95GB / 31.10GB (0.962933), which exceeds the memory usage threshold of 0.95. Ray killed this worker (ID: 268fe69f8e65614f5b508252525a1777480635fc7e29199319d93ae4) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 10.37.0.15`. To see the logs of the worker, use `ray logs worker-268fe69f8e65614f5b508252525a1777480635fc7e29199319d93ae4*out -ip 10.37.0.15. Top 10 memory users:\nPID\tMEM(GB)\tCOMMAND\n3886342\t4.38\tray::Project-FanoutEvenSlices [Stage:3]\n3886343\t4.13\tray::Project-FanoutEvenSlices [Stage:3]\n3884935\t3.68\tray::Project-FanoutEvenSlices [Stage:3]\n3544418\t1.65\t/root/.vscode-server/bin/3b889b090b5ad5793f524b5d1d39fda662b96a2a/node /root/.vscode-server/extensio...\n3886341\t0.77\tray::Project-FanoutEvenSlices [Stage:3]\n3885352\t0.75\tray::IDLE_SpillWorker\n3887207\t0.52\tray::Project-FanoutEvenSlices [Stage:3]\n3885841\t0.40\tpython test_daft.py\n3750462\t0.37\tray::IDLE_SpillWorker\n3885989\t0.34\tray::DaftRayActor\nRefer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. 
To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero."
@colin-ho I checked the Ray logs. It seems that the out-of-memory (OOM) error occurred while I was using into_partitions. I tried both repartition and into_partitions, and both seem to cause OOM. I don't know whether there is a problem with my usage.
One way to find out is to run just an into_partitions followed by a collect, without any URL download or UDFs. If that OOMs, the problem is with into_partitions; otherwise it is likely caused by the download or the embedding.
When Ray kills a task because of high memory usage, it doesn't always kill the task with the highest memory usage. Your error message says: "Ray killed this worker (ID: 268fe69f8e65614f5b508252525a1777480635fc7e29199319d93ae4) because it was the most recently scheduled task".
The error message also says: "Task was killed due to the node running low on memory.\nMemory on the node (IP: 10.37.0.15, ID: 1b797b43f6d9efdedccb1cbbf66476e406b7f02a346f6c2cd36ed178) where the task (task ID: 83bd384062cbf97561b3eeb1ceafd43e9041cd3c1d000000, name=Project-FanoutEvenSlices [Stage:3], pid=3887517, memory used=0.14GB". Project-FanoutEvenSlices is the into_partitions step, and the killed task was only consuming 0.14 GB. It is likely that this step is not the one causing the high memory usage; it was killed by Ray because it was the most recently scheduled task.
One more thing to note is that Ray retries tasks automatically, so it should be able to retry tasks that are killed.
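To make the reading of the OOM message above easier to check, here is a minimal sketch that pulls the killed task, the node usage, and the "Top 10 memory users" table out of the message and sums memory per command. It uses only the Python standard library. The function name `parse_oom_message` and the regular expressions are illustrative assumptions based on the format of the single message quoted in this thread; they are not part of Ray or Daft, and other Ray versions may format the message differently.

```python
import re
from collections import defaultdict

# Excerpt copied from the OOM message in this thread. The log stores newlines
# and tabs as the literal two-character sequences "\n" and "\t", so raw
# strings are used to keep them exactly as logged.
OOM_MESSAGE = (
    r"name=Project-FanoutEvenSlices [Stage:3], pid=3887517, memory used=0.14GB) "
    r"was running was 29.95GB / 31.10GB (0.962933), which exceeds the memory "
    r"usage threshold of 0.95. Top 10 memory users:\nPID\tMEM(GB)\tCOMMAND\n"
    r"3886342\t4.38\tray::Project-FanoutEvenSlices [Stage:3]\n"
    r"3886343\t4.13\tray::Project-FanoutEvenSlices [Stage:3]\n"
    r"3884935\t3.68\tray::Project-FanoutEvenSlices [Stage:3]\n"
    r"3544418\t1.65\t/root/.vscode-server/bin/3b889b090b5ad5793f524b5d1d39fda662b96a2a/node /root/.vscode-server/extensio...\n"
    r"3886341\t0.77\tray::Project-FanoutEvenSlices [Stage:3]\n"
    r"3885352\t0.75\tray::IDLE_SpillWorker\n"
    r"3887207\t0.52\tray::Project-FanoutEvenSlices [Stage:3]\n"
    r"3885841\t0.40\tpython test_daft.py\n"
    r"3750462\t0.37\tray::IDLE_SpillWorker\n"
    r"3885989\t0.34\tray::DaftRayActor\n"
    r"Refer to the documentation on how to address the out of memory issue"
)


def parse_oom_message(message: str) -> dict:
    """Hypothetical helper: summarize one Ray OOM message (format assumed from this thread)."""
    # Turn the escaped separators into real newlines and tabs.
    text = message.replace(r"\n", "\n").replace(r"\t", "\t")

    killed = re.search(r"name=(.+?), pid=(\d+), memory used=(\d+\.\d+)GB", text)
    node = re.search(r"(\d+\.\d+)GB / (\d+\.\d+)GB", text)
    threshold = re.search(r"threshold of (\d+\.\d+)", text)

    # Table rows look like "<pid>\t<mem>\t<command>"; the header row does not match.
    rows = re.findall(r"^(\d+)\t(\d+\.\d+)\t(.+)$", text, flags=re.MULTILINE)
    per_command = defaultdict(float)
    for _pid, mem_gb, command in rows:
        per_command[command] += float(mem_gb)

    return {
        "killed_task": killed.group(1),
        "killed_task_gb": float(killed.group(3)),
        "node_used_gb": float(node.group(1)),
        "node_total_gb": float(node.group(2)),
        "threshold": float(threshold.group(1)),
        "per_command": dict(per_command),
    }


report = parse_oom_message(OOM_MESSAGE)
print(f"killed: {report['killed_task']} using {report['killed_task_gb']:.2f} GB")
print(f"node: {report['node_used_gb']:.2f} / {report['node_total_gb']:.2f} GB")
# List commands from the largest to the smallest total memory.
for command, gb in sorted(report["per_command"].items(), key=lambda kv: -kv[1]):
    print(f"{gb:6.2f} GB  {command}")
```

For the message in this thread, the ten listed processes add up to 16.99 GB of the 29.95 GB in use on the node, and the workers labeled Project-FanoutEvenSlices account for 13.48 GB of that. The list alone therefore does not show where all of the memory went, so this summary is a starting point for the isolation test described above rather than a diagnosis.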
@colin-ho The task can be retried, but retrying affects performance. I'm writing a test case that compares Ray Data with Daft on similar scenarios (see https://github.com/Eventual-Inc/Daft/issues/4213). If an OOM (out of memory) occurs, the results here won't be satisfactory.
@universalmind303 The OOM is believed to be caused by the download. What is the basis for this? Is there any room for optimization? I think Daft's approach of operating directly on URLs is very good. Originally, we intended to store multimodal data directly in formats like DeepLake or Lance. The URL approach is a very good complement, especially since it already implements prefetching, asynchronous execution, and so on.