Spark: Fix a separate table cache being created for each rewriteFiles
Currently, during Spark's rewrite data files procedure with the bin pack strategy, the SparkSession is cloned to disable AQE in each rewriteFiles call. Since a cloned SparkSession has its own state, V2SessionCatalog is reloaded every time and a separate table cache is created. That means each file group has its own table cache, which effectively disables the table cache. This PR fixes it by cloning the SparkSession once, when creating SparkBinPackStrategy.
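To make the effect concrete, here is a toy model of the problem (plain Python with hypothetical names like `Catalog` and `Session`; this is not the actual Spark or Iceberg code): when every file group clones the session and each clone builds fresh catalog state, the per-instance table cache never gets a hit.

```python
# Toy model: a per-clone table cache defeats caching.
# All names are illustrative stand-ins, not real Spark/Iceberg APIs.

class Catalog:
    """Stand-in for V2SessionCatalog: caches loaded tables per instance."""
    loads = 0  # counts expensive metadata loads across all instances

    def __init__(self):
        self.table_cache = {}

    def load_table(self, name):
        if name not in self.table_cache:
            Catalog.loads += 1          # expensive load (e.g. metastore call)
            self.table_cache[name] = object()
        return self.table_cache[name]

class Session:
    """Stand-in for SparkSession; a clone gets fresh session state."""
    def __init__(self):
        self.catalog = Catalog()

    def clone(self):
        return Session()                # cloned state => brand-new catalog/cache

def rewrite_per_file_group(session, file_groups):
    # Old behavior: clone inside each rewriteFiles call.
    for _ in file_groups:
        session.clone().catalog.load_table("db.t")

def rewrite_per_action(session, file_groups):
    # Fixed behavior: clone once for the whole action, reuse the clone.
    cloned = session.clone()
    for _ in file_groups:
        cloned.catalog.load_table("db.t")

Catalog.loads = 0
rewrite_per_file_group(Session(), range(10))
per_group_loads = Catalog.loads        # one expensive load per file group

Catalog.loads = 0
rewrite_per_action(Session(), range(10))
per_action_loads = Catalog.loads       # single load; cache hits afterwards

print(per_group_loads, per_action_loads)  # 10 1
```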
@rdblue @aokolnychyi @kbendick Please help review and suggest where to add a UT. I haven't found a proper place.
I'm interested to hear what @szehon-ho and @RussellSpitzer think about this.
My initial reaction is that this is not something that we should change. We don't want to disable AQE for other Spark work, which is a side-effect of this change. I also don't like that we need to create a new Spark session for each rewrite, but I don't think there is much we can do to avoid it if we want to disable AQE. We could also fail if AQE is on or just accept the AQE results.
Also, is a separate table cache a bug? Since it is only used once, what is the problem with doing it this way? Sure, this won't cache the rewritten table, but is there a behavior problem or are loads just slightly slower?
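For context, the reason cloning is useful at all can be modeled simply (toy code, not the real SparkSession API): the clone starts from the parent's configuration but owns an independent copy, so AQE can be turned off for the rewrite without affecting other work on the original session.

```python
# Toy model of session cloning for config isolation (not real Spark API).
class Session:
    def __init__(self, conf=None):
        self.conf = dict(conf or {})

    def clone(self):
        # Clone inherits the current config but owns an independent copy.
        return Session(self.conf)

parent = Session({"spark.sql.adaptive.enabled": "true"})
clone = parent.clone()
clone.conf["spark.sql.adaptive.enabled"] = "false"  # disable AQE for rewrites only

print(parent.conf["spark.sql.adaptive.enabled"])  # parent is unaffected
```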
@rdblue I've moved cloning the session and disabling AQE into RewriteDataFilesSparkAction.
When multiple rewrite actions are submitted concurrently as follows, they block each other when loading the table due to locks in the shared HiveExternalCatalog. That hurts the overall performance of rewrite actions.

```sql
CALL spark_catalog.system.rewrite_data_files(
  table => 'default.table',
  options => map('max-concurrent-file-group-rewrites', '200')
);
```
There is another lock in SessionCatalog#tableExists for Spark versions before https://github.com/apache/spark/pull/31891, since the sessionCatalog is also shared.
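The contention can be pictured with a toy model (plain Python threading, not actual Spark internals): when every file-group rewrite goes through one shared lock, table loads serialize no matter how many threads are running.

```python
# Toy model: concurrent rewrite groups contending on one shared catalog lock.
import threading

class SharedCatalog:
    """Stand-in for a shared external catalog: all calls take one lock."""
    def __init__(self):
        self._lock = threading.Lock()
        self._inside = 0
        self.max_concurrency = 0

    def load_table(self, name):
        with self._lock:                 # every caller serializes here
            self._inside += 1
            self.max_concurrency = max(self.max_concurrency, self._inside)
            threading.Event().wait(0.01)  # simulated metastore round trip
            self._inside -= 1

catalog = SharedCatalog()
threads = [threading.Thread(target=catalog.load_table, args=("db.t",))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(catalog.max_concurrency)  # 1: loads never overlap under the shared lock
```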
Ignore my previous comments; I had the caches mixed up.
I will say that I don't love the idea of asking users to disable AQE themselves (i.e., not cloning the session and instead requiring AQE to be off), even though cloning the session is somewhat of a pain.
People use these statements at the end of queries and disabling AQE would be a bummer.
@manuzhang, it seems reasonable to create a session for the entire rewrite, not just each Spark submission. Is that what was happening before?
> Is that what was happening before?
Do you mean the session was created for the entire rewrite before?
> Do you mean the session was created for the entire rewrite before?
I'm asking you what the behavior was before this change that you want to fix.
Not sure. The furthest I can track back is https://github.com/apache/iceberg/pull/2591, and the behavior there is the same as now.
@rdblue Any more concerns or suggestions for this PR?
@rdblue and @kbendick any more comments?
Gentle ping @rdblue @aokolnychyi @kbendick for another review
@ajantha-bhat I forgot to remove cloneSession in SparkSortStrategy and SparkZOrderStrategy. Please review again.
Yeah, last time I commented that these other places also had issues, and I forgot to recheck whether that was addressed 😔. I think we now cover all the places. Thanks.
@ajantha-bhat @rdblue any more comments?
> @manuzhang, it seems reasonable to create a session for the entire rewrite, not just each Spark submission. Is that what was happening before?
Yes, basically the old behavior was to clone the Spark session for each file group rewrite, rather than once for the entire action.
@RussellSpitzer please check again whether it's fixed now.
@RussellSpitzer @rdblue @ajantha-bhat please take another look. Thanks.
I think that all of the session references are to the cloned Spark session now. +1.
Thanks, @manuzhang!
Thanks @rdblue @ajantha-bhat @RussellSpitzer @kbendick @hililiwei for your review. I opened #6284 and #6285 to back-port the PR to Spark 3.2 and 3.1 respectively. Please help review those PRs as well.