
Resolve OOM when reading large logs in webserver

Open jason810496 opened this issue 11 months ago • 10 comments

Description

Related context: https://github.com/apache/airflow/issues/44753#issuecomment-2526209568

TL;DR

After conducting some research and implementing a POC, I would like to propose a potential solution. However, this solution requires changes to the airflow.utils.log.file_task_handler.FileTaskHandler. If the solution is accepted, it will necessitate modifications to 10 providers that extend the FileTaskHandler class.

Main Concept for Refactoring

The proposed solution focuses on:

  1. Returning a generator instead of loading the entire file content at once.
  2. Leveraging a heap to merge logs incrementally, rather than sorting entire chunks.

The POC for this refactoring shows a 90% reduction in memory usage with similar processing times!

Experiment Details

  • Log size: 830 MB
  • Line count: approximately 8,670,000

Main Root Causes of OOM

  1. _interleave_logs function in airflow.utils.log.file_task_handler
     • Extends all log strings into the records list.
     • Sorts the entire records list.
     • Yields lines with deduplication.
  2. _read method in airflow.utils.log.file_task_handler.FileTaskHandler
     • Joins all aggregated logs into a single string using:
       "\n".join(_interleave_logs(all_log_sources))
  3. Methods that use _read: these methods read the entire log content and return it as a string instead of a generator:
     • _read_from_local
     • _read_from_logs_server
     • _read_remote_logs (implemented by providers)
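The memory blow-up described above can be reproduced in miniature. The sketch below is illustrative only (the real _interleave_logs differs); it shows the eager pattern of extending, sorting, deduplicating, and joining every line before anything is returned, so the whole log set must be resident at once:

```python
# Hypothetical minimal reproduction of the memory-heavy pattern: every log
# source is fully materialized, concatenated, sorted, and joined into one
# giant string before a single byte reaches the caller.

def interleave_logs_eager(*log_sources: str) -> str:
    records: list[str] = []
    for source in log_sources:
        # Materializes every line of every source in memory at once.
        records.extend(source.splitlines())
    # Sorting the full list requires all ~8.6M lines to be resident.
    records.sort()
    deduped: list[str] = []
    last = None
    for line in records:
        if line != last:  # deduplication after the full sort
            deduped.append(line)
        last = line
    # Joining roughly doubles the footprint: the list plus one huge string.
    return "\n".join(deduped)


print(interleave_logs_eager("a\nc", "b\nc"))  # a, b, c (duplicate "c" dropped)
```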

Proposed Refactoring Solution

The main concept includes:

  • Return a generator for reading log sources (local or external) instead of the whole file content as a string.
  • Merge logs using a K-way merge instead of sorting:
    • Since each log source is already sorted, merge them incrementally using heapq over streams of logs.
    • Return a stream of the merged result.
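The K-way merge above can be sketched with the standard library's heapq.merge, assuming each source is exposed as an already-sorted iterator of (sort_key, line) tuples. The names here (stream_merged_logs, sources) are illustrative, not Airflow's actual API:

```python
# Generator-based K-way merge: peak memory is O(number of sources), since
# heapq.merge holds only one pending item per source, not O(total lines).
import heapq
from typing import Iterable, Iterator, Tuple


def stream_merged_logs(
    sources: Iterable[Iterator[Tuple[str, str]]],
) -> Iterator[str]:
    last_line = None
    for _key, line in heapq.merge(*sources):
        if line != last_line:  # incremental deduplication, no full sort
            yield line
        last_line = line


# Usage: each source yields lazily, e.g. straight from a file object.
a = iter([("01", "start"), ("03", "done")])
b = iter([("02", "running"), ("03", "done")])
print(list(stream_merged_logs([a, b])))  # ['start', 'running', 'done']
```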

Breaking Changes in This Solution

  1. Interface of the read Method in FileTaskHandler:

    • Will now return a generator instead of a string.
  2. Interfaces of read_log_chunks and read_log_stream in TaskLogReader:

    • Adjustments to support the generator-based approach.
  3. Methods That Use _read

    • _read_from_local
    • _read_from_logs_server
    • _read_remote_logs (implemented by 10 providers)
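The shape of the generator-based interface can be illustrated generically; read_line_stream and chunked below are hypothetical helpers, not Airflow's actual API. A handler yields lines lazily, and a caller in the spirit of read_log_chunks batches the stream instead of holding one giant string:

```python
# Sketch of the generator-based read path (names are illustrative).
from typing import Iterator, List


def read_line_stream(path: str) -> Iterator[str]:
    # A file object is itself a lazy line iterator; only one line is
    # buffered at a time instead of the whole log.
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def chunked(stream: Iterator[str], size: int) -> Iterator[List[str]]:
    # Batch a line stream into fixed-size chunks for paginated responses.
    batch: List[str] = []
    for line in stream:
        batch.append(line)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch
```

A webserver endpoint could then stream each chunk to the client as it is produced, keeping memory proportional to the chunk size rather than the log size.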

Experimental Environment:

  • Setup: Docker Compose without memory limits.
  • Memory Profiling: memray
  • Log Size: 830 MB, approximately 8,670,000 lines

Benchmark Metrics

  • Original Implementation:

    • Memory Usage: Average 3GB, peaks at 4GB when returning the final stream.
    • Processing Time: ~60 seconds.
    • Memory Flame Graph
      • https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-memray_logs.py.html
  • POC (Refactored Implementation):

    • Memory Usage: Average 300MB.
    • Processing Time: ~60 seconds.
    • Memory Flame Graph
      • https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-read_large_logs-k-way-merge-heap-optimize.py.html

Summary

Feel free to share any feedback! I believe we should have more discussions before adopting this solution, as it involves breaking changes to the FileTaskHandler interface and requires refactoring in 10 providers as well.

Related issues

#44753

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!


jason810496 avatar Dec 19 '24 14:12 jason810496

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar Dec 19 '24 14:12 boring-cyborg[bot]

Yes. That's exactly how I envisioned solving this problem. @dstandish ?

potiuk avatar Dec 19 '24 14:12 potiuk

FYI: breaking changes to FileTaskHandler are not a problem - we can work out backward compatibility or simply break it for Airflow 3. This is not a big deal, since it is only a deployment configuration and does not require DAG adaptations.

potiuk avatar Dec 19 '24 14:12 potiuk

Hi @potiuk,

Would it be okay if I treat this issue as an umbrella issue to track the remaining TODO tasks while refactoring each provider? Or would it be preferable to refactor FileTaskHandler and all providers in a single PR? Thanks!

jason810496 avatar Dec 19 '24 14:12 jason810496

Sure. It can be a separate set of PRs, and this issue can remain the "umbrella" - you do not need to open more issues; PRs are enough.

potiuk avatar Dec 19 '24 14:12 potiuk

Yes. That's exactly how I envisioned solving this problem. @dstandish ?

IIRC this should be fine when the task is done, but it may present challenges while the task is in flight, because at any moment the location of the logs may shift, e.g. from worker to remote storage.

dstandish avatar Dec 19 '24 21:12 dstandish

IIRC this should be fine when the task is done, but it may present challenges while the task is in flight, because at any moment the location of the logs may shift, e.g. from worker to remote storage.

Is it not the same case now?

potiuk avatar Dec 19 '24 21:12 potiuk

Related issue https://github.com/apache/airflow/issues/31105

tirkarthi avatar Dec 20 '24 03:12 tirkarthi

Yes. That's exactly how I envisioned solving this problem. @dstandish ?

IIRC this should be fine when task done but may present challenges when task is in flight because at any moment the location of the logs may shift eg from worker to remote storage etc

Taking S3TaskHandler as an example, it requires additional refactoring and might need a read_stream method added to S3Hook that returns a generator-based result:
https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/amazon/aws/log/s3_task_handler.py#L136-L192

From my perspective, for the s3_write case, I would download the old log to a temporary file, append the new log stream to it, and then use the upload_file method to upload the combined file. This prevents memory starvation while producing the same result.
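The temp-file approach could be sketched as follows. This is a hypothetical helper, not the provider's actual code: the client parameter is assumed to expose boto3's download_fileobj/upload_fileobj interface, and the function name s3_append_via_tempfile is invented for illustration:

```python
# Hypothetical sketch of the proposed s3_write path: stream the existing
# remote log to a temporary file, append the new log stream, re-upload.
import tempfile
from typing import Iterator


def s3_append_via_tempfile(
    s3_client, bucket: str, key: str, new_lines: Iterator[str]
) -> None:
    with tempfile.NamedTemporaryFile("w+b", suffix=".log") as tmp:
        # 1. Stream the existing remote log to disk, never fully in memory.
        s3_client.download_fileobj(bucket, key, tmp)
        # 2. Append the new log stream line by line.
        for line in new_lines:
            tmp.write(line.encode("utf-8") + b"\n")
        tmp.flush()
        tmp.seek(0)
        # 3. Upload the combined file back to the same key.
        s3_client.upload_fileobj(tmp, bucket, key)
```

Peak memory stays at one line plus the client's transfer buffers, regardless of how large the existing log has grown.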

jason810496 avatar Dec 20 '24 03:12 jason810496

Taking S3TaskHandler as an example, it requires additional refactoring and might need a read_stream method added to S3Hook that returns a generator-based result: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/amazon/aws/log/s3_task_handler.py#L136-L192

From my perspective, for the s3_write case, I would download the old log to a temporary file, append the new log stream to it, and then use the upload_file method to upload the combined file. This prevents memory starvation while producing the same result.

Yep. There will be edge cases like that. And yes, the proposed method is good.

potiuk avatar Dec 20 '24 08:12 potiuk

Resolve OOM When Reading Large Logs in Webserver #49470 and its backport [v3-0-test] Resolve OOM When Reading Large Logs in Webserver #53167 have been merged, so this issue can now be closed as completed.

The patch will be included in the next Airflow release, 3.0.4.

jason810496 avatar Jul 17 '25 10:07 jason810496