[SPARK-39983][CORE][SQL] Do not cache unserialized broadcast relations on the driver
What changes were proposed in this pull request?
This PR addresses the issue raised in https://issues.apache.org/jira/browse/SPARK-39983 - broadcast relations should not be cached on the driver, as they are not needed there and can cause significant memory pressure (in one case the relation was 60 MB).
The PR adds a new SparkContext.broadcastInternal method with a serializedOnly parameter, allowing the caller to specify that the broadcast object should be stored only in serialized form. The current behavior is to also cache an unserialized form of the object.
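Roughly, the new entry point has the following shape (a sketch based on the description above, not the exact diff; it assumes BroadcastManager.newBroadcast gained a matching flag), with the existing SparkContext.broadcast delegating to it with serializedOnly = false:

```scala
// Sketch only: exact signature and plumbing may differ from the actual change.
private[spark] def broadcastInternal[T: ClassTag](
    value: T,
    serializedOnly: Boolean): Broadcast[T] = {
  assertNotStopped()
  // Assumes BroadcastManager.newBroadcast accepts a serializedOnly flag.
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal, serializedOnly)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
```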
The PR changes the broadcast implementation in TorrentBroadcast to honor the serializedOnly flag and not store the unserialized value, unless the execution is in local mode (single process). In that case the broadcast cache is effectively shared between the driver and executors, so the unserialized value still needs to be cached to satisfy the executor-side functionality.
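Conceptually, the write path becomes something like the sketch below (simplified, assuming a serializedOnly flag and an isLocalMaster helper inside TorrentBroadcast; the serialized pieces are still always stored):

```scala
// Sketch of the guarded driver-side caching of the whole value (not the exact diff).
if (!serializedOnly || isLocalMaster) {
  // In local mode the driver and executors share one block manager, so the
  // unserialized value must stay cached for executor-side reads to work.
  blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
    tellMaster = false)
}
```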
Why are the changes needed?
Broadcast relations can be fairly large (one observed at 60 MB) and are not needed in unserialized form on the driver.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a new unit test to BroadcastSuite verifying the low-level broadcast functionality with respect to the serializedOnly flag. Added a new unit test to BroadcastExchangeSuite verifying that broadcast relations are not cached on the driver.
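For example, the driver-side check looks roughly like the following sketch (not the exact test; it assumes broadcastInternal(value, serializedOnly = true) is reachable from the test, uses the suite's sc field, and runs against a local-cluster master so the local-mode carve-out does not apply):

```scala
import org.apache.spark.storage.BroadcastBlockId

test("SPARK-39983: serializedOnly broadcast value is not cached on the driver") {
  sc = new SparkContext("local-cluster[2,1,1024]", "test")
  val bc = sc.broadcastInternal(Seq.fill(1000)(true), serializedOnly = true)
  // Executors can still read the value from the serialized pieces.
  assert(sc.parallelize(1 to 2, 2).map(_ => bc.value.size).collect().forall(_ == 1000))
  // The driver's block manager should not hold the whole, unserialized value.
  assert(sc.env.blockManager.getLocalValues(BroadcastBlockId(bc.id)).isEmpty)
}
```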
Doesn't this break local mode?
@mridulm, it doesn't break local mode because there's a carve-out to preserve the existing behavior in that case: in both places where the serializedOnly check changes behavior, there's an isLocalMaster check to avoid behavior changes:
We'll still store the original object in the driver block manager at write time in local mode:
https://github.com/apache/spark/blob/75ab18ee0e382b8117bf65fc9ef05190d4fdf01a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L133-L136
There's a similar carve-out in readBroadcastBlock (although I don't think we'd ever actually hit that branch in local mode given that we would have already stored the re-assembled broadcast block in writeBlocks):
https://github.com/apache/spark/blob/75ab18ee0e382b8117bf65fc9ef05190d4fdf01a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L277-L284
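Paraphrased (see the linked lines for the real code), the read side looks roughly like this:

```scala
// After fetching and reassembling the serialized pieces, the deserialized object is
// only re-inserted into the block manager when it can be reused there.
val obj = TorrentBroadcast.unBlockifyObject[T](
  blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
if (!serializedOnly || isLocalMaster) {
  blockManager.putSingle(broadcastId, obj, StorageLevel.MEMORY_AND_DISK,
    tellMaster = false)
}
obj
```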
@JoshRosen Ah yes, missed out on that - should have taken a more detailed look.
Can one of the admins verify this patch?
QQ @JoshRosen, @alex-balikov - if the expectation is that the variable can be recreated (if missing) on the driver - with that being a remote possibility, do we want to make it a WeakReference instead of a SoftReference?
SoftReferences are kept around as long as possible by the JVM (usually until there is a risk of OOM), unlike WeakReferences.
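For illustration only (the outcome depends on the JVM and GC, so this is not a guarantee):

```scala
import java.lang.ref.{SoftReference, WeakReference}

// After a GC, a WeakReference to an otherwise-unreachable object is typically
// cleared, while a SoftReference is usually retained until memory gets tight.
var payload: Array[Byte] = new Array[Byte](1 << 20)
val weak = new WeakReference(payload)
val soft = new SoftReference(payload)
payload = null // drop the only strong reference
System.gc()
println(s"weak cleared: ${weak.get == null}, soft cleared: ${soft.get == null}")
```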
Good question. It looks like PR https://github.com/apache/spark/pull/22995 originally proposed to make this a WeakReference but ended up using a SoftReference in an attempt to reduce the likelihood that a broadcast variable would be cleared on GC. I'm slightly hesitant to change behavior that could impact user-created broadcast variables (although maybe I'm being overly conservative).
I don't think that changing SoftReference to WeakReference would negatively impact broadcast hash joins or other SQL operators that use internal broadcast variables: each of those operators retrieves the broadcast value and stores a strong reference to it for the duration of the task, so as long as one task is running, the WeakReference here and in the cache added in https://github.com/apache/spark/pull/20183 should still be alive, and the cache will remain effective at keeping the broadcast variable value from being duplicated.
I agree that a WeakReference could make sense for these internal broadcast variables on the driver, though. It looks like SoftReference and WeakReference share a common superclass, so we can choose the reference type based on whether this is an internal or external broadcast variable. Here's a patch implementing that idea: https://github.com/JoshRosen/spark/commit/01fd8be179274d3fe298f691264e2e1bd37bca2e
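The core of the idea is roughly this (a sketch, not the linked patch verbatim; newCachedValueRef is a hypothetical helper name):

```scala
import java.lang.ref.{Reference, SoftReference, WeakReference}

// Hypothetical helper: pick the reference type for the cached broadcast value.
private def newCachedValueRef[T](value: T, serializedOnly: Boolean): Reference[T] =
  if (serializedOnly) {
    // Internal broadcasts (e.g. SQL broadcast relations) can be re-read from the
    // serialized pieces, so let the GC reclaim the cached value eagerly.
    new WeakReference[T](value)
  } else {
    // Keep the existing SoftReference behavior for user-created broadcasts.
    new SoftReference[T](value)
  }
```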
@alex-balikov, @mridulm, WDYT? If that WeakRef change makes sense to you, shall we cherry-pick it into this PR?
The proposed change looks good to me @JoshRosen
I applied @JoshRosen's change.
Hi everyone, I am Radek from HuuugeGames. We use Databricks runtime 10.4 LTS, and I wanted to let you know that after your changes were included in the runtime (Databricks rolled them out on 26.08 during maintenance), our job started to behave inconsistently: from time to time, dynamic file pruning prunes all of the source files while scanning from S3. I attached a screenshot that shows the loss of broadcast data during DFP (1 record read from the Reuse Exchange, where normally there should be 97), which results in no records being read from S3. We are joining a small dim table with an events table, and something is definitely going wrong here. After disabling DFP the plan changed and the process looks stable. We also rolled back to the previous Databricks runtime image without these changes, and the process looks good even with DFP enabled.
Hi @sos3k,
We investigated your bug report and determined that the root cause was a latent bug in UnsafeHashedRelation that was triggered more frequently following the backport of this PR.
PR https://github.com/apache/spark/pull/35836 fixed a bug related to driver-side deserialization of UnsafeHashedRelation, but that fix was missing from DBR 9.1 and 10.4 (it is present in all newer versions). We have deployed a hotfix to backport https://github.com/apache/spark/pull/35836 into DBR 9.1 and 10.4.