iceberg
iceberg copied to clipboard
Spark: Close auto broadcast join in delete orphan action
When there are enough data files, using broadcast join for association may cause oom.
Physical plan before modified:
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#219] +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2729/204334828@45ec7633, obj#218: java.lang.String +- DeserializeToObject newInstance(class scala.Tuple2), obj#217: scala.Tuple2 +- BroadcastHashJoin [_1#211.path], [_2#212.path], LeftOuter, BuildRight, false :- Project [named_struct(authority, authority#62, path, path#63, scheme, scheme#64, uriAsString, uriAsString#65) AS _1#211] : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#62, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#63, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#64, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#65] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1711b63f, obj#61: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- Scan[obj#52] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, structauthority:string,path:string,scheme:string,uriAsString:string, false].path),false), [plan_id=232] +- Union :- Project [named_struct(authority, authority#123, path, path#124, scheme, scheme#125, uriAsString, uriAsString#126) AS _2#212] : +- Filter isnotnull(path#124) : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#123, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#124, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#125, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#126] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1c1e78eb, obj#122: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2780/770030446@1eae036e, obj#118: org.apache.iceberg.spark.actions.FileInfo : +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.ManifestFileBean), (setPath,path#67.toString), (setAddedSnapshotId,staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, addedSnapshotId#227L, true, false, true)), (setContent,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, content#221, true, false, true)), (setPartitionSpecId,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, partitionSpecId#225, true, false, true)), (setLength,staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, length#223L, true, false, true))), obj#117: org.apache.iceberg.spark.actions.ManifestFileBean : +- Exchange RoundRobinPartitioning(200), REPARTITION_BY_NUM, [plan_id=200] : +- HashAggregate(keys=[path#67], functions=[first(content#66, false), first(length#68L, false), first(partitionSpecId#92, false), first(addedSnapshotId#93L, false)], output=[content#221, path#67, length#223L, partitionSpecId#225, addedSnapshotId#227L]) : +- HashAggregate(keys=[path#67], functions=[partial_first(content#66, false), partial_first(length#68L, false), partial_first(partitionSpecId#92, false), partial_first(addedSnapshotId#93L, false)], output=[path#67, first#245, valueSet#246, first#247L, valueSet#248, first#249, valueSet#250, first#251L, valueSet#252]) : +- Project [content#66, path#67, length#68L, partition_spec_id#69 AS partitionSpecId#92, added_snapshot_id#70L AS addedSnapshotId#93L] : +- BatchScan[content#66, path#67, length#68L, partition_spec_id#69, added_snapshot_id#70L] file:/tmp/junit7658548847041001892/junit5488406415725968442/#all_manifests (branch=null) [filters=, groupedBy=] RuntimeFilters: [] :- Project [named_struct(authority, authority#166, path, path#167, scheme, scheme#168, uriAsString, uriAsString#169) AS _2#228] : +- Filter isnotnull(path#167) : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#167, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#169] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2729/204334828@29173f02, obj#165: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.FileInfo), (setPath,path#128.toString), (setType,type#153.toString)), obj#164: org.apache.iceberg.spark.actions.FileInfo : +- Project [path#128, Manifest AS type#153] : +- BatchScan[path#128] file:/tmp/junit7658548847041001892/junit5488406415725968442/#all_manifests (branch=null) [filters=, groupedBy=] RuntimeFilters: [] :- Project [named_struct(authority, authority#177, path, path#178, scheme, scheme#179, uriAsString, uriAsString#180) AS _2#229] : +- Filter isnotnull(path#178) : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#177, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#178, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#179, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#180] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1b4d4949, obj#176: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.FileInfo), (setPath,path#170.toString), (setType,type#171.toString)), obj#175: org.apache.iceberg.spark.actions.FileInfo : +- LocalTableScan [path#170, type#171] +- Project [named_struct(authority, authority#188, path, path#189, scheme, scheme#190, uriAsString, uriAsString#191) AS _2#230] +- Filter isnotnull(path#189) +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#188, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#189, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#190, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#191] +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2729/204334828@1946579d, obj#187: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.FileInfo), (setPath,path#181.toString), (setType,type#182.toString)), obj#186: org.apache.iceberg.spark.actions.FileInfo +- LocalTableScan [path#181, type#182]
Physical plan after modified:
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#219] +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2728/869880469@7d1a1ee9, obj#218: java.lang.String +- DeserializeToObject newInstance(class scala.Tuple2), obj#217: scala.Tuple2 +- SortMergeJoin [_1#211.path], [_2#212.path], LeftOuter :- Sort [_1#211.path ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(_1#211.path, 200), ENSURE_REQUIREMENTS, [plan_id=233] : +- Project [named_struct(authority, authority#62, path, path#63, scheme, scheme#64, uriAsString, uriAsString#65) AS _1#211] : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#62, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#63, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#64, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#65] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2728/869880469@122ff251, obj#61: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- Scan[obj#52] +- Sort [_2#212.path ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(_2#212.path, 200), ENSURE_REQUIREMENTS, [plan_id=234] +- Union :- Project [named_struct(authority, authority#123, path, path#124, scheme, scheme#125, uriAsString, uriAsString#126) AS _2#212] : +- Filter isnotnull(path#124) : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#123, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#124, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#125, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#126] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2728/869880469@438fc55e, obj#122: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2779/1142936077@56567e9b, obj#118: org.apache.iceberg.spark.actions.FileInfo : +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.ManifestFileBean), (setPath,path#67.toString), (setAddedSnapshotId,staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, addedSnapshotId#227L, true, false, true)), (setContent,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, content#221, true, false, true)), (setPartitionSpecId,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, partitionSpecId#225, true, false, true)), (setLength,staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, length#223L, true, false, true))), obj#117: org.apache.iceberg.spark.actions.ManifestFileBean : +- Exchange RoundRobinPartitioning(200), REPARTITION_BY_NUM, [plan_id=200] : +- HashAggregate(keys=[path#67], functions=[first(content#66, false), first(length#68L, false), first(partitionSpecId#92, false), first(addedSnapshotId#93L, false)], output=[content#221, path#67, length#223L, partitionSpecId#225, addedSnapshotId#227L]) : +- HashAggregate(keys=[path#67], functions=[partial_first(content#66, false), partial_first(length#68L, false), partial_first(partitionSpecId#92, false), partial_first(addedSnapshotId#93L, false)], output=[path#67, first#245, valueSet#246, first#247L, valueSet#248, first#249, valueSet#250, first#251L, valueSet#252]) : +- Project [content#66, path#67, length#68L, partition_spec_id#69 AS partitionSpecId#92, added_snapshot_id#70L AS addedSnapshotId#93L] : +- BatchScan[content#66, path#67, length#68L, partition_spec_id#69, added_snapshot_id#70L] file:/tmp/junit8641658744871490880/junit9101792858122759973/#all_manifests (branch=null) [filters=, groupedBy=] RuntimeFilters: [] :- Project [named_struct(authority, authority#166, path, path#167, scheme, scheme#168, uriAsString, uriAsString#169) AS _2#228] : +- Filter isnotnull(path#167) : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#167, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#169] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2728/869880469@4d121ab6, obj#165: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.FileInfo), (setPath,path#128.toString), (setType,type#153.toString)), obj#164: org.apache.iceberg.spark.actions.FileInfo : +- Project [path#128, Manifest AS type#153] : +- BatchScan[path#128] file:/tmp/junit8641658744871490880/junit9101792858122759973/#all_manifests (branch=null) [filters=, groupedBy=] RuntimeFilters: [] :- Project [named_struct(authority, authority#177, path, path#178, scheme, scheme#179, uriAsString, uriAsString#180) AS _2#229] : +- Filter isnotnull(path#178) : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#177, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#178, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#179, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#180] : +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2728/869880469@290def7d, obj#176: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI : +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.FileInfo), (setPath,path#170.toString), (setType,type#171.toString)), obj#175: org.apache.iceberg.spark.actions.FileInfo : +- LocalTableScan [path#170, type#171] +- Project [named_struct(authority, authority#188, path, path#189, scheme, scheme#190, uriAsString, uriAsString#191) AS _2#230] +- Filter isnotnull(path#189) +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getAuthority, true, false, true) AS authority#188, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getPath, true, false, true) AS path#189, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getScheme, true, false, true) AS scheme#190, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI, true]).getUriAsString, true, false, true) AS uriAsString#191] +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$2728/869880469@10ea1754, obj#187: org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction$FileURI +- DeserializeToObject initializejavabean(newInstance(class org.apache.iceberg.spark.actions.FileInfo), (setPath,path#181.toString), (setType,type#182.toString)), obj#186: org.apache.iceberg.spark.actions.FileInfo +- LocalTableScan [path#181, type#182]
Question, Is it something you can set on spark, before using SparkActions.get(spark).deleteOrphanFiles
Question, Is it something you can set on spark, before using SparkActions.get(spark).deleteOrphanFiles
@szehon-ho Yes, if we run the delete orphan action in code separately, it can directly set this config on spark. But if we need to run multiple SQLs or multiple ETL processes in one Spark Session, this may affect other SQLs or ETL processes. Our scenario is to share a service similar to Spark Thrift Server, and then submit CALL SQL to it.
Maybe you can call spark.cloneSession() before passing it in via SparkActions.get(spark)?
Maybe you can call spark.cloneSession() before passing it in via SparkActions.get(spark)?
@szehon-ho I saw that RemoveOrphanFilesProcedure will get the Spark instance from org.apache.iceberg.spark.procedures.BaseProcedure. If I set the config here, I have two concerns:
- One is that it may affect other Procedures. If other Procedures can use Broadcast Join later, it will not take effect.
- The second is that if we call Delete Orphan through Action in code, there may still be OOM risks.
What do you think of it?Or do you mean just that all Procedures can use spark.cloneSession(), and only turn off Auto Broadcast Join in DeleteOrphanFilesSparkAction?
Yea for Action, my thought was on each call you can do:
val mySpark = spark.cloneSession()
mySpark.setConf(...) // turn off auto broadcast join
SparkActions.get(mySpark).deleteOrphanFiles()
I think each SparkAction.get() makes an independent instance, with its own sparkSession.
I'm not sure about Procedure though. Looks like it uses activeSession, which is thread local.
Yea for Action, my thought was on each call you can do:
val mySpark = spark.cloneSession() mySpark.setConf(...) // turn off auto broadcast join SparkActions.get(mySpark).deleteOrphanFiles()I think each SparkAction.get() makes an independent instance, with its own sparkSession.
I'm not sure about Procedure though. Looks like it uses activeSession, which is thread local.
@szehon-ho I got your point. Actually such as Spark Thrift Server or Kyuubi which is long running service, they only expose the SQL APIs for users, and they share one application for users' SQLs, so you can't set conf like which in code, and they can't identify what type of SQL it is. This change mainly covers this scenario.
@szehon-ho I got your point. Actually such as Spark Thrift Server or Kyuubi which is long running service, they only expose the SQL APIs for users, and they share one application for users' SQLs, so you can't set conf like which in code, and they can't identify what type of SQL it is. This change mainly covers this scenario.
In that case couldn't you set something like sql("SET spark.sql.autoBroadcastJoinThreshold=-1") before the DeleteOrphanFilesSparkAction and change it back to default once it finishes.
@szehon-ho I got your point. Actually such as Spark Thrift Server or Kyuubi which is long running service, they only expose the SQL APIs for users, and they share one application for users' SQLs, so you can't set conf like which in code, and they can't identify what type of SQL it is. This change mainly covers this scenario.
In that case couldn't you set something like
sql("SET spark.sql.autoBroadcastJoinThreshold=-1")before the DeleteOrphanFilesSparkAction and change it back to default once it finishes.
If I knew this, I would definitely set it up like this. But in fact not all users know how to set it up, unless after OOM occurs, I think the cost may be even greater.
In that case couldn't you set something like
sql("SET spark.sql.autoBroadcastJoinThreshold=-1")before the DeleteOrphanFilesSparkAction and change it back to default once it finishes.If I knew this, I would definitely set it up like this. But in fact not all users know how to set it up, unless after OOM occurs, I think the cost may be even greater.
I think this is always a problem with any spark job that involves join. If the join estimations are wrong you get OOM. That doesn't mean that we completely disable broadcast join for everybody. if your users are unable to disable broadcast join via config, then maybe you can disable it for them on your own fork so that it doesn't affect everybody else who is using Iceberg.
In that case couldn't you set something like
sql("SET spark.sql.autoBroadcastJoinThreshold=-1")before the DeleteOrphanFilesSparkAction and change it back to default once it finishes.If I knew this, I would definitely set it up like this. But in fact not all users know how to set it up, unless after OOM occurs, I think the cost may be even greater.
I think this is always a problem with any spark job that involves join. If the join estimations are wrong you get OOM. That doesn't mean that we completely disable broadcast join for everybody. if your users are unable to disable broadcast join via config, then maybe you can disable it for them on your own fork so that it doesn't affect everybody else who is using Iceberg.
Yes, we have disabled it internally at the moment, but I personally think it's still not the best way. This is why I open this patch to the community, to see if anyone can optimize the action and fundamentally solve the problem of inaccurate estimates.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.