[SPARK-46710][SQL] Clean up the broadcast data generated by a SQL execution when the execution ends
What changes were proposed in this pull request?
Add a new config spark.broadcast.cleanAfterExecution.enabled (default: false). When enabled, the broadcast data generated during a SQL execution is cleaned up as soon as the execution ends (only suitable for long-running Spark SQL services).
Before this PR: broadcast data is only cleaned up when a GC is triggered (the ContextCleaner reclaims broadcasts via weak references), which can waste a lot of memory and can also make queries unstable if a single GC pause takes too long.
After this PR: when a SQL execution completes, the broadcast data generated during that execution is cleared immediately.
Note: this parameter is only suitable for long-running Spark SQL services. If it is turned on and a DataFrame is collected twice, the broadcast data will not be found during the second execution, because it was already cleaned up after the first one.
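For reference, here is a minimal sketch of the idea (not the actual patch): track the broadcasts created by each SQL execution and destroy them when the execution-end event fires. BroadcastTracker and BroadcastCleanupListener are hypothetical names; the real change would hook into Spark's execution internals rather than a user-level listener.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

// Hypothetical registry: remembers which broadcasts each SQL execution created.
object BroadcastTracker {
  private val broadcasts =
    new ConcurrentHashMap[Long, ConcurrentLinkedQueue[Broadcast[_]]]()

  // Would be called wherever a broadcast is materialized for a SQL execution,
  // keyed by the current execution id.
  def register(executionId: Long, bc: Broadcast[_]): Unit =
    broadcasts
      .computeIfAbsent(executionId, _ => new ConcurrentLinkedQueue[Broadcast[_]]())
      .add(bc)

  def remove(executionId: Long): ConcurrentLinkedQueue[Broadcast[_]] =
    broadcasts.remove(executionId)
}

// Destroys the tracked broadcasts as soon as the execution ends, instead of
// waiting for the ContextCleaner to reclaim them after a driver-side GC.
class BroadcastCleanupListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionEnd =>
      val queue = BroadcastTracker.remove(e.executionId)
      if (queue != null) {
        var bc = queue.poll()
        while (bc != null) {
          bc.destroy() // drops the broadcast blocks on the driver and all executors
          bc = queue.poll()
        }
      }
    case _ =>
  }
}
```

Such a listener could be installed with spark.sparkContext.addSparkListener(new BroadcastCleanupListener), guarded by the new config.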
Why are the changes needed?
Reduce memory pressure on the driver and executors. This makes a long-running Spark service more stable.
Does this PR introduce any user-facing change?
Yes. Adds the config spark.broadcast.cleanAfterExecution.enabled (default: false). If true, the broadcast data generated by a SQL execution is destroyed when the execution completes.
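If the config lands as proposed, enabling it would look like this (sketch; the config key comes from this PR and is not in released Spark):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("long-running-sql-service")
  // Proposed by this PR; default is false. Only enable for long-running
  // SQL services where each query is executed exactly once.
  .config("spark.broadcast.cleanAfterExecution.enabled", "true")
  .getOrCreate()
```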
I tested a scenario with a large number of broadcast joins and many Iceberg expire_snapshots and remove_orphan_files SQL statements executed concurrently, and the effect was very noticeable.
In the attached memory graphs: on the left, the job has the parameter enabled and runs smoothly to completion, with memory consumption stable at around 40%; on the right, the parameter is disabled and the job runs out of memory (OOM) after an hour.
Because of a problem with my git fork repository, I re-submitted this PR.
gentle ping @cloud-fan
Will we reuse the broadcast data after the query completes? E.g. when df.collect() is called multiple times.
@cloud-fan Currently the broadcast data cannot be reused. When the SQL execution ends, the broadcast data is cleaned up, which is why the PR description says this only works for Spark SQL services (broadcast data can still be reused within a single SQL execution). However, I think it would also be possible to implement regenerating the broadcast data after it has been cleaned up.
I think it's true for SQL queries, but not sure about dataframe queries, which keeps the physical plan as a lazy val and users can repeatedly execute the same physical plan.
Right, with this enabled, dataframe queries cannot be collected repeatedly. This param is intended only for SQL queries.
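To illustrate the caveat discussed above (hypothetical sketch, with the proposed flag enabled):

```scala
import org.apache.spark.sql.functions.broadcast

val small = spark.range(100).toDF("id")
val big = spark.range(1000000).toDF("id")
val df = big.join(broadcast(small), "id")

df.collect() // first execution: broadcast built, then destroyed at execution end
df.collect() // re-executes the same cached physical plan; with the flag enabled,
             // the broadcast blocks are gone, so this second run would fail
```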
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
This PR is useful in our scenario. We tested on TPC-DS 1000 with 25 executor instances * 8G each. Before this PR, executors would OOM and get killed by YARN; after this PR, executors ran stably with no OOM, and total running time improved by 6% (from 65 min to 61 min).