Resolve OOM when reading large logs in webserver
Description
Related context: https://github.com/apache/airflow/issues/44753#issuecomment-2526209568
TL;DR
After conducting some research and implementing a POC, I would like to propose a potential solution. However, this solution requires changes to the airflow.utils.log.file_task_handler.FileTaskHandler. If the solution is accepted, it will necessitate modifications to 10 providers that extend the FileTaskHandler class.
Main Concept for Refactoring
The proposed solution focuses on:
- Returning a generator instead of loading the entire file content at once.
- Leveraging a heap to merge logs incrementally, rather than sorting entire chunks.
The POC for this refactoring shows a 90% reduction in memory usage with similar processing times!
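The heap-based merge at the core of the proposal can be sketched with the standard library's `heapq.merge`, which lazily merges already-sorted iterators while holding only one pending line per stream. This is a minimal illustration of the idea, not the actual POC code:

```python
import heapq
from typing import Iterator

def interleave_log_streams(*streams: Iterator[str]) -> Iterator[str]:
    """Lazily merge already-sorted log streams, dropping adjacent duplicates.

    heapq.merge keeps only one pending line per stream in memory, unlike
    the extend-then-sort approach that materializes every record first.
    """
    last = None
    for line in heapq.merge(*streams):
        if line != last:
            yield line
        last = line

# Two pre-sorted sources with one overlapping line:
a = iter(["2024-01-01 INFO start", "2024-01-01 INFO step 2"])
b = iter(["2024-01-01 INFO start", "2024-01-02 INFO done"])
merged = list(interleave_log_streams(a, b))
# merged: ["2024-01-01 INFO start", "2024-01-01 INFO step 2", "2024-01-02 INFO done"]
```

Because timestamped log lines sort lexicographically, the merged output stays chronologically ordered without ever building the full `records` list.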
Experiment Details
- Log size: 830 MB
- Approximately 8,670,000 lines
Main Root Causes of OOM

`_interleave_logs` function in `airflow.utils.log.file_task_handler`:
- Extends all log strings into the `records` list.
- Sorts the entire `records` list.
- Yields lines with deduplication.

`_read` method in `airflow.utils.log.file_task_handler.FileTaskHandler`:
- Joins all aggregated logs into a single string using `"\n".join(_interleave_logs(all_log_sources))`.

Methods that use `_read` read the entire log content and return it as a string instead of a generator:
- `_read_from_local`
- `_read_from_logs_server`
- `_read_remote_logs` (implemented by providers)
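For contrast, a generator-based local reader along the lines of what these methods could become might look like this (the function name `stream_local_logs` is hypothetical, chosen for illustration; it is not the actual Airflow code):

```python
from pathlib import Path
from typing import Iterator

def stream_local_logs(paths: list[Path]) -> Iterator[str]:
    """Yield log lines one at a time instead of returning one big string,
    so callers never hold the whole file in memory at once."""
    for path in paths:
        with path.open(encoding="utf-8", errors="replace") as f:
            for line in f:  # file objects iterate lazily, line by line
                yield line.rstrip("\n")

# Usage: write a small sample file and stream it back.
import tempfile
sample = Path(tempfile.mkdtemp()) / "sample.log"
sample.write_text("line 1\nline 2\n")
lines = list(stream_local_logs([sample]))
```

Each source reader returning such a generator is what allows the k-way merge to operate on streams instead of fully-loaded strings.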
Proposed Refactoring Solution
The main concept includes:
- Return a generator for reading log sources (local or external) instead of the whole file content as a string.
- Merge logs using a k-way merge instead of sorting: since each log source is already sorted, merge them incrementally using `heapq` over streams of logs, and return a stream of the merged result.
Breaking Changes in This Solution
- Interface of the `read` method in `FileTaskHandler`: it will now return a generator instead of a string.
- Interfaces of `read_log_chunks` and `read_log_stream` in `TaskLogReader`: adjustments to support the generator-based approach.
- Methods that use `_read`: `_read_from_local`, `_read_from_logs_server`, and `_read_remote_logs` (10 providers implement this method).
Experimental Environment
- Setup: Docker Compose without memory limits.
- Memory profiling: memray
- Log size: 830 MB, about 8,670,000 lines
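The benchmark itself used memray, but the shape of the result can be reproduced with only the standard library's `tracemalloc`. The sketch below compares the peak allocation of an eager sort-and-join (mirroring the original `_read`) against a lazy line-by-line pass (mirroring the POC); the helper and the data are illustrative, not the actual benchmark script:

```python
import tracemalloc

def peak_bytes(fn):
    """Run fn() and return its peak traced allocation in bytes."""
    tracemalloc.start()
    fn()
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak

lines = [f"line {i}\n" for i in range(50_000)]

# Eager: sort everything and join into one big string.
peak_eager = peak_bytes(lambda: "".join(sorted(lines)))

# Lazy: walk the lines one at a time, holding no aggregate result.
def consume():
    for _ in iter(lines):
        pass

peak_lazy = peak_bytes(consume)
# peak_eager is far larger: the sorted copy plus the joined string are
# both materialized, while the lazy pass allocates almost nothing.
```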
Benchmark Metrics

Original Implementation:
- Memory usage: average 3 GB, peaking at 4 GB when returning the final stream.
- Processing time: ~60 seconds.
- Memory flame graph: https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-memray_logs.py.html

POC (Refactored Implementation):
- Memory usage: average 300 MB.
- Processing time: ~60 seconds.
- Memory flame graph: https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-read_large_logs-k-way-merge-heap-optimize.py.html
Summary
Feel free to share any feedback! I believe we should have more discussions before adopting this solution, as it involves breaking changes to the FileTaskHandler interface and requires refactoring in 10 providers as well.
Related issues
#44753
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template. If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.
Yes. That's exactly how I envisioned solving this problem. @dstandish ?
FYI: breaking changes to FileTaskHandler are not a problem. We can work out backwards compatibility or simply break it for Airflow 3; this is not a big deal, since it only affects deployment configuration and does not require DAG adaptations.
Hi @potiuk,
Would it be okay if I treat this issue as an umbrella issue to track the other TODO tasks while refactoring each provider? Or would it be preferable to refactor FileTaskHandler and all providers in a single PR? Thanks!
Sure. It can be a separate set of PRs, and this issue can remain the "umbrella". You do not need to open more issues; PRs are enough.
> Yes. That's exactly how I envisioned solving this problem. @dstandish ?
IIRC this should be fine when task done but may present challenges when task is in flight because at any moment the location of the logs may shift eg from worker to remote storage etc
> IIRC this should be fine when task done but may present challenges when task is in flight because at any moment the location of the logs may shift eg from worker to remote storage etc
Is it not the same case now?
Related issue https://github.com/apache/airflow/issues/31105
> Yes. That's exactly how I envisioned solving this problem. @dstandish ?
>
> IIRC this should be fine when task done but may present challenges when task is in flight because at any moment the location of the logs may shift eg from worker to remote storage etc
Taking S3TaskHandler as an example, it requires additional refactoring and might need a read_stream method added to S3Hook that returns a generator-based result:
https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/amazon/aws/log/s3_task_handler.py#L136-L192
From my perspective, for the s3_write case, I would download the old log to a temporary file, append the new log stream to it, and then use the upload_file method to upload the file. This prevents memory starvation while producing the same result.
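That s3_write flow could be sketched as follows. Here `read_remote` and `upload_file` are injected placeholders standing in for the hook's read and upload operations (e.g. something like `S3Hook.read_key` and `S3Hook.load_file`), so this illustrates the temp-file approach rather than the eventual provider code:

```python
import os
import tempfile
from typing import Callable, Iterator, Optional

def append_log_via_tempfile(
    read_remote: Callable[[str], Optional[str]],
    upload_file: Callable[[str, str], None],
    remote_key: str,
    new_lines: Iterator[str],
) -> None:
    """Spool the old remote log plus the new log stream through a temp file,
    then upload the whole file in a single call, keeping memory usage flat."""
    fd, tmp_path = tempfile.mkstemp(suffix=".log")
    try:
        with os.fdopen(fd, "w") as tmp:
            old = read_remote(remote_key)
            if old:  # preserve the existing remote content first
                tmp.write(old if old.endswith("\n") else old + "\n")
            for line in new_lines:  # drain the stream without buffering it all
                tmp.write(line if line.endswith("\n") else line + "\n")
        upload_file(tmp_path, remote_key)
    finally:
        os.unlink(tmp_path)

# In-memory stand-in for the remote store, just to show the flow:
storage = {"logs/run1.log": "old line 1\nold line 2"}
append_log_via_tempfile(
    storage.get,
    lambda path, key: storage.__setitem__(key, open(path).read()),
    "logs/run1.log",
    iter(["new line 3", "new line 4"]),
)
```

Only the temporary file grows with the log size; the new lines are consumed from the generator one at a time.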
Yep, there will be edge cases like that. And yes, the proposed method is good.
Resolve OOM When Reading Large Logs in Webserver #49470 and its backport [v3-0-test] Resolve OOM When Reading Large Logs in Webserver #53167 have been merged, so this issue can now be closed as completed.
The patch will be included in the next Airflow release, 3.0.4.