
[SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak

JacobZheng0927 opened this pull request

What changes were proposed in this pull request?

close stream when DiskBlockObjectWriter closeResources to avoid memory leak

Why are the changes needed?

SPARK-34647 replaced ZstdInputStream with ZstdInputStreamNoFinalizer. This meant that all usages of CompressionCodec.compressedOutputStream now need to close the stream manually, as this is no longer handled by the finalizer mechanism. When zstd is used for shuffle write compression and that write is interrupted for some reason (e.g., spark.sql.execution.interruptOnCancel is enabled and the job is cancelled), the memory used by ZstdInputStreamNoFinalizer may not be freed, causing a memory leak.
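A minimal sketch of the idea behind the fix (the field names objOut and bs follow the review discussion below; the class itself is illustrative, not the actual DiskBlockObjectWriter code):

```scala
import java.io.{Closeable, OutputStream}

// Toy model of the fix: closeResources() must close the stream chain even
// when the write was interrupted, because the NoFinalizer zstd streams no
// longer rely on a finalizer to release their native buffers.
class WriterSketch(objOut: Closeable, bs: OutputStream) {
  private var streamOpen = true

  def closeResources(): Unit = {
    if (streamOpen) {
      try {
        // Normally closes the whole wrapped chain, including bs.
        objOut.close()
      } finally {
        // Belt and braces: free the compressed stream's native memory even
        // if closing the serializer stream threw or never ran to completion.
        bs.close()
        streamOpen = false
      }
    }
  }
}
```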

Does this PR introduce any user-facing change?

No

How was this patch tested?

No

Was this patch authored or co-authored using generative AI tooling?

No

JacobZheng0927 avatar Apr 19 '24 06:04 JacobZheng0927

cc @dongjoon-hyun This is similar to https://github.com/apache/spark/pull/35613. Please take a look, thanks!

JacobZheng0927 avatar Apr 22 '24 02:04 JacobZheng0927

Thank you for making a PR, @JacobZheng0927 .

However, your PR fails to compile. Please make the GitHub Actions CI green.

[error] (core / Compile / compileIncremental) Compilation failed

Done.

JacobZheng0927 avatar Apr 22 '24 05:04 JacobZheng0927

Like #35613, do you think you can provide a way to validate your PR, @JacobZheng0927?

Ok, I'll try to reproduce the problem with a simple script.

JacobZheng0927 avatar Apr 23 '24 11:04 JacobZheng0927

I apologize for the long delay in updating. I've just added the steps for reproduction, please take a look. @dongjoon-hyun @mridulm
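The exact steps live in the PR description; a rough sketch of the kind of job that triggers the leak might look like this (the config keys and APIs are real; the sizing and timing are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Rough reproduction sketch: run a shuffle-heavy job with zstd compression,
// then cancel it mid-shuffle so tasks are interrupted while the
// DiskBlockObjectWriter streams are still open. Watch the executor's native
// memory (e.g. with pmap or jcmd) across repeated runs.
object LeakRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("zstd-shuffle-leak-repro")
      .config("spark.io.compression.codec", "zstd")
      .config("spark.sql.execution.interruptOnCancel", "true")
      .getOrCreate()
    val sc = spark.sparkContext

    // Local properties are inheritable, so the job thread picks this up.
    sc.setJobGroup("repro", "cancel mid-shuffle", interruptOnCancel = true)
    val job = new Thread(() => {
      try {
        sc.parallelize(1 to 10000000, 200)
          .map(i => (i % 1000, i))
          .groupByKey()
          .count()
      } catch {
        case _: Exception => () // the job is expected to be cancelled
      }
    })
    job.start()
    Thread.sleep(3000)          // let shuffle writes begin (timing illustrative)
    sc.cancelJobGroup("repro")  // interrupts the running tasks mid-write
    job.join()
    spark.stop()
  }
}
```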

JacobZheng0927 avatar May 13 '24 09:05 JacobZheng0927

@JacobZheng0927, please add it as a unit test.

mridulm avatar May 13 '24 18:05 mridulm

@JacobZheng0927, please add it as a unit test.

I'm not sure how to test for native memory leak cases in unit tests. Is there a relevant example I can refer to?

JacobZheng0927 avatar May 15 '24 07:05 JacobZheng0927

One way I can quickly think of is to check whether objOut.close() or bs.close() is called. For example, adapt the "calling closeAndDelete() on a partial write file" test and use either a custom serializer or a custom compression codec to check that close is invoked?
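Concretely, such a close-tracking wrapper might look like this (an illustrative sketch, not the test that was eventually merged):

```scala
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicBoolean

// Decorator that records whether close() was invoked. A test can route the
// writer's wrapped stream through this (via a custom codec or serializer)
// and assert the flag after aborting a partial write.
class CloseTrackingOutputStream(out: OutputStream) extends OutputStream {
  val closed = new AtomicBoolean(false)

  override def write(b: Int): Unit = out.write(b)
  override def write(b: Array[Byte], off: Int, len: Int): Unit = out.write(b, off, len)
  override def flush(): Unit = out.flush()

  override def close(): Unit = {
    closed.set(true) // record that the chain was actually closed
    out.close()
  }
}
```

A test along these lines would abort a partial write, call closeResources(), and assert closed.get().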

(Sorry for the delay in getting back to you - this PR dropped off my todo list unfortunately)

mridulm avatar May 23 '24 06:05 mridulm

SPARK-47910; add a check to see if the close method was called in UT

@mridulm I added some unit test code, but I'm not sure if this is the appropriate way to test it.

JacobZheng0927 avatar Jun 07 '24 02:06 JacobZheng0927

Merging to master and 3.5. @dongjoon-hyun's requested change is resolved, as the CI is green.

mridulm avatar Jun 18 '24 05:06 mridulm

Merged to master and 3.5. Thanks for fixing this, @JacobZheng0927! Thanks for the review, @dongjoon-hyun :-)

mridulm avatar Jun 18 '24 05:06 mridulm

If a PR includes new log behavior and needs to be backported, it may have to be submitted to branch-3.5/3.4 manually, because the structured logging feature was only added in 4.0, so the new code fails to compile on branch-3.5/branch-3.4. @JacobZheng0927, do you have time to fix the compilation failure of branch-3.5?

  • https://github.com/apache/spark/actions/runs/9559284110/job/26349432514
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:195:17: value log is not a member of StringContext
[error]         logInfo(log"Exception occurred while closing the output stream" +
[error]                 ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:11: value log is not a member of StringContext
[error]           log"${MDC(ERROR, e.getMessage)}")
[error]           ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:17: not found: value MDC
[error]           log"${MDC(ERROR, e.getMessage)}")
[error]                 ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:21: not found: value ERROR
[error]           log"${MDC(ERROR, e.getMessage)}")
[error]                     ^
[error] four errors found
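For branch-3.5 the fix would presumably fall back to plain string interpolation, since the log interpolator and MDC only exist in 4.0. Roughly (a sketch of the failing lines rewritten, not the actual backport commit):

```scala
// branch-3.5 variant of the failing lines: plain s-interpolation instead of
// the 4.0-only structured-logging log"..." interpolator and MDC.
logInfo(s"Exception occurred while closing the output stream ${e.getMessage}")
```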

LuciferYang avatar Jun 19 '24 05:06 LuciferYang


also cc @gengliangwang @panbingkun should we introduce a pseudo log MDC behavior in branch-3.5?
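Such a shim might look roughly like this (purely hypothetical, sketched only to make the question concrete; nothing like it was merged here):

```scala
// Hypothetical "pseudo MDC" shim for branch-3.5: lets 4.0-style
// log"... ${MDC(KEY, value)}" code compile by degrading to plain strings.
object LogShim {
  val ERROR = "ERROR" // stand-in for the 4.0 LogKeys entry

  case class MDC(key: Any, value: Any) {
    override def toString: String = String.valueOf(value) // keep the text, drop the key
  }

  implicit class LogStringContext(val sc: StringContext) extends AnyVal {
    def log(args: Any*): String = sc.s(args: _*) // log"..." behaves like s"..."
  }
}
```

With import LogShim._ in scope, the failing lines above would compile unchanged and log the plain interpolated message.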

LuciferYang avatar Jun 19 '24 05:06 LuciferYang


It seems that structured logs are only available in Spark 4.0

panbingkun avatar Jun 19 '24 05:06 panbingkun


Okay, I will fix it in 3.5.

JacobZheng0927 avatar Jun 19 '24 05:06 JacobZheng0927

Thanks @JacobZheng0927

LuciferYang avatar Jun 19 '24 05:06 LuciferYang

Oh crap, forgot this is using structured logging... should have caught it before merging to 3.5!

mridulm avatar Jun 19 '24 11:06 mridulm