[SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak
What changes were proposed in this pull request?
Close the stream when DiskBlockObjectWriter.closeResources is called, to avoid a memory leak.
Why are the changes needed?
SPARK-34647 replaced ZstdInputStream with ZstdInputStreamNoFinalizer. This meant that all usages of CompressionCodec.compressedOutputStream would need to close the stream manually, as this is no longer handled by the finalizer mechanism.
When using zstd for shuffle write compression, if the execution of this process is interrupted for some reason (e.g. spark.sql.execution.interruptOnCancel is enabled and the job is cancelled), the memory used by ZstdInputStreamNoFinalizer may not be freed, causing a memory leak.
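For context, a minimal sketch of the idea behind the fix (hypothetical, simplified names and control flow, not the actual DiskBlockObjectWriter code): even when closing the serialization stream throws, the underlying compression stream must still be closed so the codec's native memory is released.

```scala
import java.io.OutputStream

// Hypothetical sketch, not the real DiskBlockObjectWriter: objOut stands in
// for the serialization stream, bs for the underlying (zstd-)compressed stream.
class WriterSketch(objOut: OutputStream, bs: OutputStream) {
  private var initialized = true

  def closeResources(): Unit = {
    if (initialized) {
      initialized = false
      try {
        objOut.close() // normally closes the streams it wraps, too
      } catch {
        case e: Exception =>
          // Log message borrowed from the patch; fall through so bs is
          // still closed and the codec's native buffers are freed.
          println(s"Exception occurred while closing the output stream ${e.getMessage}")
      } finally {
        bs.close()
      }
    }
  }
}
```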
Does this PR introduce any user-facing change?
No
How was this patch tested?
No
Was this patch authored or co-authored using generative AI tooling?
No
cc @dongjoon-hyun This is similar to https://github.com/apache/spark/pull/35613. Please take a look, thanks!
Thank you for making a PR, @JacobZheng0927 .
However, your PR fails to compile. Please make GitHub Action CI green.
[error] (core / Compile / compileIncremental) Compilation failed
Done.
Like #35613, do you think you can provide a way to validate your PR, @JacobZheng0927 ?
Ok, I'll try to reproduce the problem with a simple script.
I apologize for the long delay in updating. I've just added the steps for reproduction, please take a look. @dongjoon-hyun @mridulm
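(For readers following the thread: a rough sketch of the kind of reproduction described in the PR description, i.e. zstd shuffle compression plus a cancelled job group with interruptOnCancel. The object name and job shape here are made up; the actual steps attached to the PR may differ.)

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical repro sketch: run a shuffle-heavy job with zstd compression,
// then cancel it mid-write with interruptOnCancel so shuffle writers are
// interrupted. Repeating this while watching the executor's off-heap memory
// would surface the leak if the compressed streams are never closed.
object ZstdShuffleLeakRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("zstd-shuffle-leak-repro")
      .config("spark.io.compression.codec", "zstd")
      .config("spark.sql.execution.interruptOnCancel", "true")
      .getOrCreate()
    val sc = spark.sparkContext

    sc.setJobGroup("repro", "shuffle job to cancel", interruptOnCancel = true)
    // Kick off a shuffle-heavy job asynchronously, then cancel it mid-flight.
    val job = sc.parallelize(1 to 10000000, 100)
      .map(i => (i % 1000, i))
      .groupByKey()
      .countAsync()
    Thread.sleep(2000) // give the shuffle writes time to start
    sc.cancelJobGroup("repro")

    spark.stop()
  }
}
```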
@JacobZheng0927, please add it as a unit test.
I'm not sure how to test for native memory leak cases in unit tests. Is there a relevant example I can refer to?
One way I can quickly think of is to check if objOut.close() or bs.close() is being called or not.
For example, adapt the "calling closeAndDelete() on a partial write file" test and use either a custom serializer or a custom compression codec to check for close being invoked?
(Sorry for the delay in getting back to you - this PR dropped off my todo list unfortunately)
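(A sketch of what that suggestion could look like, assuming a wrapper stream injected through a custom serializer or compression codec; CloseTrackingOutputStream is a made-up name, not anything in the existing suite.)

```scala
import java.io.OutputStream

// Hypothetical test helper: records whether close() was invoked so a test
// can assert it after exercising DiskBlockObjectWriter's close path.
class CloseTrackingOutputStream(out: OutputStream) extends OutputStream {
  @volatile var closed = false

  override def write(b: Int): Unit = out.write(b)

  override def close(): Unit = {
    closed = true
    out.close()
  }
}

// Rough usage in a test:
//   val tracked = new CloseTrackingOutputStream(underlying)
//   ... trigger the interrupted write path so closeResources() runs ...
//   assert(tracked.closed, "compressed stream was not closed")
```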
SPARK-47910; add a check to see if the close method was called in UT
@mridulm I added some unit test code, but I'm not sure if this is the appropriate way to test it.
Merging to master and 3.5. @dongjoon-hyun's requested change is resolved as the CI is green.
Merged to master and 3.5. Thanks for fixing this, @JacobZheng0927! Thanks for the review, @dongjoon-hyun :-)
If the PR includes a new log behavior, it may need to be manually submitted to branch-3.5/3.4 if a backport is needed, because the structured log feature was only added in 4.0, which will cause the compilation of branch-3.5/branch-3.4 to fail. @JacobZheng0927, do you have time to fix the compilation failure of branch-3.5?
- https://github.com/apache/spark/actions/runs/9559284110/job/26349432514
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:195:17: value log is not a member of StringContext
[error] logInfo(log"Exception occurred while closing the output stream" +
[error] ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:11: value log is not a member of StringContext
[error] log"${MDC(ERROR, e.getMessage)}")
[error] ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:17: not found: value MDC
[error] log"${MDC(ERROR, e.getMessage)}")
[error] ^
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:196:21: not found: value ERROR
[error] log"${MDC(ERROR, e.getMessage)}")
[error] ^
[error] four errors found
also cc @gengliangwang @panbingkun should we introduce a pseudo log MDC behavior in branch-3.5?
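(To make the branch-3.5 situation concrete: the log"..." interpolator and MDC come with the structured logging added in 4.0, so a backport would presumably fall back to a plain interpolated string. A sketch, with the 4.0 form quoted from the compile error above; plain slf4j is used here only to keep the snippet self-contained, whereas the real code uses Spark's Logging trait.)

```scala
import org.slf4j.LoggerFactory

// Hypothetical sketch of a branch-3.5 fallback for the failing log call.
object LoggingFallbackSketch {
  private val logger = LoggerFactory.getLogger(getClass)

  def onCloseFailure(e: Exception): Unit = {
    // 4.0 / master form, which does not compile on branch-3.5:
    //   logInfo(log"Exception occurred while closing the output stream" +
    //     log"${MDC(ERROR, e.getMessage)}")
    // branch-3.5 form, using plain string interpolation:
    logger.info(s"Exception occurred while closing the output stream ${e.getMessage}")
  }
}
```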
It seems that structured logs are only available in Spark 4.0.
Okay, I will fix it in 3.5.
Thanks @JacobZheng0927
Oh crap, forgot this is using structured logging... should have caught it before merging to 3.5!